Una guida ad Apache Crunch

1. Introduzione

In questo tutorial, dimostreremo Apache Crunch con un'applicazione di elaborazione dati di esempio. Eseguiremo questa applicazione utilizzando il framework MapReduce.

Inizieremo coprendo brevemente alcuni concetti di Apache Crunch. Quindi passeremo a un'app di esempio. In questa app faremo l'elaborazione del testo:

  • Prima di tutto, leggeremo le righe da un file di testo
  • Successivamente, li divideremo in parole e rimuoveremo alcune parole comuni
  • Quindi, raggrupperemo le parole rimanenti per ottenere un elenco di parole uniche e il loro conteggio
  • Infine, scriveremo questo elenco in un file di testo

2. Cos'è Crunch?

MapReduce è un framework di programmazione parallelo distribuito per l'elaborazione di grandi quantità di dati su un cluster di server. Framework software come Hadoop e Spark implementano MapReduce.

Crunch fornisce un framework per scrivere, testare ed eseguire pipeline MapReduce in Java. Qui, non scriviamo direttamente i lavori MapReduce. Piuttosto, definiamo la pipeline di dati (ovvero le operazioni per eseguire i passaggi di input, elaborazione e output) utilizzando le API Crunch. Crunch Planner li associa ai lavori MapReduce e li esegue quando necessario.

Pertanto, ogni pipeline di dati Crunch è coordinata da un'istanza dell'interfaccia Pipeline . Questa interfaccia definisce anche i metodi per leggere i dati in una pipeline tramite istanze di origine e scrivere dati da una pipeline a istanze di destinazione .

Abbiamo 3 interfacce per la rappresentazione dei dati:

  1. PCollection : una raccolta di elementi immutabile e distribuita
  2. PTable , V > - una multi-mappa immutabile, distribuita e non ordinata di chiavi e valori
  3. PGroupedTable , V > - una mappa distribuita e ordinata di chiavi di tipo K su una V Iterabile che può essere ripetuta esattamente una volta

DoFn è la classe base per tutte le funzioni di elaborazione dati . Corrisponde alleclassi Mapper , Reducer e Combiner in MapReduce. Dedichiamo la maggior parte del tempo di sviluppo a scrivere e testare calcoli logici che lo utilizzano .

Ora che abbiamo più familiarità con Crunch, usiamolo per creare l'applicazione di esempio.

3. Impostazione di un progetto Crunch

Prima di tutto, creiamo un progetto Crunch con Maven. Possiamo farlo in due modi:

  1. Aggiungi le dipendenze richieste nel file pom.xml di un progetto esistente
  2. Usa un archetipo per generare un progetto iniziale

Diamo una rapida occhiata a entrambi gli approcci.

3.1. Dipendenze di Maven

Per aggiungere Crunch a un progetto esistente, aggiungiamo le dipendenze richieste nel file pom.xml .

Per prima cosa, aggiungiamo la libreria crunch-core :

 org.apache.crunch crunch-core 0.15.0 

Successivamente, aggiungiamo la libreria hadoop-client per comunicare con Hadoop. Usiamo la versione corrispondente all'installazione di Hadoop:

 org.apache.hadoop hadoop-client 2.2.0 provided 

Possiamo controllare Maven Central per le ultime versioni delle librerie crunch-core e hadoop-client.

3.2. Archetipo di Maven

Un altro approccio consiste nel generare rapidamente un progetto iniziale utilizzando l'archetipo Maven fornito da Crunch :

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype 

Quando richiesto dal comando precedente, forniamo la versione Crunch e i dettagli del progetto.

4. Crunch Pipeline Setup

Dopo aver impostato il progetto, dobbiamo creare un oggetto Pipeline . Crunch ha 3 implementazioni della pipeline :

  • MRPipeline : viene eseguito all'interno di Hadoop MapReduce
  • SparkPipeline : viene eseguito come una serie di pipeline Spark
  • MemPipeline : viene eseguito in memoria sul client ed è utile per i test di unità

Di solito sviluppiamo e testiamo utilizzando un'istanza di MemPipeline . Successivamente utilizziamo un'istanza di MRPipeline o SparkPipeline per l'esecuzione effettiva.

Se avessimo bisogno di una pipeline in memoria, potremmo usare il metodo statico getInstance per ottenere l' istanza di MemPipeline :

