Apache Spark: differenze tra dataframe, set di dati e RDD

1. Panoramica

Apache Spark è un sistema di elaborazione dati veloce e distribuito. Esegue l'elaborazione dei dati in memoria e utilizza la memorizzazione nella cache in memoria e l'esecuzione ottimizzata con conseguente prestazioni veloci. Fornisce API di alto livello per linguaggi di programmazione popolari come Scala, Python, Java e R.

In questo breve tutorial, esamineremo tre dei concetti di base di Spark: dataframe, set di dati e RDD.

2. DataFrame

Spark SQL ha introdotto un'astrazione dei dati tabulare chiamata DataFrame a partire da Spark 1.3. Da allora, è diventata una delle funzionalità più importanti di Spark. Questa API è utile quando vogliamo gestire dati distribuiti strutturati e semi-strutturati.

Nella sezione 3, discuteremo di Resilient Distributed Datasets (RDD). I DataFrames memorizzano i dati in modo più efficiente rispetto agli RDD, questo perché utilizzano le capacità immutabili, in memoria, resilienti, distribuite e parallele degli RDD ma applicano anche uno schema ai dati. I DataFrame traducono anche il codice SQL in operazioni RDD ottimizzate di basso livello.

Possiamo creare DataFrame in tre modi:

  • Conversione di RDD esistenti
  • Esecuzione di query SQL
  • Caricamento di dati esterni

Il team di Spark ha introdotto SparkSession nella versione 2.0, unifica tutti i diversi contesti assicurando che gli sviluppatori non debbano preoccuparsi di creare contesti diversi:

SparkSession session = SparkSession.builder() .appName("TouristDataFrameExample") .master("local[*]") .getOrCreate(); DataFrameReader dataFrameReader = session.read();

Analizzeremo il file Tourist.csv :

Dataset data = dataFrameReader.option("header", "true") .csv("data/Tourist.csv");

Dal momento che Spark 2.0 dataframe è diventato un set di dati di tipo Row , in modo che possiamo utilizzare un dataframe come un alias per un set di dati .

Possiamo selezionare colonne specifiche che ci interessano. Possiamo anche filtrare e raggruppare per una data colonna:

data.select(col("country"), col("year"), col("value")) .show(); data.filter(col("country").equalTo("Mexico")) .show(); data.groupBy(col("country")) .count() .show();

3. Set di dati

Un set di dati è un insieme di dati strutturati fortemente tipizzati . Forniscono il familiare stile di programmazione orientato agli oggetti oltre ai vantaggi della sicurezza dei tipi poiché i set di dati possono controllare la sintassi e rilevare gli errori in fase di compilazione.

Dataset è un'estensione di DataFrame, quindi possiamo considerare un DataFrame una vista non tipizzata di un set di dati.

La squadra Spark rilasciato il Dataset API Spark 1.6 e come menzionato: “l'obiettivo di Datasets Spark è quello di realizzare un'API che consente agli utenti di esprimere facilmente trasformazioni su domini oggetto, fornendo anche le prestazioni e robustezza vantaggi dell'esecuzione Spark SQL motore".

Innanzitutto, dobbiamo creare una classe di tipo TouristData :

public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters }

Per mappare ciascuno dei nostri record al tipo specificato, avremo bisogno di utilizzare un codificatore. I codificatori traducono tra oggetti Java e il formato binario interno di Spark :

// SparkSession initialization and data load Dataset responseWithSelectedColumns = data.select(col("region"), col("country"), col("year"), col("series"), col("value").cast("double"), col("footnotes"), col("source")); Dataset typedDataset = responseWithSelectedColumns .as(Encoders.bean(TouristData.class));

Come con DataFrame, possiamo filtrare e raggruppare per colonne specifiche:

typedDataset.filter((FilterFunction) record -> record.getCountry() .equals("Norway")) .show(); typedDataset.groupBy(typedDataset.col("country")) .count() .show();

