Introduzione all'elaborazione dei flussi con Spring Cloud Data Flow

1. Introduzione

Spring Cloud Data Flow è un modello operativo e di programmazione nativo per il cloud per microservizi di dati componibili.

Con Spring Cloud Data Flow , gli sviluppatori possono creare e orchestrare pipeline di dati per casi d'uso comuni come l'acquisizione di dati, analisi in tempo reale e importazione / esportazione di dati.

Queste pipeline di dati sono disponibili in due versioni, streaming e pipeline di dati batch.

Nel primo caso, una quantità illimitata di dati viene consumata o prodotta tramite middleware di messaggistica. Mentre nel secondo caso l'attività di breve durata elabora un insieme finito di dati e quindi termina.

Questo articolo si concentrerà sull'elaborazione dello streaming.

2. Panoramica architettonica

I componenti chiave di questo tipo di architettura sono Applicazioni , Data Flow Server e il runtime di destinazione.

Inoltre, oltre a questi componenti chiave, di solito abbiamo anche una Shell per il flusso di dati e un broker di messaggi all'interno dell'architettura.

Vediamo tutti questi componenti in modo più dettagliato.

2.1. Applicazioni

In genere, una pipeline di dati in streaming include eventi di consumo da sistemi esterni, elaborazione dei dati e persistenza poliglotta. Queste fasi sono comunemente noti come sorgente , processore e dissipatore in primavera cloud terminologia:

  • Fonte: è l'applicazione che consuma eventi
  • Processore: consuma dati dalla sorgente , fa alcune elaborazioni su di esso, ed emette i dati elaborati per la successiva applicazione in cantiere
  • Sink: consuma da una sorgente o da un processore e scrive i dati nel livello di persistenza desiderato

Queste applicazioni possono essere pacchettizzate in due modi:

  • Spring Boot uber-jar che è ospitato in un repository Maven, file, http o qualsiasi altra implementazione di risorse Spring (questo metodo verrà utilizzato in questo articolo)
  • Docker

Molte sorgenti, processori e applicazioni sink per casi d'uso comuni (ad es. Jdbc, hdfs, http, router) sono già fornite e pronte per l'uso dal team di Spring Cloud Data Flow .

2.2. Runtime

Inoltre, è necessario un runtime per l'esecuzione di queste applicazioni. I tempi di esecuzione supportati sono:

  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • Server locale per lo sviluppo (che verrà utilizzato in questo articolo)

2.3. Data Flow Server

Il componente responsabile della distribuzione delle applicazioni in un runtime è Data Flow Server . Esiste un file eseguibile di Data Flow Server fornito per ciascuno dei runtime di destinazione.

Il Data Flow Server è responsabile dell'interpretazione:

  • Un flusso DSL che descrive il flusso logico di dati attraverso più applicazioni.
  • Un manifesto di distribuzione che descrive la mappatura delle applicazioni nel runtime.

2.4. Shell flusso di dati

Data Flow Shell è un client per Data Flow Server. La shell ci consente di eseguire il comando DSL necessario per interagire con il server.

Ad esempio, il DSL per descrivere il flusso di dati da una sorgente http a un sink jdbc sarebbe scritto come "http | jdbc ". Questi nomi nel DSL vengono registrati con Data Flow Server e mappati su artefatti dell'applicazione che possono essere ospitati nei repository Maven o Docker.

Spring offre anche un'interfaccia grafica, denominata Flo , per la creazione e il monitoraggio di pipeline di dati in streaming. Tuttavia, il suo utilizzo è al di fuori della discussione di questo articolo.

2.5. Message Broker

Come abbiamo visto nell'esempio della sezione precedente, abbiamo utilizzato il simbolo del tubo nella definizione del flusso di dati. Il simbolo pipe rappresenta la comunicazione tra le due applicazioni tramite middleware di messaggistica.

Ciò significa che abbiamo bisogno di un broker di messaggi attivo e funzionante nell'ambiente di destinazione.

I due broker middleware di messaggistica supportati sono:

  • Apache Kafka
  • RabbitMQ

Quindi, ora che abbiamo una panoramica dei componenti architettonici, è il momento di creare la nostra prima pipeline di elaborazione del flusso.

3. Installare un Message Broker

Come abbiamo visto, le applicazioni nella pipeline necessitano di un middleware di messaggistica per comunicare. Ai fini di questo articolo, andremo con RabbitMQ .

Per i dettagli completi dell'installazione, puoi seguire le istruzioni sul sito ufficiale.

4. Il server del flusso di dati locale

Per accelerare il processo di generazione delle nostre applicazioni, useremo Spring Initializr; con il suo aiuto, possiamo ottenere le nostre applicazioni Spring Boot in pochi minuti.

Dopo aver raggiunto il sito Web, scegli semplicemente un gruppo e un nome di artefatto .

Fatto ciò, fai clic sul pulsante Genera progetto per avviare il download del manufatto Maven.

Al termine del download, decomprimere il progetto e importarlo come progetto Maven nell'IDE di scelta.

Aggiungiamo una dipendenza Maven al progetto. Poiché avremo bisogno delle librerie di Dataflow Local Server , aggiungiamo la dipendenza spring-cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Ora dobbiamo annotare la classe principale di Spring Boot con l' annotazione @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } } 

È tutto. Il nostro Local Data Flow Server è pronto per essere eseguito:

mvn spring-boot:run

L'applicazione si avvierà sulla porta 9393.

5. La shell del flusso di dati

Ancora una volta, vai su Spring Initializr e scegli un nome di gruppo e artefatto .

Dopo aver scaricato e importato il progetto, aggiungiamo una dipendenza spring-cloud-dataflow-shell:

 org.springframework.cloud spring-cloud-dataflow-shell 

Ora dobbiamo aggiungere l' annotazione @EnableDataFlowShell alla classe principale di Spring Boot :

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } } 

Ora possiamo eseguire la shell:

mvn spring-boot:run

After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.

6. The Source Application

Similarly, on Initializr, we'll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

We'll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).

In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.

The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:

@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } 

Our data source is ready.

7. The Processor Application

Next- we'll create an application and add a Stream Rabbit dependency.

We'll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }

Next, we need to define a method to process the data that coming from the source application.

To define a transformer, we need to annotate this method with @Transformer annotation:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }

It converts a timestamp from the ‘input' channel to a formatted date which will be sent to the ‘output' channel.

8. The Sink Application

The last application to create is the Sink application.

Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let's add a Stream Rabbit dependency.

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }

Now we need a method to intercept the messages coming from the processor application.

To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:

@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }

The method simply prints the timestamp transformed in a formatted date to a log file.

9. Register a Stream App

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

When providing a URI with the maven scheme, the format should conform to the following:

maven://:[:[:]]:

To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT 

10. Create and Deploy the Stream

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.

Then to deploy the stream execute the following shell command:

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

Inoltre, abbiamo visto il ruolo delle applicazioni Source , Processor e Sink all'interno dello stream e come collegare e collegare questo modulo all'interno di un Data Flow Server tramite l'uso di Data Flow Shell .

Il codice di esempio può essere trovato nel progetto GitHub.