Esattamente una volta in elaborazione in Kafka con Java

1. Panoramica

In questo tutorial, vedremo come Kafka garantisce la consegna una volta sola tra le applicazioni producer e consumer tramite l'API transazionale di recente introduzione.

Inoltre, utilizzeremo questa API per implementare produttori e consumatori transazionali per ottenere una consegna end-to-end esattamente una volta in un esempio di WordCount.

2. Consegna dei messaggi a Kafka

A causa di vari errori, i sistemi di messaggistica non possono garantire la consegna dei messaggi tra le applicazioni del produttore e quelle del consumatore. A seconda di come le applicazioni client interagiscono con tali sistemi, è possibile la seguente semantica del messaggio:

  • Se un sistema di messaggistica non duplicherà mai un messaggio ma potrebbe perdere il messaggio occasionale, lo chiamiamo al massimo una volta
  • Oppure, se non mancherà mai un messaggio ma potrebbe duplicare il messaggio occasionale, lo chiamiamo almeno una volta
  • Ma, se consegna sempre tutti i messaggi senza duplicazioni, è esattamente una volta

Inizialmente, Kafka supportava solo il recapito dei messaggi at-most-once e at-least-once.

Tuttavia, l'introduzione delle transazioni tra i broker Kafka e le applicazioni client garantisce la consegna una volta sola a Kafka . Per capirlo meglio, esaminiamo rapidamente l'API del client transazionale.

3. Dipendenze di Maven

Per lavorare con l'API di transazione, avremo bisogno del client Java di Kafka nel nostro pom:

 org.apache.kafka kafka-clients 2.0.0 

4. Un ciclo transazionale consuma-trasforma-produce

Per il nostro esempio, consumeremo messaggi da un argomento di input, frasi .

Quindi, per ogni frase, conteremo ogni parola e invieremo il conteggio delle singole parole a un argomento di output, conta .

Nell'esempio, assumeremo che siano già disponibili dati transazionali nell'argomento frasi .

4.1. Un produttore consapevole delle transazioni

Quindi aggiungiamo prima un tipico produttore Kafka.

Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:9092");

Inoltre, però, dobbiamo specificare un transactional.id e abilitare l' idempotence :

producerProps.put("enable.idempotence", "true"); producerProps.put("transactional.id", "prod-1"); KafkaProducer producer = new KafkaProducer(producerProps);

Perché abbiamo attivato idempotence, Kafka utilizzerà questa transazione id come parte del suo algoritmo per deduplicare qualsiasi messaggio produttore invia , assicurando idempotenza.

In poche parole, se il produttore invia accidentalmente lo stesso messaggio a Kafka più di una volta, queste impostazioni gli consentono di notarlo.

Tutto ciò che dobbiamo fare è assicurarci che l'ID della transazione sia distinto per ogni produttore , sebbene coerente tra i riavvii.

4.2. Abilitazione del produttore per le transazioni

Una volta che siamo pronti, dobbiamo anche chiamare initTransaction per preparare il produttore a utilizzare le transazioni:

producer.initTransactions();

Questo registra il produttore con il broker come uno che può utilizzare le transazioni, identificandolo tramite il suo transactional.id e un numero di sequenza, o epoch . A sua volta, il broker li utilizzerà per scrivere in anticipo qualsiasi azione in un registro delle transazioni.

E di conseguenza, il broker rimuoverà da quel registro tutte le azioni che appartengono a un produttore con lo stesso ID transazione ed epoca precedente , presumendo che provengano da transazioni defunte.

4.3. Un consumatore consapevole delle transazioni

Quando consumiamo, possiamo leggere tutti i messaggi su una partizione di argomento in ordine. Tuttavia, possiamo indicare con isolation.level che dovremmo aspettare per leggere i messaggi transazionali fino a quando la transazione associata non è stata confermata :

Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put("group.id", "my-group-id"); consumerProps.put("enable.auto.commit", "false"); consumerProps.put("isolation.level", "read_committed"); KafkaConsumer consumer = new KafkaConsumer(consumerProps); consumer.subscribe(singleton(“sentences”));

