Test di Kafka e Spring Boot

1. Panoramica

Apache Kafka è un sistema di elaborazione del flusso potente, distribuito e tollerante ai guasti. In un tutorial precedente, abbiamo imparato a lavorare con Spring e Kafka.

In questo tutorial, svilupperemo il precedente e impareremo come scrivere test di integrazione indipendenti e affidabili che non si basano su un server Kafka esterno in esecuzione .

Innanzitutto, inizieremo ma esamineremo come utilizzare e configurare un'istanza incorporata di Kafka. Poi vedremo come possiamo utilizzare il popolare framework Testcontainer dai nostri test.

2. Dipendenze

Ovviamente, dovremo aggiungere la dipendenza standard spring- kafka al nostro pom.xml :

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

Quindi avremo bisogno di altre due dipendenze specifiche per i nostri test . Innanzitutto, aggiungeremo l' artefatto spring-kafka-test :

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

Infine, aggiungeremo la dipendenza Testcontainers Kafka, disponibile anche su Maven Central:

 org.testcontainers kafka 1.15.0 test 

Ora che abbiamo configurato tutte le dipendenze necessarie, possiamo scrivere una semplice applicazione Spring Boot utilizzando Kafka.

3. Una semplice applicazione Kafka produttore-consumatore

Durante questo tutorial, il fulcro dei nostri test sarà una semplice applicazione Spring Boot Kafka produttore-consumatore.

Cominciamo definendo il nostro punto di ingresso dell'applicazione:

@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } }

Come possiamo vedere, questa è un'applicazione Spring Boot standard. Dove possibile, vogliamo utilizzare i valori di configurazione predefiniti . Con questo in mente, utilizziamo l' annotazione @EnableAutoConfiguration per configurare automaticamente la nostra applicazione.

3.1. Installazione del produttore

Quindi, consideriamo un bean producer che useremo per inviare messaggi a un determinato argomento Kafka:

@Component public class KafkaProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String payload) { LOGGER.info("sending payload="{}" to topic="{}"", payload, topic); kafkaTemplate.send(topic, payload); } }

Il nostro bean KafkaProducer definito sopra è semplicemente un wrapper attorno alla classe KafkaTemplate . Questa classe fornisce operazioni thread-safe di alto livello, come l'invio di dati all'argomento fornito, che è esattamente ciò che facciamo nel nostro metodo send .

3.2. Configurazione del consumatore

Allo stesso modo, ora definiremo un semplice bean consumer che ascolterà un argomento Kafka e riceverà messaggi:

@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private CountDownLatch latch = new CountDownLatch(1); private String payload = null; @KafkaListener(topics = "${test.topic}") public void receive(ConsumerRecord consumerRecord) { LOGGER.info("received payload="{}"", consumerRecord.toString()); setPayload(consumerRecord.toString()); latch.countDown(); } public CountDownLatch getLatch() { return latch; } public String getPayload() { return payload; } }

Il nostro semplice consumatore utilizza l' annotazione @KafkaListener sul metodo di ricezione per ascoltare i messaggi su un determinato argomento. Vedremo più avanti come configuriamo il test.topic dai nostri test.

Inoltre, il metodo di ricezione memorizza il contenuto del messaggio nel nostro bean e decrementa il conteggio della variabile di latch . Questa variabile è un semplice campo contatore thread-safe che utilizzeremo in seguito dai nostri test per assicurarci di aver ricevuto correttamente un messaggio .

Ora che abbiamo implementato la nostra semplice applicazione Kafka che utilizza Spring Boot, vediamo come scrivere test di integrazione.

4. Una parola sui test

In generale, quando si scrivono test di integrazione puliti, non dovremmo dipendere da servizi esterni che potremmo non essere in grado di controllare o che potrebbero smettere improvvisamente di funzionare . Ciò potrebbe avere effetti negativi sui risultati dei nostri test.

Allo stesso modo, se dipendiamo da un servizio esterno, in questo caso, un broker Kafka in esecuzione, probabilmente non saremo in grado di configurarlo, controllarlo e smontarlo nel modo che vogliamo dai nostri test.

4.1. Proprietà dell'applicazione

Useremo un set molto leggero di proprietà di configurazione dell'applicazione dai nostri test. Definiremo queste proprietà nel nostro file src / test / resources / application.yml :

spring: kafka: consumer: auto-offset-reset: earliest group-id: baeldung test: topic: embedded-test-topic

Questo è il set minimo di proprietà di cui abbiamo bisogno quando lavoriamo con un'istanza incorporata di Kafka o un broker locale.

La maggior parte di questi sono autoesplicativi, ma quello che dovremmo evidenziare di particolare importanza è l' auto-offset-reset della proprietà del consumatore : il più presto . Questa proprietà garantisce che il nostro gruppo di consumatori riceva i messaggi che inviamo perché il contenitore potrebbe iniziare dopo che gli invii sono stati completati.

