Introduzione a Netflix Mantis

1. Panoramica

In questo articolo, daremo uno sguardo alla piattaforma Mantis sviluppata da Netflix.

Esploreremo i principali concetti di Mantis creando, eseguendo e analizzando un processo di elaborazione del flusso.

2. Che cos'è Mantis?

Mantis è una piattaforma per la creazione di applicazioni di elaborazione del flusso (lavori). Fornisce un modo semplice per gestire la distribuzione e il ciclo di vita dei lavori. Inoltre, facilita l'allocazione delle risorse, la scoperta e la comunicazione tra questi lavori.

Pertanto, gli sviluppatori possono concentrarsi sulla logica aziendale effettiva, pur avendo il supporto di una piattaforma robusta e scalabile per eseguire le loro applicazioni ad alto volume, bassa latenza e non bloccanti.

Un lavoro Mantis è composto da tre parti distinte:

  • la fonte , responsabile del recupero dei dati da una fonte esterna
  • una o più fasi , responsabili dell'elaborazione dei flussi di eventi in arrivo
  • e un sink che raccoglie i dati elaborati

Esploriamo ora ciascuno di essi.

3. Configurazione e dipendenze

Iniziamo aggiungendo le dipendenze mantis-runtime e jackson-databind :

 io.mantisrx mantis-runtime   com.fasterxml.jackson.core jackson-databind 

Ora, per impostare l'origine dati del nostro lavoro, implementiamo l' interfaccia Mantis Source :

public class RandomLogSource implements Source { @Override public Observable
    
