Introduzione ad Apache Flink con Java

1. Panoramica

Apache Flink è un framework di elaborazione di Big Data che consente ai programmatori di elaborare una grande quantità di dati in modo molto efficiente e scalabile.

In questo articolo, introdurremo alcuni dei concetti principali dell'API e delle trasformazioni standard dei dati disponibili nell'API Java di Apache Flink . Lo stile fluido di questa API semplifica il lavoro con il costrutto centrale di Flink: la raccolta distribuita.

Per prima cosa, daremo un'occhiata alle trasformazioni dell'API DataSet di Flink e le useremo per implementare un programma di conteggio parole. Quindi daremo una breve occhiata all'API DataStream di Flink , che consente di elaborare flussi di eventi in tempo reale.

2. Dipendenza da Maven

Per iniziare, dobbiamo aggiungere le dipendenze Maven alle librerie flink-java e flink-test-utils :

 org.apache.flink flink-java 1.2.0   org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Concetti principali dell'API

Quando lavoriamo con Flink, dobbiamo conoscere un paio di cose relative alla sua API:

  • Ogni programma Flink esegue trasformazioni su raccolte di dati distribuite. Sono disponibili numerose funzioni per la trasformazione dei dati, tra cui filtraggio, mappatura, unione, raggruppamento e aggregazione
  • Un'operazione sink in Flink avvia l'esecuzione di un flusso per produrre il risultato desiderato del programma , come salvare il risultato nel file system o stamparlo sullo standard output
  • Le trasformazioni Flink sono pigre, il che significa che non vengono eseguite finché non viene richiamata un'operazione sink
  • L'API Apache Flink supporta due modalità di operazioni: batch e in tempo reale. Se hai a che fare con un'origine dati limitata che può essere elaborata in modalità batch, utilizzerai l' API DataSet . Se desideri elaborare flussi illimitati di dati in tempo reale, devi utilizzare l' API DataStream

4. Trasformazioni API DataSet

Il punto di ingresso al programma Flink è un'istanza della classe ExecutionEnvironment , che definisce il contesto in cui viene eseguito un programma.

Creiamo un ExecutionEnvironment per avviare la nostra elaborazione:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Notare che quando si avvia l'applicazione sulla macchina locale, verrà eseguita l'elaborazione sulla JVM locale. Se si desidera avviare l'elaborazione su un cluster di macchine, è necessario installare Apache Flink su quelle macchine e configurare di conseguenza ExecutionEnvironment .

4.1. Creazione di un DataSet

Per iniziare a eseguire le trasformazioni dei dati, dobbiamo fornire i dati al nostro programma.

Creiamo un'istanza della classe DataSet utilizzando il nostro ExecutionEnvironement :

DataSet amounts = env.fromElements(1, 29, 40, 50);

È possibile creare un DataSet da più origini, come Apache Kafka, un file CSV o praticamente qualsiasi altra origine dati.

4.2. Filtra e riduci

Dopo aver creato un'istanza della classe DataSet , è possibile applicarvi trasformazioni.

Supponiamo che tu voglia filtrare i numeri che sono al di sopra di una certa soglia e poi sommarli tutti . Puoi usare le trasformazioni filter () e reduce () per ottenere ciò:

int threshold = 30; List collect = amounts .filter(a -> a > threshold) .reduce((integer, t1) -> integer + t1) .collect(); assertThat(collect.get(0)).isEqualTo(90); 

Notare che il metodo collect () è un'operazione sink che attiva le trasformazioni effettive dei dati.

4.3. Carta geografica

Supponiamo che tu abbia un DataSet di oggetti Person :

private static class Person { private int age; private String name; // standard constructors/getters/setters }

Successivamente, creiamo un DataSet di questi oggetti:

DataSet personDataSource = env.fromCollection( Arrays.asList( new Person(23, "Tom"), new Person(75, "Michael")));

Supponiamo di voler estrarre solo il campo età da ogni oggetto della raccolta. È possibile utilizzare la trasformazione map () per ottenere solo un campo specifico della classe Person :

List ages = personDataSource .map(p -> p.age) .collect(); assertThat(ages).hasSize(2); assertThat(ages).contains(23, 75);

4.4. Aderire

Quando hai due set di dati, potresti voler unirli su un campo ID . Per questo, puoi usare la trasformazione join () .

Creiamo raccolte di transazioni e indirizzi di un utente:

Tuple3 address = new Tuple3(1, "5th Avenue", "London"); DataSet
    
