Creazione di una pipeline di dati con Kafka, Spark Streaming e Cassandra

1. Panoramica

Apache Kafka è una piattaforma scalabile, ad alte prestazioni e bassa latenza che consente di leggere e scrivere flussi di dati come un sistema di messaggistica . Possiamo iniziare con Kafka in Java abbastanza facilmente.

Spark Streaming fa parte della piattaforma Apache Spark che consente un'elaborazione scalabile, a velocità effettiva elevata e con tolleranza agli errori dei flussi di dati . Sebbene scritto in Scala, Spark offre API Java con cui lavorare.

Apache Cassandra è un archivio dati NoSQL distribuito ea colonne larghe . Maggiori dettagli su Cassandra sono disponibili nel nostro precedente articolo.

In questo tutorial, li combineremo per creare una pipeline di dati altamente scalabile e tollerante ai guasti per un flusso di dati in tempo reale .

2. Installazioni

Per iniziare, avremo bisogno di Kafka, Spark e Cassandra installati localmente sulla nostra macchina per eseguire l'applicazione. Vedremo come sviluppare una pipeline di dati utilizzando queste piattaforme man mano che procediamo.

Tuttavia, lasceremo tutte le configurazioni predefinite comprese le porte per tutte le installazioni che aiuteranno a far funzionare il tutorial senza problemi.

2.1. Kafka

L'installazione di Kafka sul nostro computer locale è abbastanza semplice e può essere trovata come parte della documentazione ufficiale. Useremo la versione 2.1.0 di Kafka.

Inoltre, Kafka richiede l'esecuzione di Apache Zookeeper, ma ai fini di questo tutorial, utilizzeremo l'istanza Zookeeper a nodo singolo fornita con Kafka.

Una volta che siamo riusciti ad avviare Zookeeper e Kafka in locale seguendo la guida ufficiale, possiamo procedere alla creazione del nostro argomento, denominato "messaggi":

 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Si noti che lo script precedente è per la piattaforma Windows, ma sono disponibili script simili anche per piattaforme simili a Unix.

2.2. Scintilla

Spark utilizza le librerie client di Hadoop per HDFS e YARN. Di conseguenza, può essere molto complicato assemblare le versioni compatibili di tutti questi . Tuttavia, il download ufficiale di Spark viene fornito preconfezionato con le versioni popolari di Hadoop. Per questo tutorial, utilizzeremo il pacchetto versione 2.3.0 "pre-costruito per Apache Hadoop 2.7 e versioni successive".

Una volta decompresso il pacchetto corretto di Spark, gli script disponibili possono essere utilizzati per inviare le applicazioni. Lo vedremo più avanti quando svilupperemo la nostra applicazione in Spring Boot.

2.3. Cassandra

DataStax rende disponibile una community edition di Cassandra per diverse piattaforme, incluso Windows. Possiamo scaricarlo e installarlo sulla nostra macchina locale molto facilmente seguendo la documentazione ufficiale. Useremo la versione 3.9.0.

Una volta che siamo riusciti a installare e avviare Cassandra sulla nostra macchina locale, possiamo procedere alla creazione del nostro spazio chiavi e tabella. Questo può essere fatto utilizzando la shell CQL fornita con la nostra installazione:

CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int);

Nota che abbiamo creato uno spazio dei nomi chiamato vocabolario e una tabella in esso chiamata parole con due colonne, parola e conteggio .

3. Dipendenze

Possiamo integrare le dipendenze Kafka e Spark nella nostra applicazione tramite Maven. Estrarremo queste dipendenze da Maven Central:

  • Core Spark
  • SQL Spark
  • Streaming di Spark
  • Streaming di Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

E possiamo aggiungerli al nostro pom di conseguenza:

 org.apache.spark spark-core_2.11 2.3.0 provided   org.apache.spark spark-sql_2.11 2.3.0 provided   org.apache.spark spark-streaming_2.11 2.3.0 provided   org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0   com.datastax.spark spark-cassandra-connector_2.11 2.3.0   com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Notare che alcune di queste dipendenze sono contrassegnate come fornite nell'ambito. Questo perché questi saranno resi disponibili dall'installazione di Spark in cui invieremo l'applicazione per l'esecuzione utilizzando spark-submit.

4. Spark Streaming - Strategie di integrazione Kafka

A questo punto, vale la pena parlare brevemente delle strategie di integrazione per Spark e Kafka.

Kafka ha introdotto una nuova API consumer tra le versioni 0.8 e 0.10. Pertanto, i pacchetti Spark Streaming corrispondenti sono disponibili per entrambe le versioni del broker. È importante scegliere il pacchetto giusto a seconda del broker disponibile e delle funzionalità desiderate.

4.1. Spark Streaming Kafka 0.8.0

La versione 0.8 è l'API di integrazione stabile con opzioni di utilizzo dell'approccio basato sul ricevitore o diretto . Non entreremo nei dettagli di questi approcci che possiamo trovare nella documentazione ufficiale. Un punto importante da notare qui è che questo pacchetto è compatibile con Kafka Broker versioni 0.8.2.1 o successive.