Pipeline pipeline = MemPipeline.getInstance();

Ma per ora, creiamo un'istanza di MRPipeline per eseguire l'applicazione con Hadoop :

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

5. Leggere i dati di input

Dopo aver creato l'oggetto pipeline, vogliamo leggere i dati di input. L' interfaccia Pipeline fornisce un comodo metodo per leggere l'input da un file di testo , readTextFile (pathName).

Chiamiamo questo metodo per leggere il file di testo di input:

PCollection lines = pipeline.readTextFile(inputPath);

Il codice precedente legge il file di testo come una raccolta di String .

Come passaggio successivo, scriviamo un test case per la lettura dell'input:

@Test public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() { Pipeline pipeline = MemPipeline.getInstance(); PCollection lines = pipeline.readTextFile(INPUT_FILE_PATH); assertEquals(21, lines.asCollection() .getValue() .size()); }

In questo test, verifichiamo di ottenere il numero di righe previsto durante la lettura di un file di testo.

6. Fasi del trattamento dei dati

Dopo aver letto i dati di input, dobbiamo elaborarli. L'API Crunch contiene una serie di sottoclassi di DoFn per gestire scenari di elaborazione dati comuni :

  • FilterFn : filtra i membri di una raccolta in base a una condizione booleana
  • MapFn : mappa ogni record di input esattamente su un record di output
  • CombineFn : combina un numero di valori in un unico valore
  • JoinFn : esegue join come inner join, left outer join, right outer join e full outer join

Let's implement the following data processing logic by using these classes:

  1. Split each line in the input file into words
  2. Remove the stop words
  3. Count the unique words

6.1. Split a Line of Text Into Words

First of all, let's create the Tokenizer class to split a line into words.

We'll extend the DoFn class. This class has an abstract method called process. This method processes the input records from a PCollection and sends the output to an Emitter.

We need to implement the splitting logic in this method:

public class Tokenizer extends DoFn { private static final Splitter SPLITTER = Splitter .onPattern("\\s+") .omitEmptyStrings(); @Override public void process(String line, Emitter emitter) { for (String word : SPLITTER.split(line)) { emitter.emit(word); } } } 

In the above implementation, we've used the Splitter class from Guava library to extract words from a line.

Next, let's write a unit test for the Tokenizer class:

@RunWith(MockitoJUnitRunner.class) public class TokenizerUnitTest { @Mock private Emitter emitter; @Test public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() { Tokenizer splitter = new Tokenizer(); splitter.process(" hello world ", emitter); verify(emitter).emit("hello"); verify(emitter).emit("world"); verifyNoMoreInteractions(emitter); } }

The above test verifies that the correct words are returned.

Finally, let's split the lines read from the input text file using this class.

The parallelDo method of PCollection interface applies the given DoFn to all the elements and returns a new PCollection.

Let's call this method on the lines collection and pass an instance of Tokenizer:

PCollection words = lines.parallelDo(new Tokenizer(), Writables.strings()); 

As a result, we get the list of words in the input text file. We'll remove the stop words in the next step.

6.2. Remove Stop Words

Similarly to the previous step, let's create a StopWordFilter class to filter out stop words.

However, we'll extend FilterFn instead of DoFn. FilterFn has an abstract method called accept. We need to implement the filtering logic in this method:

public class StopWordFilter extends FilterFn { // English stop words, borrowed from Lucene. private static final Set STOP_WORDS = ImmutableSet .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it", "no", "not", "of", "on", "or", "s", "such", "t", "that", "the", "their", "then", "there", "these", "they", "this", "to", "was", "will", "with" }); @Override public boolean accept(String word) { return !STOP_WORDS.contains(word); } }

Next, let's write the unit test for StopWordFilter class:

public class StopWordFilterUnitTest { @Test public void givenFilter_whenStopWordPassed_thenFalseReturned() { FilterFn filter = new StopWordFilter(); assertFalse(filter.accept("the")); assertFalse(filter.accept("a")); } @Test public void givenFilter_whenNonStopWordPassed_thenTrueReturned() { FilterFn filter = new StopWordFilter(); assertTrue(filter.accept("Hello")); assertTrue(filter.accept("World")); } @Test public void givenWordCollection_whenFiltered_thenStopWordsRemoved() { PCollection words = MemPipeline .collectionOf("This", "is", "a", "test", "sentence"); PCollection noStopWords = words.filter(new StopWordFilter()); assertEquals(ImmutableList.of("This", "test", "sentence"), Lists.newArrayList(noStopWords.materialize())); } }

This test verifies that the filtering logic is performed correctly.

Finally, let's use StopWordFilter to filter the list of words generated in the previous step. The filter method of PCollection interface applies the given FilterFn to all the elements and returns a new PCollection.

Let's call this method on the words collection and pass an instance of StopWordFilter:

PCollection noStopWords = words.filter(new StopWordFilter());

As a result, we get the filtered collection of words.

6.3. Count Unique Words

After getting the filtered collection of words, we want to count how often each word occurs. PCollection interface has a number of methods to perform common aggregations:

  • min – returns the minimum element of the collection
  • max – returns the maximum element of the collection
  • length – returns the number of elements in the collection
  • count – returns a PTable that contains the count of each unique element of the collection

Let's use the count method to get the unique words along with their counts:

// The count method applies a series of Crunch primitives and returns // a map of the unique words in the input PCollection to their counts. PTable counts = noStopWords.count();

7. Specify Output

As a result of the previous steps, we have a table of words and their counts. We want to write this result to a text file. The Pipeline interface provides convenience methods to write output:

void write(PCollection collection, Target target); void write(PCollection collection, Target target, Target.WriteMode writeMode); void writeTextFile(PCollection collection, String pathName);

Therefore, let's call the writeTextFile method:

pipeline.writeTextFile(counts, outputPath); 

8. Manage Pipeline Execution

All the steps so far have just defined the data pipeline. No input has been read or processed. This is because Crunch uses lazy execution model.

It doesn't run the MapReduce jobs until a method that controls job planning and execution is invoked on the Pipeline interface:

  • run – prepares an execution plan to create the required outputs and then executes it synchronously
  • done – runs any remaining jobs required to generate outputs and then cleans up any intermediate data files created
  • runAsync – similar to run method, but executes in a non-blocking fashion

Therefore, let's call the done method to execute the pipeline as MapReduce jobs:

PipelineResult result = pipeline.done(); 

The above statement runs the MapReduce jobs to read input, process them and write the result to the output directory.

9. Putting the Pipeline Together

So far we have developed and unit tested the logic to read input data, process it and write to the output file.

Next, let's put them together to build the entire data pipeline:

public int run(String[] args) throws Exception { String inputPath = args[0]; String outputPath = args[1]; // Create an object to coordinate pipeline creation and execution. Pipeline pipeline = new MRPipeline(WordCount.class, getConf()); // Reference a given text file as a collection of Strings. PCollection lines = pipeline.readTextFile(inputPath); // Define a function that splits each line in a PCollection of Strings into // a PCollection made up of the individual words in the file. // The second argument sets the serialization format. PCollection words = lines.parallelDo(new Tokenizer(), Writables.strings()); // Take the collection of words and remove known stop words. PCollection noStopWords = words.filter(new StopWordFilter()); // The count method applies a series of Crunch primitives and returns // a map of the unique words in the input PCollection to their counts. PTable counts = noStopWords.count(); // Instruct the pipeline to write the resulting counts to a text file. pipeline.writeTextFile(counts, outputPath); // Execute the pipeline as a MapReduce. PipelineResult result = pipeline.done(); return result.succeeded() ? 0 : 1; }

10. Hadoop Launch Configuration

The data pipeline is thus ready.

However, we need the code to launch it. Therefore, let's write the main method to launch the application:

public class WordCount extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new WordCount(), args); }

ToolRunner.run parses the Hadoop configuration from the command line and executes the MapReduce job.

11. Run Application

The complete application is now ready. Let's run the following command to build it:

mvn package 

As a result of the above command, we get the packaged application and a special job jar in the target directory.

Let's use this job jar to execute the application on Hadoop:

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar 

The application reads the input file and writes the result to the output file. The output file contains unique words along with their counts similar to the following:

[Add,1] [Added,1] [Admiration,1] [Admitting,1] [Allowance,1]

In addition to Hadoop, we can run the application within IDE, as a stand-alone application or as unit tests.

12. Conclusion

In this tutorial, we created a data processing application running on MapReduce. Apache Crunch makes it easy to write, test and execute MapReduce pipelines in Java.

As usual, the full source code can be found over on Github.