Introduzione a KafkaStreams in Java

1. Panoramica

In questo articolo, esamineremo la libreria KafkaStreams .

KafkaStreams è progettato dai creatori di Apache Kafka . L'obiettivo principale di questo software è consentire ai programmatori di creare applicazioni di streaming efficienti e in tempo reale che potrebbero funzionare come microservizi.

KafkaStreams ci consente di utilizzare gli argomenti Kafka, analizzare o trasformare i dati e, potenzialmente, inviarli a un altro argomento Kafka.

Per dimostrare KafkaStreams, creeremo una semplice applicazione che legge frasi da un argomento, conta le occorrenze di parole e stampa il conteggio per parola.

È importante notare che la libreria KafkaStreams non è reattiva e non supporta le operazioni asincrone e la gestione della contropressione.

2. Dipendenza da Maven

Per iniziare a scrivere la logica di elaborazione del flusso utilizzando KafkaStreams, è necessario aggiungere una dipendenza a kafka-streams e kafka-client :

 org.apache.kafka kafka-streams 1.0.0   org.apache.kafka kafka-clients 1.0.0  

È inoltre necessario che Apache Kafka sia installato e avviato perché utilizzeremo un argomento Kafka. Questo argomento sarà l'origine dati per il nostro lavoro di streaming.

Possiamo scaricare Kafka e altre dipendenze richieste dal sito ufficiale.

3. Configurazione dell'input di KafkaStreams

La prima cosa che faremo è la definizione dell'argomento Kafka di input.

Possiamo usare lo strumento Confluent che abbiamo scaricato: contiene un server Kafka. Contiene anche il kafka-console-producer che possiamo usare per pubblicare messaggi su Kafka.

Per iniziare, eseguiamo il nostro cluster Kafka:

./confluent start

Una volta avviato Kafka, possiamo definire la nostra origine dati e il nome della nostra applicazione utilizzando APPLICATION_ID_CONFIG :

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties(); streamsConfiguration.put( StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Un parametro di configurazione cruciale è BOOTSTRAP_SERVER_CONFIG. Questo è l'URL della nostra istanza Kafka locale che abbiamo appena avviato:

private String bootstrapServers = "localhost:9092"; streamsConfiguration.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Successivamente, dobbiamo passare il tipo di chiave e il valore dei messaggi che verranno utilizzati da inputTopic:

streamsConfiguration.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

L'elaborazione del flusso è spesso con stato. Quando vogliamo salvare i risultati intermedi, dobbiamo specificare il parametro STATE_DIR_CONFIG .

Nel nostro test, stiamo usando un file system locale:

streamsConfiguration.put( StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

4. Creazione di una topologia di streaming

Una volta definito il nostro argomento di input, possiamo creare una topologia di streaming, ovvero una definizione di come gli eventi devono essere gestiti e trasformati.

Nel nostro esempio, vorremmo implementare un contatore di parole. Per ogni frase inviata a inputTopic, vogliamo dividerla in parole e calcolare l'occorrenza di ogni parola.

Possiamo usare un'istanza della classe KStreamsBuilder per iniziare a costruire la nostra topologia:

KStreamBuilder builder = new KStreamBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) .groupBy((key, word) -> word) .count();

Per implementare il conteggio delle parole, in primo luogo, dobbiamo dividere i valori utilizzando l'espressione regolare.

Il metodo split restituisce un array. Stiamo usando flatMapValues ​​() per appiattirlo. Altrimenti, ci ritroveremmo con un elenco di array e sarebbe scomodo scrivere codice utilizzando tale struttura.

Infine, stiamo aggregando i valori per ogni parola e chiamando il count () che calcolerà le occorrenze di una parola specifica.

5. Gestione dei risultati

Abbiamo già calcolato il conteggio delle parole dei nostri messaggi di input. Ora stampiamo i risultati sullo standard output usando il metodo foreach () :

wordCounts .foreach((w, c) -> System.out.println("word: " + w + " -> " + c));

In produzione, spesso questo lavoro di streaming potrebbe pubblicare l'output su un altro argomento Kafka.

Potremmo farlo usando il metodo to ():

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String(); Serde longSerde = Serdes.Long(); wordCounts.to(stringSerde, longSerde, outputTopic);

La classe Serde ci fornisce serializzatori preconfigurati per i tipi Java che verranno utilizzati per serializzare gli oggetti in un array di byte. L'array di byte verrà quindi inviato all'argomento Kafka.

Stiamo usando String come chiave per il nostro argomento e Long come valore per il conteggio effettivo. Il metodo to () salverà i dati risultanti in outputTopic .

6. Avvio di KafkaStream Job

Fino a questo punto, abbiamo costruito una topologia che può essere eseguita. Tuttavia, il lavoro non è ancora iniziato.

Dobbiamo iniziare il nostro lavoro esplicitamente chiamando il metodo start () sull'istanza di KafkaStreams :

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); Thread.sleep(30000); streams.close();

Note that we are waiting 30 seconds for the job to finish. In a real-world scenario, that job would be running all the time, processing events from Kafka as they arrive.

We can test our job by publishing some events to our Kafka topic.

Let's start a kafka-console-producer and manually send some events to our inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost:9092 >"this is a pony" >"this is a horse and pony" 

This way, we published two events to Kafka. Our application will consume those events and will print the following output:

word: -> 1 word: this -> 1 word: is -> 1 word: a -> 1 word: pony -> 1 word: -> 2 word: this -> 2 word: is -> 2 word: a -> 2 word: horse -> 1 word: and -> 1 word: pony -> 2

We can see that when the first message arrived, the word pony occurred only once. But when we sent the second message, the word pony happened for the second time printing: “word: pony -> 2″.

6. Conclusion

Questo articolo illustra come creare un'applicazione di elaborazione del flusso principale utilizzando Apache Kafka come origine dati e la libreria KafkaStreams come libreria di elaborazione del flusso.

Tutti questi esempi e frammenti di codice possono essere trovati nel progetto GitHub: questo è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.