4.2. Spark Streaming Kafka 0.10

Questo è attualmente in uno stato sperimentale ed è compatibile solo con Kafka Broker versione 0.10.0 o successiva. Questo pacchetto offre solo l'approccio diretto, che ora fa uso della nuova API consumer di Kafka . Possiamo trovare maggiori dettagli su questo nella documentazione ufficiale. È importante sottolineare che non è compatibile con le versioni precedenti di Kafka Broker .

Tieni presente che per questo tutorial utilizzeremo il pacchetto 0.10. La dipendenza menzionata nella sezione precedente si riferisce solo a questo.

5. Sviluppo di una pipeline di dati

We'll create a simple application in Java using Spark which will integrate with the Kafka topic we created earlier. The application will read the messages as posted and count the frequency of words in every message. This will then be updated in the Cassandra table we created earlier.

Let's quickly visualize how the data will flow:

5.1. Getting JavaStreamingContext

Firstly, we'll begin by initializing the JavaStreamingContext which is the entry point for all Spark Streaming applications:

SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1));

5.2. Getting DStream from Kafka

Now, we can connect to the Kafka topic from the JavaStreamingContext:

Map kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("messages"); JavaInputDStream
    
      messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
    

Please note that we've to provide deserializers for key and value here. For common data types like String, the deserializer is available by default. However, if we wish to retrieve custom data types, we'll have to provide custom deserializers.

Here, we've obtained JavaInputDStream which is an implementation of Discretized Streams or DStreams, the basic abstraction provided by Spark Streaming. Internally DStreams is nothing but a continuous series of RDDs.

5.3. Processing Obtained DStream

We'll now perform a series of operations on the JavaInputDStream to obtain word frequencies in the messages:

JavaPairDStream results = messages .mapToPair( record -> new Tuple2(record.key(), record.value()) ); JavaDStream lines = results .map( tuple2 -> tuple2._2() ); JavaDStream words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream wordCounts = words .mapToPair( s -> new Tuple2(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 );

5.4. Persisting Processed DStream into Cassandra

Finally, we can iterate over the processed JavaPairDStream to insert them into our Cassandra table:

wordCounts.foreachRDD( javaRdd -> { Map wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } );

5.5. Running the Application

As this is a stream processing application, we would want to keep this running:

streamingContext.start(); streamingContext.awaitTermination();

6. Leveraging Checkpoints

In a stream processing application, it's often useful to retain state between batches of data being processed.

For example, in our previous attempt, we are only able to store the current frequency of the words. What if we want to store the cumulative frequency instead? Spark Streaming makes it possible through a concept called checkpoints.

We'll now modify the pipeline we created earlier to leverage checkpoints:

Please note that we'll be using checkpoints only for the session of data processing. This does not provide fault-tolerance. However, checkpointing can be used for fault tolerance as well.

There are a few changes we'll have to make in our application to leverage checkpoints. This includes providing the JavaStreamingContext with a checkpoint location:

streamingContext.checkpoint("./.checkpoint");

Here, we are using the local filesystem to store checkpoints. However, for robustness, this should be stored in a location like HDFS, S3 or Kafka. More on this is available in the official documentation.

Next, we'll have to fetch the checkpoint and create a cumulative count of words while processing every partition using a mapping function:

JavaMapWithStateDStream
    
      cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2(word, sum); state.update(sum); return output; } ) );
    

Once we get the cumulative word counts, we can proceed to iterate and save them in Cassandra as before.

Please note that while data checkpointing is useful for stateful processing, it comes with a latency cost. Hence, it's necessary to use this wisely along with an optimal checkpointing interval.

7. Understanding Offsets

If we recall some of the Kafka parameters we set earlier:

kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false);

These basically mean that we don't want to auto-commit for the offset and would like to pick the latest offset every time a consumer group is initialized. Consequently, our application will only be able to consume messages posted during the period it is running.

If we want to consume all messages posted irrespective of whether the application was running or not and also want to keep track of the messages already posted, we'll have to configure the offset appropriately along with saving the offset state, though this is a bit out of scope for this tutorial.

This is also a way in which Spark Streaming offers a particular level of guarantee like “exactly once”. This basically means that each message posted on Kafka topic will only be processed exactly once by Spark Streaming.

8. Deploying Application

We can deploy our application using the Spark-submit script which comes pre-packed with the Spark installation:

$SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Tieni presente che il jar che creiamo utilizzando Maven dovrebbe contenere le dipendenze che non sono contrassegnate come fornite nell'ambito.

Dopo aver inviato questa domanda e aver pubblicato alcuni messaggi nell'argomento Kafka che abbiamo creato in precedenza, dovremmo vedere i conteggi cumulativi delle parole pubblicati nella tabella Cassandra che abbiamo creato in precedenza.

9. Conclusione

Per riassumere, in questo tutorial, abbiamo imparato a creare una semplice pipeline di dati utilizzando Kafka, Spark Streaming e Cassandra. Abbiamo anche imparato a sfruttare i checkpoint in Spark Streaming per mantenere lo stato tra i batch.

Come sempre, il codice per gli esempi è disponibile su GitHub.