      addresses = env.fromElements(address); Tuple2 firstTransaction = new Tuple2(1, "Transaction_1"); DataSet
     
       transactions = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2")); 
     
    

Il primo campo in entrambe le tuple è di tipo Integer , e questo è un campo id su cui vogliamo unire entrambi i set di dati.

Per eseguire l'effettiva logica di unione, dobbiamo implementare un'interfaccia KeySelector per indirizzo e transazione:

private static class IdKeySelectorTransaction implements KeySelector
    
      { @Override public Integer getKey(Tuple2 value) { return value.f0; } } private static class IdKeySelectorAddress implements KeySelector
     
       { @Override public Integer getKey(Tuple3 value) { return value.f0; } }
     
    

Ogni selettore restituisce solo il campo su cui deve essere eseguito il join.

Sfortunatamente, non è possibile utilizzare espressioni lambda qui perché Flink necessita di informazioni sul tipo generico.

Successivamente, implementiamo la logica di fusione utilizzando quei selettori:

List
    
     > joined = transactions.join(addresses) .where(new IdKeySelectorTransaction()) .equalTo(new IdKeySelectorAddress()) .collect(); assertThat(joined).hasSize(1); assertThat(joined).contains(new Tuple2(firstTransaction, address)); 
    

4.5. Ordinare

Supponiamo che tu abbia la seguente raccolta di Tuple2:

Tuple2 secondPerson = new Tuple2(4, "Tom"); Tuple2 thirdPerson = new Tuple2(5, "Scott"); Tuple2 fourthPerson = new Tuple2(200, "Michael"); Tuple2 firstPerson = new Tuple2(1, "Jack"); DataSet
    
      transactions = env.fromElements( fourthPerson, secondPerson, thirdPerson, firstPerson); 
    

Se vuoi ordinare questa raccolta in base al primo campo della tupla, puoi usare la trasformazione sortPartitions () :

List
    
      sorted = transactions .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING) .collect(); assertThat(sorted) .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    

5. Conteggio parole

Il problema del conteggio delle parole è comunemente usato per mostrare le capacità dei framework di elaborazione dei Big Data. La soluzione di base prevede il conteggio delle occorrenze di parole in un input di testo. Usiamo Flink per implementare una soluzione a questo problema.

Come primo passo nella nostra soluzione, creiamo una classe LineSplitter che divide il nostro input in token (parole), raccogliendo per ogni token una Tuple2 di coppie chiave-valore. In ciascuna di queste tuple, la chiave è una parola trovata nel testo e il valore è quello intero (1).

This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2:

public class LineSplitter implements FlatMapFunction
    
      { @Override public void flatMap(String value, Collector
     
       out) { Stream.of(value.toLowerCase().split("\\W+")) .filter(t -> t.length() > 0) .forEach(token -> out.collect(new Tuple2(token, 1))); } }
     
    

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second elements to produce a count of the word occurrences:

public static DataSet
    
      startWordCount( ExecutionEnvironment env, List lines) throws Exception { DataSet text = env.fromCollection(lines); return text.flatMap(new LineSplitter()) .groupBy(0) .aggregate(Aggregations.SUM, 1); }
    

We are using three types of the Flink transformations: flatMap(), groupBy(), and aggregate().

Let's write a test to assert that the word count implementation is working as expected:

List lines = Arrays.asList( "This is a first sentence", "This is a second sentence with a one word"); DataSet
    
      result = WordCount.startWordCount(env, lines); List
     
       collect = result.collect(); assertThat(collect).containsExactlyInAnyOrder( new Tuple2("a", 3), new Tuple2("sentence", 2), new Tuple2("word", 1), new Tuple2("is", 2), new Tuple2("this", 2), new Tuple2("second", 1), new Tuple2("first", 1), new Tuple2("with", 1), new Tuple2("one", 1));
     
    

6. DataStream API

6.1. Creating a DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream dataStream = executionEnvironment.fromElements( "This is a first sentence", "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print() that will just print the result of transformations to the standard output, following with the execute() method on the StreamExecutionEnvironment class:

upperCase.print(); env.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE 2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.

For this example, let's first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator
    
      windowed = env.fromElements( new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()), new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor 
     
      (Time.seconds(20)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 * 1000; } });
     
    

Next, let's define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator
    
      reduced = windowed .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .maxBy(0, true); reduced.print();
    

It will get the last element of every five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink's fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

L'implementazione di tutti questi esempi e frammenti di codice può essere trovata su GitHub: questo è un progetto Maven, quindi dovrebbe essere facile da importare ed eseguire così com'è.