Introduzione ad Apache Kafka con la primavera

Persistenza in alto

Ho appena annunciato il nuovo corso Learn Spring , incentrato sui fondamenti di Spring 5 e Spring Boot 2:

>> SCOPRI IL CORSO

1. Panoramica

Apache Kafka è un sistema di elaborazione del flusso distribuito e tollerante ai guasti.

In questo articolo, tratteremo il supporto Spring per Kafka e il livello di astrazioni che fornisce rispetto alle API client Java native di Kafka.

Spring Kafka porta il semplice e tipico modello di programmazione dei modelli Spring con un KafkaTemplate e POJO basati sui messaggi tramite l' annotazione @KafkaListener .

2. Installazione e configurazione

Per scaricare e installare Kafka, fare riferimento alla guida ufficiale qui.

Dobbiamo anche aggiungere la dipendenza spring- kafka al nostro pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

L'ultima versione di questo artefatto può essere trovata qui.

La nostra applicazione di esempio sarà un'applicazione Spring Boot.

In questo articolo si presume che il server venga avviato utilizzando la configurazione predefinita e che nessuna porta del server venga modificata.

3. Configurazione degli argomenti

In precedenza eseguivamo strumenti della riga di comando per creare argomenti in Kafka come:

$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic

Ma con l'introduzione di AdminClient in Kafka, ora possiamo creare argomenti a livello di programmazione.

Dobbiamo aggiungere il bean KafkaAdmin Spring, che aggiungerà automaticamente argomenti per tutti i bean di tipo NewTopic:

@Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("baeldung", 1, (short) 1); } }

4. Produzione di messaggi

Per creare messaggi, prima dobbiamo configurare una ProducerFactory che imposta la strategia per la creazione di istanze di Kafka Producer .

Quindi abbiamo bisogno di un KafkaTemplate che avvolga un'istanza di Producer e fornisca metodi convenienti per inviare messaggi agli argomenti Kafka.

Le istanze del produttore sono thread-safe e quindi l'utilizzo di una singola istanza in un contesto dell'applicazione darà prestazioni più elevate. Di conseguenza, anche le istanze di KakfaTemplate sono thread-safe e si consiglia di utilizzare un'istanza.

4.1. Configurazione produttore

@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }

4.2. Pubblicazione di messaggi

Possiamo inviare messaggi usando la classe KafkaTemplate :

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg); }

L' API di invio restituisce un oggetto ListenableFuture . Se vogliamo bloccare il thread di invio e ottenere il risultato sul messaggio inviato, possiamo chiamare l' API get dell'oggetto ListenableFuture . Il thread attenderà il risultato, ma rallenterà il produttore.

Kafka è una piattaforma di elaborazione del flusso veloce. Quindi è un'idea migliore gestire i risultati in modo asincrono in modo che i messaggi successivi non aspettino il risultato del messaggio precedente. Possiamo farlo tramite una richiamata:

public void sendMessage(String message) { ListenableFuture
    
      future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback
     
      () { @Override public void onSuccess(SendResult result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); }
     
    

5. Consumo di messaggi

5.1. Configurazione consumatore

Per consumare messaggi, è necessario configurare ConsumerFactory e KafkaListenerContainerFactory . Una volta che questi bean sono disponibili nella fabbrica di bean Spring, i consumatori basati su POJO possono essere configurati utilizzando l' annotazione @KafkaListener .

L' annotazione @EnableKafka è richiesta sulla classe di configurazione per abilitare il rilevamento dell'annotazione @KafkaListener sui bean gestiti a molla:

@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory(props); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); return factory; } }

5.2. Consumo di messaggi

@KafkaListener(topics = "topicName", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message); }

È possibile implementare più listener per un argomento , ciascuno con un ID di gruppo diverso. Inoltre, un consumatore può ascoltare messaggi da vari argomenti:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring supporta anche il recupero di una o più intestazioni di messaggio utilizzando l' annotazione @Header nel listener:

@KafkaListener(topics = "topicName") public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

5.3. Consuming Messages from a Specific Partition

As you may have noticed, we had created the topic baeldung with only one partition. However, for a topic with multiple partitions, a @KafkaListener can explicitly subscribe to a particular partition of a topic with an initial offset:

@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

Since the initialOffset has been sent to 0 in this listener, all the previously consumed messages from partitions 0 and three will be re-consumed every time this listener is initialized. If setting the offset is not required, we can use the partitions property of @TopicPartition annotation to set only the partitions without the offset:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Adding Message Filter for Listeners

Listeners can be configured to consume specific types of messages by adding a custom filter. This can be done by setting a RecordFilterStrategy to the KafkaListenerContainerFactory:

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory; }

A listener can then be configured to use this container factory:

@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message); }

In this listener, all the messages matching the filter will be discarded.

6. Custom Message Converters

So far we have only covered sending and receiving Strings as messages. However, we can also send and receive custom Java objects. This requires configuring appropriate serializer in ProducerFactory and deserializer in ConsumerFactory.

Let's look at a simple bean class, which we will send as messages:

public class Greeting { private String msg; private String name; // standard getters, setters and constructor }

6.1. Producing Custom Messages

In this example, we will use JsonSerializer. Let's look at the code for ProducerFactory and KafkaTemplate:

@Bean public ProducerFactory greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate greetingKafkaTemplate() { return new KafkaTemplate(greetingProducerFactory()); }

This new KafkaTemplate can be used to send the Greeting message:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Consuming Custom Messages

Similarly, let's modify the ConsumerFactory and KafkaListenerContainerFactory to deserialize the Greeting message correctly:

@Bean public ConsumerFactory greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory( props, new StringDeserializer(), new JsonDeserializer(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; }

The spring-kafka JSON serializer and deserializer uses the Jackson library which is also an optional maven dependency for the spring-kafka project. So let's add it to our pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

Invece di utilizzare l'ultima versione di Jackson, si consiglia di utilizzare la versione aggiunta al pom.xml di spring-kafka.

Infine, dobbiamo scrivere un ascoltatore per consumare i messaggi di saluto :

@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting) { // process greeting message }

7. Conclusione

In questo articolo, abbiamo trattato le basi del supporto Spring per Apache Kafka. Abbiamo dato una breve occhiata alle classi utilizzate per inviare e ricevere messaggi.

Il codice sorgente completo per questo articolo può essere trovato su GitHub. Prima di eseguire il codice, assicurati che il server Kafka sia in esecuzione e che gli argomenti siano creati manualmente.

Fondo di persistenza

Ho appena annunciato il nuovo corso Learn Spring , incentrato sui fondamenti di Spring 5 e Spring Boot 2:

>> SCOPRI IL CORSO