Introduzione ad Apache Storm

1. Panoramica

Questo tutorial sarà un'introduzione ad Apache Storm, un sistema di calcolo distribuito in tempo reale.

Ci concentreremo su e copriremo:

  • Che cos'è esattamente Apache Storm e quali problemi risolve
  • La sua architettura e
  • Come usarlo in un progetto

2. Cos'è Apache Storm?

Apache Storm è un sistema distribuito gratuito e open source per calcoli in tempo reale.

Fornisce tolleranza agli errori, scalabilità e garantisce l'elaborazione dei dati ed è particolarmente efficace nell'elaborazione di flussi di dati illimitati.

Alcuni buoni casi d'uso per Storm possono essere l'elaborazione di operazioni con carta di credito per il rilevamento di frodi o l'elaborazione di dati da case intelligenti per rilevare sensori difettosi.

Storm consente l'integrazione con vari database e sistemi di accodamento disponibili sul mercato.

3. Dipendenza da Maven

Prima di utilizzare Apache Storm, dobbiamo includere la dipendenza storm-core nel nostro progetto:

 org.apache.storm storm-core 1.2.2 provided 

Dobbiamo utilizzare l' ambito fornito solo se intendiamo eseguire la nostra applicazione sul cluster Storm.

Per eseguire l'applicazione localmente, possiamo utilizzare una cosiddetta modalità locale che simulerà il cluster Storm in un processo locale, in tal caso dovremmo rimuovere il fornito.

4. Modello di dati

Il modello di dati di Apache Storm è costituito da due elementi: tuple e flussi.

4.1. Tupla

Una tupla è un elenco ordinato di campi denominati con tipi dinamici. Ciò significa che non è necessario dichiarare esplicitamente i tipi dei campi.

Storm ha bisogno di sapere come serializzare tutti i valori utilizzati in una tupla. Per impostazione predefinita, può già serializzare tipi primitivi, stringhe e array di byte .

E poiché Storm usa la serializzazione Kryo, dobbiamo registrare il serializzatore usando Config per usare i tipi personalizzati. Possiamo farlo in due modi:

Innanzitutto, possiamo registrare la classe da serializzare utilizzando il suo nome completo:

Config config = new Config(); config.registerSerialization(User.class);

In tal caso, Kryo serializzerà la classe usando FieldSerializer. Per impostazione predefinita, questo serializzerà tutti i campi non transitori della classe, sia privati ​​che pubblici.

Oppure possiamo fornire sia la classe da serializzare che il serializzatore che vogliamo che Storm utilizzi per quella classe:

Config config = new Config(); config.registerSerialization(User.class, UserSerializer.class);

Per creare il serializzatore personalizzato, dobbiamo estendere la classe generica Serializer che ha due metodi di scrittura e lettura.

4.2. Stream

Uno Stream è l'astrazione principale nell'ecosistema Storm. Lo Stream è una sequenza illimitata di tuple.

Storms consente di elaborare più flussi in parallelo.

Ogni flusso ha un ID che viene fornito e assegnato durante la dichiarazione.

5. Topologia

La logica dell'applicazione Storm in tempo reale è inclusa nella topologia. La topologia è composta da beccucci e bulloni .

5.1. Becco

I beccucci sono le fonti dei flussi. Emettono tuple alla topologia.

Le tuple possono essere lette da vari sistemi esterni come Kafka, Kestrel o ActiveMQ.

I beccucci possono essere affidabili o inaffidabili . Affidabile significa che lo spout può rispondere che la tupla che non è stata elaborata da Storm. Inaffidabile significa che il beccuccio non risponde poiché utilizzerà un meccanismo di fuoco e dimentica per emettere le tuple.

Per creare lo spout personalizzato, dobbiamo implementare l' interfaccia IRichSpout o estendere qualsiasi classe che già implementa l'interfaccia, ad esempio una classe BaseRichSpout astratta .

Creiamo un beccuccio inaffidabile :

public class RandomIntSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector outputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); outputCollector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis())); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp")); } }

Il nostro RandomIntSpout personalizzato genererà un numero intero casuale e un timestamp ogni secondo.

5.2. Bullone

I bulloni elaborano le tuple nel flusso. Possono eseguire varie operazioni come filtri, aggregazioni o funzioni personalizzate.

Alcune operazioni richiedono più passaggi e quindi in questi casi sarà necessario utilizzare più bulloni.

Per creare il Bolt personalizzato , dobbiamo implementare IRichBolt o per operazioni più semplici l' interfaccia IBasicBolt .

Sono disponibili anche più classi di supporto per l'implementazione di Bolt. In questo caso, useremo BaseBasicBolt :

public class PrintingBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { System.out.println(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }

Questo PrintingBolt personalizzato stamperà semplicemente tutte le tuple sulla console.

6. Creazione di una topologia semplice

Mettiamo insieme queste idee in una topologia semplice. La nostra topologia avrà un beccuccio e tre bulloni.

6.1. RandomNumberSpout

All'inizio creeremo un beccuccio inaffidabile. Genererà numeri interi casuali dall'intervallo (0,100) ogni secondo:

public class RandomNumberSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector collector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); collector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); int operation = random.nextInt(101); long timestamp = System.currentTimeMillis(); Values values = new Values(operation, timestamp); collector.emit(values); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.2. FilteringBolt