Inoltre, configuriamo una proprietà dell'argomento con il valore embedded-test-topic , che è l'argomento che useremo dai nostri test.

5. Test utilizzando Kafka incorporato

In questa sezione, daremo un'occhiata a come utilizzare un'istanza di Kafka in memoria per eseguire i nostri test. Questo è anche noto come Kafka incorporato.

La dipendenza spring-kafka-test che abbiamo aggiunto in precedenza contiene alcune utilità utili per assistere nel test della nostra applicazione. In particolare, contiene la classe EmbeddedKafkaBroker .

Con questo in mente, andiamo avanti e scriviamo il nostro primo test di integrazione:

@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) class EmbeddedKafkaIntegrationTest { @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own simple KafkaProducer"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Esaminiamo le parti chiave del nostro test. Per prima cosa, iniziamo decorando la nostra classe di test con due annotazioni Spring piuttosto standard:

  • L' annotazione @SpringBootTest garantirà che il nostro test avvenga il contesto dell'applicazione Spring
  • Usiamo anche l' annotazione @DirtiesContext , che assicurerà che questo contesto venga pulito e reimpostato tra diversi test

Ecco la parte cruciale, usiamo l' annotazione @EmbeddedKafka per inserire un'istanza di un EmbeddedKafkaBroker nei nostri test . Inoltre, sono disponibili diverse proprietà che possiamo utilizzare per configurare il nodo Kafka incorporato:

  • partizioni : questo è il numero di partizioni utilizzate per argomento. Per mantenere le cose belle e semplici, ne vogliamo solo uno dai nostri test
  • brokerProperties : proprietà aggiuntive per il broker Kafka. Anche in questo caso manteniamo le cose semplici e specifichiamo un listener di testo semplice e un numero di porta

Successivamente, colleghiamo automaticamente le nostre classi consumer e producer e configuriamo un argomento per utilizzare il valore dalla nostra application.properties .

Per il pezzo finale del puzzle, inviamo semplicemente un messaggio al nostro argomento di prova e verifichiamo che il messaggio sia stato ricevuto e contenga il nome del nostro argomento di prova .

Quando eseguiremo il nostro test, vedremo tra il verboso output di Spring:

... 12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer - sending payload="Sending with our own simple KafkaProducer" to topic="embedded-test-topic" ... 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.embedded.KafkaConsumer - received payload= 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialized key size = -1, serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),  key = null, value = Invio con il nostro semplice KafkaProducer) ' 

Ciò conferma che il nostro test funziona correttamente. Eccezionale! Ora abbiamo un modo per scrivere test di integrazione indipendenti e autonomi utilizzando un broker Kafka in memoria .

6. Test di Kafka con TestContainer

Sometimes we might see small differences between a real external service vs. an embedded in-memory instance of a service that has been specifically provided for testing purposes. Although unlikely, it could also be that the port used from our test might be occupied, causing a failure.

With that in mind, in this section, we'll see a variation on our previous approach to testing using the Testcontainers framework. We'll see how to instantiate and manage an external Apache Kafka broker hosted inside a Docker container from our integration test.

Let's define another integration test which will be quite similar to the one we saw in the previous section:

@RunWith(SpringRunner.class) @Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest(classes = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest { @ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own controller"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Let's take a look at the differences this time around. We're declaring the kafka field, which is a standard JUnit @ClassRule. This field is an instance of the KafkaContainer class that will prepare and manage the lifecycle of our container running Kafka.

To avoid port clashes, Testcontainers allocates a port number dynamically when our docker container starts. For this reason, we provide a custom consumer and producer factory configuration using the class KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs() { Map props = new HashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // more standard configuration return props; } @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); // more standard configuration return new DefaultKafkaProducerFactory(configProps); }

We then reference this configuration via the @Import annotation at the beginning of our test.

The reason for this is that we need a way to inject the server address into our application, which as previously mentioned, is generated dynamically. We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:

bootstrap.servers = [PLAINTEXT://localhost:32789]

Now when we run our test, we should see that Testcontainers does several things:

  • Checks our local Docker setup.
  • Pulls the confluentinc/cp-kafka:5.4.3 docker image if necessary
  • Starts a new container and waits for it to be ready
  • Finally, shuts down and deletes the container after our test finishes

Again, this is confirmed by inspecting the test output:

13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Creating container for image: confluentinc/cp-kafka:5.4.3 13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! A working integration test using a Kafka docker container.

7. Conclusion

In questo articolo, abbiamo appreso un paio di approcci per testare le applicazioni Kafka con Spring Boot. Nel primo approccio, abbiamo visto come configurare e utilizzare un broker Kafka in memoria locale.

Quindi abbiamo visto come utilizzare Testcontainer per configurare un broker Kafka esterno in esecuzione all'interno di un container Docker dai nostri test.

Come sempre, il codice sorgente completo dell'articolo è disponibile su GitHub.