Creazione di una pipeline di dati con Flink e Kafka

1. Panoramica

Apache Flink è un framework di elaborazione del flusso che può essere utilizzato facilmente con Java. Apache Kafka è un sistema di elaborazione del flusso distribuito che supporta un'elevata tolleranza agli errori.

In questo tutorial, daremo un'occhiata a come costruire una pipeline di dati utilizzando queste due tecnologie.

2. Installazione

Per installare e configurare Apache Kafka, fare riferimento alla guida ufficiale. Dopo l'installazione, possiamo utilizzare i seguenti comandi per creare i nuovi argomenti chiamati flink_input e flink_output:

 bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_output bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_input

Per il bene di questo tutorial, useremo la configurazione predefinita e le porte predefinite per Apache Kafka.

3. Utilizzo di Flink

Apache Flink consente una tecnologia di elaborazione del flusso in tempo reale. Il framework consente di utilizzare più sistemi di terze parti come sorgenti di flusso o sink .

In Flink - sono disponibili vari connettori:

  • Apache Kafka (sorgente / sink)
  • Apache Cassandra (lavandino)
  • Amazon Kinesis Streams (sorgente / sink)
  • Elasticsearch (lavandino)
  • Hadoop FileSystem (sink)
  • RabbitMQ (sorgente / sink)
  • Apache NiFi (sorgente / sink)
  • Twitter Streaming API (fonte)

Per aggiungere Flink al nostro progetto, dobbiamo includere le seguenti dipendenze Maven:

 org.apache.flink flink-core 1.5.0   org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

L'aggiunta di queste dipendenze ci consentirà di utilizzare e produrre da e verso argomenti Kafka. È possibile trovare la versione corrente di Flink su Maven Central.

4. Kafka String Consumer

Per utilizzare i dati di Kafka con Flink, dobbiamo fornire un argomento e un indirizzo Kafka. Dovremmo anche fornire un ID di gruppo che verrà utilizzato per contenere gli offset in modo da non leggere sempre tutti i dati dall'inizio.

Creiamo un metodo statico che renderà più semplice la creazione di FlinkKafkaConsumer :

public static FlinkKafkaConsumer011 createStringConsumerForTopic( String topic, String kafkaAddress, String kafkaGroup ) { Properties props = new Properties(); props.setProperty("bootstrap.servers", kafkaAddress); props.setProperty("group.id",kafkaGroup); FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011( topic, new SimpleStringSchema(), props); return consumer; }

Questo metodo accetta un argomento, kafkaAddress e kafkaGroup e crea il FlinkKafkaConsumer che utilizzerà i dati di un determinato argomento come una stringa poiché abbiamo utilizzato SimpleStringSchema per decodificare i dati.

Il numero 011 nel nome della classe si riferisce alla versione Kafka.

5. Kafka String Producer

Per produrre dati a Kafka, dobbiamo fornire l'indirizzo e l'argomento di Kafka che vogliamo utilizzare. Ancora una volta, possiamo creare un metodo statico che ci aiuterà a creare produttori per diversi argomenti:

public static FlinkKafkaProducer011 createStringProducer( String topic, String kafkaAddress){ return new FlinkKafkaProducer011(kafkaAddress, topic, new SimpleStringSchema()); }

Questo metodo accetta solo topic e kafkaAddress come argomenti poiché non è necessario fornire l'ID di gruppo quando produciamo l'argomento Kafka.

6. Elaborazione del flusso di stringhe

Quando abbiamo un consumatore e un produttore completamente funzionanti, possiamo provare a elaborare i dati da Kafka e quindi salvare i nostri risultati di nuovo in Kafka. L'elenco completo delle funzioni che possono essere utilizzate per l'elaborazione del flusso può essere trovato qui.

In questo esempio, metteremo in maiuscolo le parole in ogni voce di Kafka e poi le riscriveremo a Kafka.

A questo scopo dobbiamo creare una MapFunction personalizzata :

public class WordsCapitalizer implements MapFunction { @Override public String map(String s) { return s.toUpperCase(); } }

Dopo aver creato la funzione, possiamo usarla nell'elaborazione del flusso:

public static void capitalize() { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String address = "localhost:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment .getExecutionEnvironment(); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic( inputTopic, address, consumerGroup); DataStream stringInputStream = environment .addSource(flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer( outputTopic, address); stringInputStream .map(new WordsCapitalizer()) .addSink(flinkKafkaProducer); }

L'applicazione leggerà i dati dall'argomento flink_input , eseguirà operazioni sullo stream e quindi salverà i risultati nell'argomento flink_output in Kafka.

Abbiamo visto come gestire gli archi usando Flink e Kafka. Ma spesso è necessario eseguire operazioni su oggetti personalizzati. Vedremo come farlo nei prossimi capitoli.

7. Deserializzazione di oggetti personalizzati

La classe seguente rappresenta un semplice messaggio con informazioni sul mittente e sul destinatario:

@JsonSerialize public class InputMessage { String sender; String recipient; LocalDateTime sentAt; String message; }

In precedenza, usavamo SimpleStringSchema per deserializzare i messaggi da Kafka, ma ora vogliamo deserializzare i dati direttamente negli oggetti personalizzati .

Per fare ciò, abbiamo bisogno di uno schema di deserializzazione personalizzato :

public class InputMessageDeserializationSchema implements DeserializationSchema { static ObjectMapper objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); @Override public InputMessage deserialize(byte[] bytes) throws IOException { return objectMapper.readValue(bytes, InputMessage.class); } @Override public boolean isEndOfStream(InputMessage inputMessage) { return false; } @Override public TypeInformation getProducedType() { return TypeInformation.of(InputMessage.class); } }

