How to convert a received JSON into my Spring Kafka Consumer into an entity ready to be saved in the bank?


Good afternoon! I need to consume Jsons from Kafka topics and convert to entities, and so be able to save in the database, Postgres, in case

Configuration of the Consumer:

public class ConsumerConfiguration {
    private String bootstrapServers;

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Kafka");
        return properties;

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(String.class));

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public EventConsumer consumer() {
        return new EventConsumer();


public class EventConsumer {
    private EntidadeComercialRepository entidadeComercialRepository;
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "${topic.entidades_comerciais}")
    public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais) {;"received payload='{}'", entidadesComerciais);

Part of the entity

public class EntidadesComerciais {
    private Long codigoDaEntidadeComercial;

    @Column(name = "NOME_DA_ENTIDADE_COMERCIAL")
    private String nomeDaEntidadeComercial;

    @Column(name = "NOME_COMERCIAL")
    private String nomeComercial;

    @Column(name = "TIPO_DA_ENTIDADE_COMERCIAL")
    private String tipoDaEntidadeComercial;

I am using Spring Kafka and the bank is Postgres. The exception that is returning:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void]
Bean [EventConsumer(entidadeComercialRepository=null, latch=java.util.concurrent.CountDownLatch@144e36ae[Count = 1])]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [] for GenericMessage [payload={"topic":"ENTIDADES_COMERCIAIS"...

1 answer


The exception itself already says

Cannot convert from [java.lang.String] to []

Look how you are instantiating your Consumer, passing an Deserializer of String. You must specify the class of your pojo in Deserializer, because it is for this type that the Object will be converted and passed to the method kafkaConsumer

public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(EntidadesComerciais.class));

Or you can get a String. Change the method signature to receive a String, and do the conversion in hand within the method, for example:

ObjectMapper objectMapper = new ObjectMapper()
objectMapper.readValue(carJson, EntidadesComerciais.class);


