Introduzione a Hazelcast Jet

1. Introduzione

In questo tutorial, impareremo a conoscere Hazelcast Jet. È un motore di elaborazione dati distribuito fornito da Hazelcast, Inc. ed è basato su Hazelcast IMDG.

Se vuoi conoscere Hazelcast IMDG, ecco un articolo per iniziare.

2. Che cos'è Hazelcast Jet?

Hazelcast Jet è un motore di elaborazione dati distribuito che tratta i dati come flussi. Può elaborare i dati archiviati in un database o in file, nonché i dati trasmessi in streaming da un server Kafka.

Inoltre, può eseguire funzioni aggregate su flussi di dati infiniti dividendo i flussi in sottoinsiemi e applicando l'aggregazione su ogni sottoinsieme. Questo concetto è noto come windowing nella terminologia Jet.

Possiamo distribuire Jet in un cluster di macchine e quindi inviargli i nostri lavori di elaborazione dati. Jet farà in modo che tutti i membri del cluster elaborino automaticamente i dati. Ogni membro del cluster consuma una parte dei dati e ciò semplifica la scalabilità fino a qualsiasi livello di velocità effettiva.

Ecco i tipici casi d'uso di Hazelcast Jet:

  • Elaborazione del flusso in tempo reale
  • Elaborazione batch veloce
  • Elaborazione di Java 8 Stream in modo distribuito
  • Elaborazione dati in microservizi

3. Configurazione

Per configurare Hazelcast Jet nel nostro ambiente, dobbiamo solo aggiungere una singola dipendenza Maven al nostro pom.xml .

Ecco come lo facciamo:

 com.hazelcast.jet hazelcast-jet 4.2 

Includendo questa dipendenza verrà scaricato un file jar da 10 Mb che ci fornisce tutta l'infrastruttura necessaria per costruire una pipeline di elaborazione dati distribuita.

L'ultima versione di Hazelcast Jet può essere trovata qui.

4. Applicazione di esempio

Per saperne di più su Hazelcast Jet, creeremo un'applicazione di esempio che accetta un input di frasi e una parola da trovare in quelle frasi e restituisce il conteggio della parola specificata in quelle frasi.

4.1. La pipeline

Una pipeline costituisce il costrutto di base per un'applicazione Jet. L'elaborazione all'interno di una pipeline segue questi passaggi:

  • leggere i dati da una fonte
  • trasformare i dati
  • scrivere i dati in un sink

Per la nostra applicazione, la pipeline leggerà da un elenco distribuito , applicherà la trasformazione di raggruppamento e aggregazione e infine scriverà su una mappa distribuita .

Ecco come scriviamo la nostra pipeline:

private Pipeline createPipeLine() { Pipeline p = Pipeline.create(); p.readFrom(Sources.list(LIST_NAME)) .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(wholeItem()) .aggregate(counting()) .writeTo(Sinks.map(MAP_NAME)); return p; }

Una volta che abbiamo letto dalla fonte, attraversiamo i dati e li dividiamo nello spazio usando un'espressione regolare. Successivamente, filtriamo gli spazi vuoti.

Infine, raggruppiamo le parole, le aggreghiamo e scriviamo i risultati su una mappa.

4.2. Il lavoro

Ora che la nostra pipeline è definita, creiamo un lavoro per l'esecuzione della pipeline.

Ecco come scriviamo una funzione countWord che accetta parametri e restituisce il conteggio:

public Long countWord(List sentences, String word) { long count = 0; JetInstance jet = Jet.newJetInstance(); try { List textList = jet.getList(LIST_NAME); textList.addAll(sentences); Pipeline p = createPipeLine(); jet.newJob(p).join(); Map counts = jet.getMap(MAP_NAME); count = counts.get(word); } finally { Jet.shutdownAll(); } return count; }

Creiamo prima un'istanza Jet per creare il nostro lavoro e utilizzare la pipeline. Successivamente, copiamo l' elenco di input in un elenco distribuito in modo che sia disponibile su tutte le istanze.

Quindi inviamo un lavoro utilizzando la pipeline che abbiamo costruito sopra. Il metodo newJob () restituisce un lavoro eseguibile avviato da Jet in modo asincrono. Il metodo join attende il completamento del lavoro e genera un'eccezione se il lavoro viene completato con un errore.

Quando il lavoro viene completato, i risultati vengono recuperati in una mappa distribuita , come definito nella nostra pipeline. Quindi, otteniamo la mappa dall'istanza Jet e otteniamo il conteggio delle parole contro di essa.

Infine, abbiamo chiuso l'istanza Jet. È importante chiuderlo al termine della nostra esecuzione, poiché l' istanza Jet avvia i propri thread . Altrimenti, il nostro processo Java sarà ancora attivo anche dopo che il nostro metodo è terminato.

Ecco uno unit test che verifica il codice che abbiamo scritto per Jet:

@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord() { List sentences = new ArrayList(); sentences.add("The first second was alright, but the second second was tough."); WordCounter wordCounter = new WordCounter(); long countSecond = wordCounter.countWord(sentences, "second"); assertEquals(3, countSecond); }

5. conclusione

In questo articolo, abbiamo imparato a conoscere Hazelcast Jet. Per saperne di più su di esso e sulle sue caratteristiche, fare riferimento al manuale.

Come al solito, il codice per gli esempi usati in questo articolo può essere trovato su Github.