Supponiamo qui che i messaggi siano conservati come JSON in Kafka.

Poiché abbiamo un campo di tipo LocalDateTime , dobbiamo specificare JavaTimeModule, che si occupa di mappare gli oggetti LocalDateTime a JSON.

Gli schemi Flink non possono avere campi non serializzabili perché tutti gli operatori (come schemi o funzioni) sono serializzati all'inizio del lavoro.

Esistono problemi simili in Apache Spark. Una delle soluzioni note per questo problema è l'inizializzazione dei campi come statici , come abbiamo fatto con ObjectMapper sopra. Non è la soluzione più carina, ma è relativamente semplice e fa il lavoro.

Il metodo isEndOfStream può essere utilizzato per il caso speciale in cui lo stream deve essere elaborato solo fino a quando non vengono ricevuti alcuni dati specifici. Ma non è necessario nel nostro caso.

8. Serializzazione di oggetti personalizzati

Ora, supponiamo di volere che il nostro sistema abbia la possibilità di creare un backup dei messaggi. Vogliamo che il processo sia automatico e ogni backup dovrebbe essere composto da messaggi inviati durante un giorno intero.

Inoltre, a un messaggio di backup dovrebbe essere assegnato un ID univoco.

A questo scopo, possiamo creare la seguente classe:

public class Backup { @JsonProperty("inputMessages") List inputMessages; @JsonProperty("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty("uuid") UUID uuid; public Backup(List inputMessages, LocalDateTime backupTimestamp) { this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID(); } }

Tieni presente che il meccanismo di generazione dell'UUID non è perfetto, in quanto consente duplicati. Tuttavia, questo è sufficiente per lo scopo di questo esempio.

Vogliamo salvare il nostro oggetto di backup come JSON in Kafka, quindi dobbiamo creare il nostro SerializationSchema :

public class BackupSerializationSchema implements SerializationSchema { ObjectMapper objectMapper; Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class); @Override public byte[] serialize(Backup backupMessage) { if(objectMapper == null) { objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); } try { return objectMapper.writeValueAsString(backupMessage).getBytes(); } catch (com.fasterxml.jackson.core.JsonProcessingException e) { logger.error("Failed to parse JSON", e); } return new byte[0]; } }

9. Timestamping Messages

Since we want to create a backup for all messages of each day, messages need a timestamp.

Flink provides the three different time characteristics EventTime, ProcessingTime, and IngestionTime.

In our case, we need to use the time at which the message has been sent, so we'll use EventTime.

To use EventTimewe need a TimestampAssigner which will extract timestamps from our input data:

public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks { @Override public long extractTimestamp(InputMessage element, long previousElementTimestamp) { ZoneId zoneId = ZoneId.systemDefault(); return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000; } @Nullable @Override public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1500); } }

We need to transform our LocalDateTime to EpochSecond as this is the format expected by Flink. After assigning timestamps, all time-based operations will use time from sentAt field to operate.

Since Flink expects timestamps to be in milliseconds and toEpochSecond() returns time in seconds we needed to multiply it by 1000, so Flink will create windows correctly.

Flink defines the concept of a Watermark. Watermarks are useful in case of data that don't arrive in the order they were sent. A watermark defines the maximum lateness that is allowed for elements to be processed.

Elements that have timestamps lower than the watermark won't be processed at all.

10. Creating Time Windows

To assure that our backup gathers only messages sent during one day, we can use the timeWindowAll method on the stream, which will split messages into windows.

Tuttavia, dovremo comunque aggregare i messaggi da ciascuna finestra e restituirli come backup .

Per fare ciò, avremo bisogno di una AggregateFunction personalizzata :

public class BackupAggregator implements AggregateFunction
    
      { @Override public List createAccumulator() { return new ArrayList(); } @Override public List add( InputMessage inputMessage, List inputMessages) { inputMessages.add(inputMessage); return inputMessages; } @Override public Backup getResult(List inputMessages) { return new Backup(inputMessages, LocalDateTime.now()); } @Override public List merge(List inputMessages, List acc1) { inputMessages.addAll(acc1); return inputMessages; } }
    

11. Aggregazione di backup

Dopo aver assegnato i timestamp corretti e aver implementato la nostra AggregateFunction , possiamo finalmente prendere il nostro input Kafka ed elaborarlo:

public static void createBackup () throws Exception { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest(); flinkKafkaConsumer.assignTimestampsAndWatermarks( new InputMessageTimestampAssigner()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource(flinkKafkaConsumer); inputMessagesStream .timeWindowAll(Time.hours(24)) .aggregate(new BackupAggregator()) .addSink(flinkKafkaProducer); environment.execute(); }

12. Conclusione

In questo articolo, abbiamo presentato come creare una semplice pipeline di dati con Apache Flink e Apache Kafka.

Come sempre, il codice può essere trovato su Github.