L'utilizzo di un valore read_committed garantisce che non leggiamo alcun messaggio transazionale prima del completamento della transazione.

Il valore predefinito di isolation.level è read_uncommitted.

4.4. Consumare e trasformare per transazione

Ora che abbiamo il produttore e il consumatore configurati per scrivere e leggere a livello transazionale, possiamo consumare i record dal nostro argomento di input e contare ogni parola in ogni record:

ConsumerRecords records = consumer.poll(ofSeconds(60)); Map wordCountMap = records.records(new TopicPartition("input", 0)) .stream() .flatMap(record -> Stream.of(record.value().split(" "))) .map(word -> Tuple.of(word, 1)) .collect(Collectors.toMap(tuple -> tuple.getKey(), t1 -> t1.getValue(), (v1, v2) -> v1 + v2));

Nota che non c'è nulla di transazionale nel codice sopra. Ma, poiché abbiamo usato read_committed, significa che nessun messaggio che è stato scritto nell'argomento di input nella stessa transazione verrà letto da questo consumatore fino a quando non sarà stato scritto tutto.

Ora possiamo inviare il conteggio parole calcolato all'argomento di output.

Vediamo come possiamo produrre i nostri risultati, anche a livello transazionale.

4.5. Invia API

Per inviare i nostri conteggi come nuovi messaggi, ma nella stessa transazione, chiamiamo beginTransaction :

producer.beginTransaction();

Quindi, possiamo scrivere ciascuno nel nostro argomento "conteggi" con la chiave che è la parola e il conteggio è il valore:

wordCountMap.forEach((key,value) -> producer.send(new ProducerRecord("counts",key,value.toString())));

Note that because the producer can partition the data by the key, this means that transactional messages can span multiple partitions, each being read by separate consumers. Therefore, Kafka broker will store a list of all updated partitions for a transaction.

Note also that, within a transaction, a producer can use multiple threads to send records in parallel.

4.6. Committing Offsets

And finally, we need to commit our offsets that we just finished consuming. With transactions, we commit the offsets back to the input topic we read them from, like normal. Also though, we send them to the producer's transaction.

We can do all of this in a single call, but we first need to calculate the offsets for each topic partition:

Map offsetsToCommit = new HashMap(); for (TopicPartition partition : records.partitions()) { List
    
      partitionedRecords = records.records(partition); long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1)); }
    

Note that what we commit to the transaction is the upcoming offset, meaning we need to add 1.

Then we can send our calculated offsets to the transaction:

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");

4.7. Committing or Aborting the Transaction

And, finally, we can commit the transaction, which will atomically write the offsets to the consumer_offsets topic as well as to the transaction itself:

producer.commitTransaction();

This flushes any buffered message to the respective partitions. In addition, the Kafka broker makes all messages in that transaction available to the consumers.

Of course, if anything goes wrong while we are processing, for example, if we catch an exception, we can call abortTransaction:

try { // ... read from input topic // ... transform // ... write to output topic producer.commitTransaction(); } catch ( Exception e ) { producer.abortTransaction(); }

And drop any buffered messages and remove the transaction from the broker.

If we neither commit nor abort before the broker-configured max.transaction.timeout.ms, the Kafka broker will abort the transaction itself. The default value for this property is 900,000 milliseconds or 15 minutes.

5. Other consume-transform-produce Loops

What we've just seen is a basic consume-transform-produce loop which reads and writes to the same Kafka cluster.

Conversely, applications that must read and write to different Kafka clusters must use the older commitSync and commitAsync API. Typically, applications will store consumer offsets into their external state storage to maintain transactionality.

6. Conclusion

Per le applicazioni critiche per i dati, l'elaborazione end-to-end esattamente una volta è spesso imperativa.

In questo tutorial, abbiamo visto come usiamo Kafka per fare esattamente questo, usando le transazioni , e abbiamo implementato un esempio di conteggio delle parole basato sulle transazioni per illustrare il principio.

Sentiti libero di controllare tutti gli esempi di codice su GitHub.