Possiamo anche eseguire operazioni come filtrare per colonna che corrisponde a un determinato intervallo o calcolare la somma di una colonna specifica, per ottenere il valore totale di essa:

typedDataset.filter((FilterFunction) record -> record.getYear() != null && (Long.valueOf(record.getYear()) > 2010 && Long.valueOf(record.getYear())  record.getValue() != null && record.getSeries() .contains("expenditure")) .groupBy("country") .agg(sum("value")) .show();

4. RDD

Il Resilient Distributed Dataset o RDD è l'astrazione di programmazione principale di Spark. Rappresenta una raccolta di elementi che è: immutabile, resiliente e distribuita .

Un RDD incapsula un set di dati di grandi dimensioni, Spark distribuirà automaticamente i dati contenuti negli RDD nel nostro cluster e parallelizzerà le operazioni che eseguiamo su di essi .

Possiamo creare RDD solo attraverso operazioni di dati in una memoria stabile o operazioni su altri RDD.

La tolleranza agli errori è essenziale quando si tratta di grandi set di dati e i dati vengono distribuiti su macchine cluster. Gli RDD sono resistenti grazie alla meccanica di ripristino dei guasti incorporata di Spark. Spark si basa sul fatto che gli RDD memorizzano come sono stati creati in modo che possiamo facilmente risalire al lignaggio per ripristinare la partizione .

Esistono due tipi di operazioni che possiamo eseguire sugli RDD: trasformazioni e azioni .

4.1. Trasformazioni

Possiamo applicare trasformazioni a un RDD per manipolare i suoi dati. Dopo aver eseguito questa manipolazione, otterremo un RDD nuovo di zecca, poiché gli RDD sono oggetti immutabili .

Verificheremo come implementare Map e Filter, due delle trasformazioni più comuni.

Innanzitutto, dobbiamo creare un JavaSparkContext e caricare i dati come RDD dal file Tourist.csv :

SparkConf conf = new SparkConf().setAppName("uppercaseCountries") .setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD tourists = sc.textFile("data/Tourist.csv");

Successivamente, applichiamo la funzione mappa per ottenere il nome del paese da ciascun record e convertire il nome in maiuscolo. Possiamo salvare questo set di dati appena generato come file di testo su disco:

JavaRDD upperCaseCountries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase(); }).distinct(); upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

If we want to select only a specific country, we can apply the filter function on our original tourists RDD:

JavaRDD touristsInMexico = tourists .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico")); touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. Actions

Actions will return a final value or save the results to disc, after doing some computation on the data.

Two of the recurrently used actions in Spark are Count and Reduce.

Let's count the total countries on our CSV file:

// Spark Context initialization and data load JavaRDD countries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1]; }).distinct(); Long numberOfCountries = countries.count();

Now, we'll calculate the total expenditure by country. We'll need to filter the records containing expenditure in their description.

Instead of using a JavaRDD, we'll use a JavaPairRDD. A pair of RDD is a type of RDD that can store key-value pairs. Let's check it next:

JavaRDD touristsExpenditure = tourists .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure")); JavaPairRDD expenditurePairRdd = touristsExpenditure .mapToPair(line -> { String[] columns = line.split(COMMA_DELIMITER); return new Tuple2(columns[1], Double.valueOf(columns[6])); }); List
    
      totalByCountry = expenditurePairRdd .reduceByKey((x, y) -> x + y) .collect();
    

5. Conclusion

Per riassumere, dovremmo usare DataFrames o Dataset quando abbiamo bisogno di API specifiche del dominio, abbiamo bisogno di espressioni di alto livello come aggregazione, somma o query SQL. O quando vogliamo l'indipendenza dai tipi in fase di compilazione.

D'altra parte, dovremmo usare RDD quando i dati non sono strutturati e non abbiamo bisogno di implementare uno schema specifico o quando abbiamo bisogno di trasformazioni e azioni di basso livello.

Come sempre, tutti gli esempi di codice sono disponibili su GitHub.