How to convert a received JSON into my Spring Kafka Consumer into an entity ready to be saved in the bank?

Asked

Viewed 184 times

-1

Good afternoon! I need to consume Jsons from Kafka topics and convert to entities, and so be able to save in the database, Postgres, in case

Configuration of the Consumer:

@Configuration
@EnableKafka
public class ConsumerConfiguration {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Kafka");
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(String.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public EventConsumer consumer() {
        return new EventConsumer();
    }
}

Consumer:

@Slf4j
@Data
public class EventConsumer {
    private EntidadeComercialRepository entidadeComercialRepository;
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${topic.entidades_comerciais}")
    public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais) {
        entidadeComercialRepository.save(entidadesComerciais);
        log.info("received payload='{}'", entidadesComerciais);
        latch.countDown();
    }
}

Part of the entity

@lombok.Data
@NoArgsConstructor
@AllArgsConstructor
@Entity(name = "ENTIDADES_COMERCIAIS")
@Builder
public class EntidadesComerciais {
    @Id
    @Column(name = "CODIGO_DA_ENTIDADE_COMERCIAL")
    private Long codigoDaEntidadeComercial;

    @Column(name = "NOME_DA_ENTIDADE_COMERCIAL")
    private String nomeDaEntidadeComercial;

    @Column(name = "NOME_COMERCIAL")
    private String nomeComercial;

    @Column(name = "TIPO_DA_ENTIDADE_COMERCIAL")
    private String tipoDaEntidadeComercial;
    ...

I am using Spring Kafka and the bank is Postgres. The exception that is returning:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void br.com.dchristofolli.kafka.EventConsumer.kafkaConsumer(br.com.dchristofolli.kafka.entity.EntidadesComerciais)]
Bean [EventConsumer(entidadeComercialRepository=null, latch=java.util.concurrent.CountDownLatch@144e36ae[Count = 1])]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.dchristofolli.kafka.entity.EntidadesComerciais] for GenericMessage [payload={"topic":"ENTIDADES_COMERCIAIS"...

1 answer

0

The exception itself already says

Cannot convert from [java.lang.String] to [br.com.dchristofolli.kafka.entity.EntidadesComerciais]

Look how you are instantiating your Consumer, passing an Deserializer of String. You must specify the class of your pojo in Deserializer, because it is for this type that the Object will be converted and passed to the method kafkaConsumer

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(EntidadesComerciais.class));
}

Or you can get a String. Change the method signature to receive a String, and do the conversion in hand within the method, for example:

ObjectMapper objectMapper = new ObjectMapper()
objectMapper.readValue(carJson, EntidadesComerciais.class);

Reference: https://docs.spring.io/spring-kafka/reference/html/#messaging-message-Conversion

Browser other questions tagged

You are not signed in. Login or sign up in order to post.