-1
Good afternoon! I need to consume Jsons from Kafka topics and convert to entities, and so be able to save in the database, Postgres, in case
Configuration of the Consumer:
@Configuration
@EnableKafka
public class ConsumerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "Kafka");
return properties;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(String.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public EventConsumer consumer() {
return new EventConsumer();
}
}
Consumer:
@Slf4j
@Data
public class EventConsumer {
private EntidadeComercialRepository entidadeComercialRepository;
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "${topic.entidades_comerciais}")
public void kafkaConsumer(@Payload EntidadesComerciais entidadesComerciais) {
entidadeComercialRepository.save(entidadesComerciais);
log.info("received payload='{}'", entidadesComerciais);
latch.countDown();
}
}
Part of the entity
@lombok.Data
@NoArgsConstructor
@AllArgsConstructor
@Entity(name = "ENTIDADES_COMERCIAIS")
@Builder
public class EntidadesComerciais {
@Id
@Column(name = "CODIGO_DA_ENTIDADE_COMERCIAL")
private Long codigoDaEntidadeComercial;
@Column(name = "NOME_DA_ENTIDADE_COMERCIAL")
private String nomeDaEntidadeComercial;
@Column(name = "NOME_COMERCIAL")
private String nomeComercial;
@Column(name = "TIPO_DA_ENTIDADE_COMERCIAL")
private String tipoDaEntidadeComercial;
...
I am using Spring Kafka and the bank is Postgres. The exception that is returning:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void br.com.dchristofolli.kafka.EventConsumer.kafkaConsumer(br.com.dchristofolli.kafka.entity.EntidadesComerciais)]
Bean [EventConsumer(entidadeComercialRepository=null, latch=java.util.concurrent.CountDownLatch@144e36ae[Count = 1])]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [br.com.dchristofolli.kafka.entity.EntidadesComerciais] for GenericMessage [payload={"topic":"ENTIDADES_COMERCIAIS"...