      call(Context context, Index index) { return Observable.just( Observable .interval(250, TimeUnit.MILLISECONDS) .map(this::createRandomLogEvent)); } private String createRandomLogEvent(Long tick) { // generate a random log entry string ... } }
    

Come possiamo vedere, genera semplicemente voci di registro casuali più volte al secondo.

4. Il nostro primo lavoro

Creiamo ora un lavoro Mantis che raccolga semplicemente gli eventi di registro dal nostro RandomLogSource . Successivamente, aggiungeremo trasformazioni di gruppo e aggregazione per un risultato più complesso e interessante.

Per cominciare, creiamo un'entità LogEvent :

public class LogEvent implements JsonType { private Long index; private String level; private String message; // ... }

Quindi, aggiungiamo il nostro TransformLogStage.

È una fase semplice che implementa l'interfaccia ScalarComputation e divide una voce di log per creare un LogEvent . Inoltre, filtra eventuali stringhe formattate errate:

public class TransformLogStage implements ScalarComputation { @Override public Observable call(Context context, Observable logEntry) { return logEntry .map(log -> log.split("#")) .filter(parts -> parts.length == 3) .map(LogEvent::new); } }

4.1. Eseguire il lavoro

A questo punto, abbiamo abbastanza elementi costitutivi per mettere insieme il nostro lavoro Mantis:

public class LogCollectingJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

Diamo uno sguardo più da vicino al nostro lavoro.

Come possiamo vedere, estende MantisJobProvider. All'inizio, recupera i dati dal nostro RandomLogSource e applica TransformLogStage ai dati recuperati. Infine, invia i dati elaborati al sink integrato che si iscrive con entusiasmo e fornisce i dati tramite SSE.

Ora configuriamo il nostro lavoro in modo che venga eseguito localmente all'avvio:

@SpringBootApplication public class MantisApplication implements CommandLineRunner { // ... @Override public void run(String... args) { LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance()); } }

Eseguiamo l'applicazione. Vedremo un messaggio di registro come:

... Serving modern HTTP SSE server sink on port: 86XX

Collegiamoci ora al lavandino usando curl :

$ curl localhost:86XX data: {"index":86,"level":"WARN","message":"login attempt"} data: {"index":87,"level":"ERROR","message":"user created"} data: {"index":88,"level":"INFO","message":"user created"} data: {"index":89,"level":"INFO","message":"login attempt"} data: {"index":90,"level":"INFO","message":"user created"} data: {"index":91,"level":"ERROR","message":"user created"} data: {"index":92,"level":"WARN","message":"login attempt"} data: {"index":93,"level":"INFO","message":"user created"} ...

4.2. Configurazione del sink

Finora abbiamo utilizzato il sink integrato per raccogliere i nostri dati elaborati. Vediamo se possiamo aggiungere maggiore flessibilità al nostro scenario fornendo un sink personalizzato.

E se, ad esempio, volessimo filtrare i log per messaggio ?

Creiamo un LogSink che implementa l' interfaccia Sink :

public class LogSink implements Sink { @Override public void call(Context context, PortRequest portRequest, Observable logEventObservable) { SelfDocumentingSink sink = new ServerSentEventsSink.Builder() .withEncoder(LogEvent::toJsonString) .withPredicate(filterByLogMessage()) .build(); logEventObservable.subscribe(); sink.call(context, portRequest, logEventObservable); } private Predicate filterByLogMessage() { return new Predicate("filter by message", parameters -> { if (parameters != null && parameters.containsKey("filter")) { return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); } return logEvent -> true; }); } }

In questa implementazione del sink, abbiamo configurato un predicato che utilizza il parametro filter per recuperare solo i log che contengono il testo impostato nel parametro filter :

$ curl localhost:8874?filter=login data: {"index":93,"level":"ERROR","message":"login attempt"} data: {"index":95,"level":"INFO","message":"login attempt"} data: {"index":97,"level":"ERROR","message":"login attempt"} ...

Nota Mantis offre anche un potente linguaggio di query, MQL, che può essere utilizzato per eseguire query, trasformare e analizzare i dati di flusso in modo SQL.

5. Stage Chaining

Supponiamo ora di essere interessati a sapere quante voci di log ERROR , WARN o INFO abbiamo in un dato intervallo di tempo. Per questo, aggiungeremo altre due fasi al nostro lavoro e le concateneremo insieme.

5.1. Raggruppamento

In primo luogo, creiamo un GroupLogStage.

This stage is a ToGroupComputation implementation that receives a LogEvent stream data from the existing TransformLogStage. After that, it groups entries by logging level and sends them to the next stage:

public class GroupLogStage implements ToGroupComputation { @Override public Observable
    
      call(Context context, Observable logEvent) { return logEvent.map(log -> new MantisGroup(log.getLevel(), log)); } public static ScalarToGroup.Config config(){ return new ScalarToGroup.Config() .description("Group event data by level") .codec(JacksonCodecs.pojo(LogEvent.class)) .concurrentInput(); } }
    

We've also created a custom stage config by providing a description, the codec to use for serializing the output, and allowed this stage's call method to run concurrently by using concurrentInput().

One thing to note is that this stage is horizontally scalable. Meaning we can run as many instances of this stage as needed. Also worth mentioning, when deployed in a Mantis cluster, this stage sends data to the next stage so that all events belonging to a particular group will land on the same worker of the next stage.

5.2. Aggregating

Before we move on and create the next stage, let's first add a LogAggregate entity:

public class LogAggregate implements JsonType { private final Integer count; private final String level; }

Now, let's create the last stage in the chain.

This stage implements GroupToScalarComputation and transforms a stream of log groups to a scalar LogAggregate. It does this by counting how many times each type of log appears in the stream. In addition, it also has a LogAggregationDuration parameter, which can be used to control the size of the aggregation window:

public class CountLogStage implements GroupToScalarComputation { private int duration; @Override public void init(Context context) { duration = (int)context.getParameters().get("LogAggregationDuration", 1000); } @Override public Observable call(Context context, Observable
    
      mantisGroup) { return mantisGroup .window(duration, TimeUnit.MILLISECONDS) .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) .map((count) -> new LogAggregate(count, group.getKey())) )); } public static GroupToScalar.Config config(){ return new GroupToScalar.Config() .description("sum events for a log level") .codec(JacksonCodecs.pojo(LogAggregate.class)) .withParameters(getParameters()); } public static List
     
       getParameters() { List
      
        params = new ArrayList(); params.add(new IntParameter() .name("LogAggregationDuration") .description("window size for aggregation in milliseconds") .validator(Validators.range(100, 10000)) .defaultValue(5000) .build()); return params; } }
      
     
    

5.3. Configure and Run the Job

The only thing left to do now is to configure our job:

public class LogAggregationJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new GroupLogStage(), GroupLogStage.config()) .stage(new CountLogStage(), CountLogStage.config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

As soon as we run the application and execute our new job, we can see the log counts being retrieved every few seconds:

$ curl localhost:8133 data: {"count":3,"level":"ERROR"} data: {"count":13,"level":"INFO"} data: {"count":4,"level":"WARN"} data: {"count":8,"level":"ERROR"} data: {"count":5,"level":"INFO"} data: {"count":7,"level":"WARN"} ...

6. Conclusion

To sum up, in this article, we've seen what Netflix Mantis is and what it can be used for. Furthermore, we looked at the main concepts, used them to build jobs, and explored custom configurations for different scenarios.

Come sempre, il codice completo è disponibile su GitHub.