Successivamente, creeremo un bullone che filtrerà tutti gli elementi con operazione uguale a 0:

public class FilteringBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { int operation = tuple.getIntegerByField("operation"); if (operation > 0) { basicOutputCollector.emit(tuple.getValues()); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.3. AggregatingBolt

Successivamente, creiamo un Bolt più complicato che aggregherà tutte le operazioni positive di ogni giorno.

A tale scopo, utilizzeremo una classe specifica creata appositamente per implementare i bulloni che operano su finestre invece di operare su singole tuple: BaseWindowedBolt .

Le finestre sono un concetto essenziale nell'elaborazione dei flussi, dividendo i flussi infiniti in blocchi finiti. Possiamo quindi applicare i calcoli a ciascun blocco. Esistono generalmente due tipi di finestre:

Le finestre temporali vengono utilizzate per raggruppare gli elementi di un determinato periodo di tempo utilizzando i timestamp . Le finestre temporali possono avere un numero diverso di elementi.

Le finestre di conteggio vengono utilizzate per creare finestre con una dimensione definita . In tal caso, tutte le finestre avranno la stessa dimensione e la finestra non verrà emessa se ci sono meno elementi rispetto alla dimensione definita.

Il nostro AggregatingBolt genererà la somma di tutte le operazioni positive da una finestra temporale insieme ai timestamp di inizio e fine:

public class AggregatingBolt extends BaseWindowedBolt { private OutputCollector outputCollector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.outputCollector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute(TupleWindow tupleWindow) { List tuples = tupleWindow.get(); tuples.sort(Comparator.comparing(this::getTimestamp)); int sumOfOperations = tuples.stream() .mapToInt(tuple -> tuple.getIntegerByField("operation")) .sum(); Long beginningTimestamp = getTimestamp(tuples.get(0)); Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1)); Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp); outputCollector.emit(values); } private Long getTimestamp(Tuple tuple) { return tuple.getLongByField("timestamp"); } }

Nota che, in questo caso, ottenere direttamente il primo elemento della lista è sicuro. Questo perché ogni finestra viene calcolata utilizzando il campo timestamp della tupla, quindi deve esserci almeno un elemento in ogni finestra.

6.4. FileWritingBolt

Infine, creeremo un bolt che prenderà tutti gli elementi con sumOfOperations maggiore di 2000, li serializzerà e li scriverà nel file:

public class FileWritingBolt extends BaseRichBolt { public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); private BufferedWriter writer; private String filePath; private ObjectMapper objectMapper; @Override public void cleanup() { try { writer.close(); } catch (IOException e) { logger.error("Failed to close writer!"); } } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try { writer = new BufferedWriter(new FileWriter(filePath)); } catch (IOException e) { logger.error("Failed to open a file for writing.", e); } } @Override public void execute(Tuple tuple) { int sumOfOperations = tuple.getIntegerByField("sumOfOperations"); long beginningTimestamp = tuple.getLongByField("beginningTimestamp"); long endTimestamp = tuple.getLongByField("endTimestamp"); if (sumOfOperations > 2000) { AggregatedWindow aggregatedWindow = new AggregatedWindow( sumOfOperations, beginningTimestamp, endTimestamp); try { writer.write(objectMapper.writeValueAsString(aggregatedWindow)); writer.newLine(); writer.flush(); } catch (IOException e) { logger.error("Failed to write data to file.", e); } } } // public constructor and other methods }

Notare che non è necessario dichiarare l'output poiché questo sarà l'ultimo bolt nella nostra topologia

6.5. Esecuzione della topologia

Infine, possiamo mettere tutto insieme ed eseguire la nostra topologia:

public static void runTopology() { TopologyBuilder builder = new TopologyBuilder(); Spout random = new RandomNumberSpout(); builder.setSpout("randomNumberSpout"); Bolt filtering = new FilteringBolt(); builder.setBolt("filteringBolt", filtering) .shuffleGrouping("randomNumberSpout"); Bolt aggregating = new AggregatingBolt() .withTimestampField("timestamp") .withLag(BaseWindowedBolt.Duration.seconds(1)) .withWindow(BaseWindowedBolt.Duration.seconds(5)); builder.setBolt("aggregatingBolt", aggregating) .shuffleGrouping("filteringBolt");  String filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt(filePath); builder.setBolt("fileBolt", file) .shuffleGrouping("aggregatingBolt"); Config config = new Config(); config.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Test", config, builder.createTopology()); }

Per far fluire i dati attraverso ogni parte della topologia, è necessario indicare come collegarli. shuffleGroup ci consente di affermare che i dati per filteringBolt proverranno da randomNumberSpout .

Per ogni Bolt , dobbiamo aggiungere shuffleGroup che definisce l'origine degli elementi per questo bolt. La fonte degli elementi può essere un beccuccio o un altro bullone. E se impostiamo la stessa sorgente per più di un bullone , la sorgente emetterà tutti gli elementi a ciascuno di essi.

In questo caso, la nostra topologia utilizzerà LocalCluster per eseguire il lavoro in locale.

7. Conclusione

In questo tutorial, abbiamo introdotto Apache Storm, un sistema di calcolo distribuito in tempo reale. Abbiamo creato un beccuccio, alcuni bulloni e li abbiamo riuniti in una topologia completa.

E, come sempre, tutti gli esempi di codice possono essere trovati su GitHub.