Flusso di dati Spring Cloud con Apache Spark

1. Introduzione

Spring Cloud Data Flow è un toolkit per la creazione di integrazione dei dati e pipeline di elaborazione dei dati in tempo reale.

Le pipeline, in questo caso, sono applicazioni Spring Boot create con l'uso di framework Spring Cloud Stream o Spring Cloud Task.

In questo tutorial, mostreremo come utilizzare Spring Cloud Data Flow con Apache Spark.

2. Server locale del flusso di dati

Innanzitutto, dobbiamo eseguire il server del flusso di dati per poter distribuire i nostri lavori.

Per eseguire il server del flusso di dati in locale, è necessario creare un nuovo progetto con la dipendenza spring-cloud-starter-dataflow-server-local :

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 1.7.4.RELEASE 

Dopodiché, dobbiamo annotare la classe principale nel server con @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } }

Una volta eseguita questa applicazione, avremo un server del flusso di dati locale sulla porta 9393.

3. Creazione di un progetto

Creeremo un lavoro Spark come applicazione locale autonoma in modo che non avremo bisogno di alcun cluster per eseguirlo.

3.1. Dipendenze

Innanzitutto, aggiungeremo la dipendenza Spark:

 org.apache.spark spark-core_2.10 2.4.0  

3.2. Creazione di un lavoro

E per il nostro lavoro, approssimiamo pi greco:

public class PiApproximation { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation"); JavaSparkContext context = new JavaSparkContext(conf); int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2; int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices; List xs = IntStream.rangeClosed(0, n) .mapToObj(element -> Integer.valueOf(element)) .collect(Collectors.toList()); JavaRDD dataSet = context.parallelize(xs, slices); JavaRDD pointsInsideTheCircle = dataSet.map(integer -> { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y )  integer + integer2); System.out.println("The pi was estimated as:" + count / n); context.stop(); } }

4. Flusso di dati Shell

Data Flow Shell è un'applicazione che ci consentirà di interagire con il server . Shell utilizza i comandi DSL per descrivere i flussi di dati.

Per utilizzare Data Flow Shell dobbiamo creare un progetto che ci permetta di eseguirlo. Innanzitutto, abbiamo bisogno della dipendenza spring-cloud-dataflow-shell :

 org.springframework.cloud spring-cloud-dataflow-shell 1.7.4.RELEASE 

Dopo aver aggiunto la dipendenza, possiamo creare la classe che eseguirà la nostra shell del flusso di dati:

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } }

5. Distribuzione del progetto

Per distribuire il nostro progetto, utilizzeremo il cosiddetto task runner disponibile per Apache Spark in tre versioni: cluster , filato e client . Procederemo con la versione del client locale .

Il task runner è ciò che esegue il nostro lavoro Spark.

Per fare ciò, dobbiamo prima registrare la nostra attività utilizzando Data Flow Shell :

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT 

L'attività ci consente di specificare più parametri diversi alcuni di essi sono opzionali, ma alcuni parametri sono necessari per distribuire correttamente il lavoro Spark:

  • spark.app-class , la classe principale del nostro lavoro presentato
  • spark.app-jar , un percorso al fat jar contenente il nostro lavoro
  • spark.app- name , il nome che verrà utilizzato per il nostro lavoro
  • spark.app-args , gli argomenti che verranno passati al lavoro

Possiamo utilizzare il task spark-client registrato per inviare il nostro lavoro, ricordandoci di fornire i parametri richiesti:

task create spark1 --definition "spark-client \ --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \ --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

Nota che spark.app-jar è il percorso per fat-jar con il nostro lavoro.

Dopo aver creato con successo l'attività, possiamo procedere ad eseguirla con il seguente comando:

task launch spark1

Questo richiamerà l'esecuzione del nostro compito.

6. Riepilogo

In questo tutorial, abbiamo mostrato come utilizzare il framework Spring Cloud Data Flow per elaborare i dati con Apache Spark. Ulteriori informazioni sul framework Spring Cloud Data Flow sono disponibili nella documentazione.

Tutti gli esempi di codice possono essere trovati su GitHub.