Una guida ad Apache Mesos

1. Panoramica

Di solito distribuiamo varie applicazioni sullo stesso cluster di macchine. Ad esempio, oggigiorno è comune avere un motore di elaborazione distribuito come Apache Spark o Apache Flink con database distribuiti come Apache Cassandra nello stesso cluster.

Apache Mesos è una piattaforma che consente un'efficace condivisione delle risorse tra tali applicazioni.

In questo articolo, discuteremo prima di alcuni problemi di allocazione delle risorse all'interno delle applicazioni distribuite sullo stesso cluster. Successivamente, vedremo come Apache Mesos fornisce un migliore utilizzo delle risorse tra le applicazioni.

2. Condivisione del cluster

Molte applicazioni devono condividere un cluster. In generale, ci sono due approcci comuni:

  • Partizionare il cluster in modo statico ed eseguire un'applicazione su ciascuna partizione
  • Assegna un insieme di macchine a un'applicazione

Sebbene questi approcci consentano alle applicazioni di funzionare indipendentemente l'una dall'altra, non consentono un elevato utilizzo delle risorse.

Ad esempio, si consideri un'applicazione che viene eseguita solo per un breve periodo seguito da un periodo di inattività. Ora, poiché abbiamo allocato macchine o partizioni statiche a questa applicazione, abbiamo risorse non utilizzate durante il periodo di inattività.

Possiamo ottimizzare l'utilizzo delle risorse riallocando le risorse gratuite durante il periodo di inattività ad altre applicazioni.

Apache Mesos aiuta con l'allocazione dinamica delle risorse tra le applicazioni.

3. Apache Mesos

Con entrambi gli approcci di condivisione dei cluster discussi sopra, le applicazioni sono a conoscenza solo delle risorse di una particolare partizione o macchina che stanno eseguendo. Tuttavia, Apache Mesos fornisce alle applicazioni una vista astratta di tutte le risorse nel cluster.

Come vedremo tra poco, Mesos funge da interfaccia tra macchine e applicazioni. Fornisce alle applicazioni le risorse disponibili su tutte le macchine del cluster. Si aggiorna di frequente queste informazioni per includere le risorse che si liberano dalle applicazioni che hanno raggiunto lo stato di completamento. Ciò consente alle applicazioni di prendere la decisione migliore su quale attività eseguire su quale macchina.

Per capire come funziona Mesos, diamo uno sguardo alla sua architettura:

Questa immagine fa parte della documentazione ufficiale per Mesos (fonte). Qui, Hadoop e MPI sono due applicazioni che condividono il cluster.

Parleremo di ogni componente mostrato qui nelle prossime sezioni.

3.1. Mesos Master

Il master è il componente principale di questa configurazione e memorizza lo stato corrente delle risorse nel cluster. Inoltre, funge da orchestratore tra gli agenti e le applicazioni trasmettendo informazioni su cose come risorse e attività.

Poiché qualsiasi errore nel master provoca la perdita dello stato delle risorse e delle attività, lo distribuiamo nella configurazione ad alta disponibilità. Come si può vedere nel diagramma sopra, Mesos schiera demoni master in standby insieme a un leader. Questi daemon si basano su Zookeeper per il ripristino dello stato in caso di errore.

3.2. Mesos Agents

Un cluster Mesos deve eseguire un agente su ogni macchina. Questi agenti segnalano periodicamente le proprie risorse al master e, a loro volta, ricevono le attività che un'applicazione ha pianificato per l'esecuzione . Questo ciclo si ripete dopo che l'attività pianificata è stata completata o persa.

Vedremo come le applicazioni pianificano ed eseguono le attività su questi agenti nelle sezioni seguenti.

3.3. Mesos Frameworks

Mesos consente alle applicazioni di implementare un componente astratto che interagisce con il Master per ricevere le risorse disponibili nel cluster e inoltre prendere decisioni di schedulazione basate su di esse. Questi componenti sono noti come framework.

Un framework Mesos è composto da due sottocomponenti:

  • Scheduler : consente alle applicazioni di pianificare le attività in base alle risorse disponibili su tutti gli agenti
  • Esecutore : viene eseguito su tutti gli agenti e contiene tutte le informazioni necessarie per eseguire qualsiasi attività pianificata su quell'agente

L'intero processo è rappresentato con questo flusso:

Innanzitutto, gli agenti segnalano le proprie risorse al master. In questo momento, il master offre queste risorse a tutti gli scheduler registrati. Questo processo è noto come offerta di risorse e ne parleremo in dettaglio nella sezione successiva.

Lo scheduler seleziona quindi l'agente migliore ed esegue varie attività su di esso tramite il master. Non appena l'esecutore completa l'attività assegnata, gli agenti ripubblicano le proprie risorse nel master. Il master ripete questo processo di condivisione delle risorse per tutti i framework nel cluster.

Mesos consente alle applicazioni di implementare il loro pianificatore ed esecutore personalizzato in vari linguaggi di programmazione. Un'implementazione Java dello scheduler deve implementare l' interfaccia Scheduler :

public class HelloWorldScheduler implements Scheduler { @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, Protos.MasterInfo masterInfo) { } @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) { } @Override public void resourceOffers(SchedulerDriver schedulerDriver, List list) { } @Override public void offerRescinded(SchedulerDriver schedulerDriver, OfferID offerID) { } @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { } @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) { } @Override public void disconnected(SchedulerDriver schedulerDriver) { } @Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) { } @Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, int i) { } @Override public void error(SchedulerDriver schedulerDriver, String s) { } }

Come si può vedere, consiste principalmente in vari metodi di callback per la comunicazione in particolare con il master .

Allo stesso modo, l'implementazione di un esecutore deve implementare l' interfaccia dell'Executor :

public class HelloWorldExecutor implements Executor { @Override public void registered(ExecutorDriver driver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) { } @Override public void reregistered(ExecutorDriver driver, Protos.SlaveInfo slaveInfo) { } @Override public void disconnected(ExecutorDriver driver) { } @Override public void launchTask(ExecutorDriver driver, Protos.TaskInfo task) { } @Override public void killTask(ExecutorDriver driver, Protos.TaskID taskId) { } @Override public void frameworkMessage(ExecutorDriver driver, byte[] data) { } @Override public void shutdown(ExecutorDriver driver) { } }

Vedremo una versione operativa dello scheduler e dell'esecutore in una sezione successiva.

4. Gestione delle risorse

4.1. Offerte di risorse

Come abbiamo discusso in precedenza, gli agenti pubblicano le informazioni sulle risorse nel master. A sua volta, il master offre queste risorse ai framework in esecuzione nel cluster. Questo processo è noto come offerta di risorse.

Un'offerta di risorse è composta da due parti: risorse e attributi.

Le risorse vengono utilizzate per pubblicare le informazioni sull'hardware della macchina dell'agente come memoria, CPU e disco.

Sono disponibili cinque risorse predefinite per ogni agente:

  • processore
  • gpus
  • mem
  • disco
  • porti

I valori per queste risorse possono essere definiti in uno dei tre tipi:

  • Scalare : utilizzato per rappresentare le informazioni numeriche utilizzando numeri in virgola mobile per consentire valori frazionari come 1,5 G di memoria
  • Intervallo : utilizzato per rappresentare un intervallo di valori scalari, ad esempio un intervallo di porte
  • Set : utilizzato per rappresentare più valori di testo

Per impostazione predefinita, l'agente Mesos tenta di rilevare queste risorse dalla macchina.

Tuttavia, in alcune situazioni, possiamo configurare risorse personalizzate su un agente. I valori per tali risorse personalizzate dovrebbero essere di nuovo in uno dei tipi discussi sopra.

Ad esempio, possiamo avviare il nostro agente con queste risorse:

--resources='cpus:24;gpus:2;mem:24576;disk:409600;ports:[21000-24000,30000-34000];bugs(debug_role):{a,b,c}'

Come si può vedere, abbiamo configurato l'agente con poche delle risorse predefinite e una risorsa personalizzata denominata bug che è di tipo impostato .

Oltre alle risorse, gli agenti possono pubblicare attributi di valori-chiave nel master. Questi attributi fungono da metadati aggiuntivi per l'agente e aiutano i framework nelle decisioni di pianificazione.

A useful example can be to add agents into different racks or zones and then schedule various tasks on the same rack or zone to achieve data locality:

--attributes='rack:abc;zone:west;os:centos5;level:10;keys:[1000-1500]'

Similar to resources, values for attributes can be either a scalar, a range, or a text type.

4.2. Resource Roles

Many modern-day operating systems support multiple users. Similarly, Mesos also supports multiple users in the same cluster. These users are known as roles. We can consider each role as a resource consumer within a cluster.

Due to this, Mesos agents can partition the resources under different roles based on different allocation strategies. Furthermore, frameworks can subscribe to these roles within the cluster and have fine-grained control over resources under different roles.

For example, consider a cluster hosting applications which are serving different users in an organization. So by dividing the resources into roles, every application can work in isolation from one another.

Additionally, frameworks can use these roles to achieve data locality.

For instance, suppose we have two applications in the cluster named producer and consumer. Here, producer writes data to a persistent volume which consumer can read afterward. We can optimize the consumer application by sharing the volume with the producer.

Since Mesos allows multiple applications to subscribe to the same role, we can associate the persistent volume with a resource role. Furthermore, the frameworks for both producer and consumer will both subscribe to the same resource role. Therefore, the consumer application can now launch the data reading task on the same volume as the producer application.

4.3. Resource Reservation

Now the question may arise as to how Mesos allocates cluster resources into different roles. Mesos allocates the resources through reservations.

There are two types of reservations:

  • Static Reservation
  • Dynamic Reservation

Static reservation is similar to the resource allocation on agent startup we discussed in the earlier sections:

 --resources="cpus:4;mem:2048;cpus(baeldung):8;mem(baeldung):4096"

The only difference here is that now the Mesos agent reserves eight CPUs and 4096m of memory for the role named baeldung.

Dynamic reservation allows us to reshuffle the resources within roles, unlike the static reservation. Mesos allows frameworks and cluster operators to dynamically change the allocation of resources via framework messages as a response to resource offer or via HTTP endpoints.

Mesos allocates all resources without any role into a default role named (*). Master offers such resources to all frameworks whether or not they have subscribed to it.

4.4. Resource Weights and Quotas

Generally, the Mesos master offers resources using a fairness strategy. It uses the weighted Dominant Resource Fairness (wDRF) to identify the roles that lack resources. The master then offers more resources to the frameworks that have subscribed to these roles.

Event though fair sharing of resources between applications is an important characteristic of Mesos, its not always necessary. Suppose a cluster hosting applications that have a low resource footprint along with those having a high resource demand. In such deployments, we would want to allocate resources based on the nature of the application.

Mesos allows frameworks to demand more resources by subscribing to roles and adding a higher value of weight for that role. Therefore, if there are two roles, one of weight 1 and another of weight 2, Mesos will allocate twice the fair share of resources to the second role.

Similar to resources, we can configure weights via HTTP endpoints.

Besides ensuring a fair share of resources to a role with weights, Mesos also ensures that the minimum resources for a role are allocated.

Mesos allows us to add quotas to the resource roles. A quota specifies the minimum amount of resources that a role is guaranteed to receive.

5. Implementing Framework

As we discussed in an earlier section, Mesos allows applications to provide framework implementations in a language of their choice. In Java, a framework is implemented using the main class – which acts as an entry point for the framework process – and the implementation of Scheduler and Executor discussed earlier.

5.1. Framework Main Class

Before we implement a scheduler and an executor, we'll first implement the entry point for our framework that:

  • Registers itself with the master
  • Provides executor runtime information to agents
  • Starts the scheduler

We'll first add a Maven dependency for Mesos:

 org.apache.mesos mesos 0.28.3 

Next, we'll implement the HelloWorldMain for our framework. One of the first things we'll do is to start the executor process on the Mesos agent:

public static void main(String[] args) { String path = System.getProperty("user.dir") + "/target/libraries2-1.0.0-SNAPSHOT.jar"; CommandInfo.URI uri = CommandInfo.URI.newBuilder().setValue(path).setExtract(false).build(); String helloWorldCommand = "java -cp libraries2-1.0.0-SNAPSHOT.jar com.baeldung.mesos.executors.HelloWorldExecutor"; CommandInfo commandInfoHelloWorld = CommandInfo.newBuilder() .setValue(helloWorldCommand) .addUris(uri) .build(); ExecutorInfo executorHelloWorld = ExecutorInfo.newBuilder() .setExecutorId(Protos.ExecutorID.newBuilder() .setValue("HelloWorldExecutor")) .setCommand(commandInfoHelloWorld) .setName("Hello World (Java)") .setSource("java") .build(); }

Here, we first configured the executor binary location. Mesos agent would download this binary upon framework registration. Next, the agent would run the given command to start the executor process.

Next, we'll initialize our framework and start the scheduler:

FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder() .setFailoverTimeout(120000) .setUser("") .setName("Hello World Framework (Java)"); frameworkBuilder.setPrincipal("test-framework-java"); MesosSchedulerDriver driver = new MesosSchedulerDriver(new HelloWorldScheduler(), frameworkBuilder.build(), args[0]);

Finally, we'll start the MesosSchedulerDriver that registers itself with the Master. For successful registration, we must pass the IP of the Master as a program argument args[0] to this main class:

int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1; driver.stop(); System.exit(status);

In the class shown above, CommandInfo, ExecutorInfo, and FrameworkInfo are all Java representations of protobuf messages between master and frameworks.

5.2. Implementing Scheduler

Since Mesos 1.0, we can invoke the HTTP endpoint from any Java application to send and receive messages to the Mesos master. Some of these messages include, for example, framework registration, resource offers, and offer rejections.

For Mesos 0.28 or earlier, we need to implement the Scheduler interface:

For the most part, we'll only focus on the resourceOffers method of the Scheduler. Let's see how a scheduler receives resources and initializes tasks based on them.

First, we'll see how the scheduler allocates resources for a task:

@Override public void resourceOffers(SchedulerDriver schedulerDriver, List list) { for (Offer offer : list) { List tasks = new ArrayList(); Protos.TaskID taskId = Protos.TaskID.newBuilder() .setValue(Integer.toString(launchedTasks++)).build(); System.out.println("Launching printHelloWorld " + taskId.getValue() + " Hello World Java"); Protos.Resource.Builder cpus = Protos.Resource.newBuilder() .setName("cpus") .setType(Protos.Value.Type.SCALAR) .setScalar(Protos.Value.Scalar.newBuilder() .setValue(1)); Protos.Resource.Builder mem = Protos.Resource.newBuilder() .setName("mem") .setType(Protos.Value.Type.SCALAR) .setScalar(Protos.Value.Scalar.newBuilder() .setValue(128));

Here, we allocated 1 CPU and 128M of memory for our task. Next, we'll use the SchedulerDriver to launch the task on an agent:

 TaskInfo printHelloWorld = TaskInfo.newBuilder() .setName("printHelloWorld " + taskId.getValue()) .setTaskId(taskId) .setSlaveId(offer.getSlaveId()) .addResources(cpus) .addResources(mem) .setExecutor(ExecutorInfo.newBuilder(helloWorldExecutor)) .build(); List offerIDS = new ArrayList(); offerIDS.add(offer.getId()); tasks.add(printHelloWorld); schedulerDriver.launchTasks(offerIDS, tasks); } }

Alternatively, Scheduler often finds the need to reject resource offers. For example, if the Scheduler cannot launch a task on an agent due to lack of resources, it must immediately decline that offer:

schedulerDriver.declineOffer(offer.getId());

5.3. Implementing Executor

As we discussed earlier, the executor component of the framework is responsible for executing application tasks on the Mesos agent.

We used the HTTP endpoints for implementing Scheduler in Mesos 1.0. Likewise, we can use the HTTP endpoint for the executor.

In an earlier section, we discussed how a framework configures an agent to start the executor process:

java -cp libraries2-1.0.0-SNAPSHOT.jar com.baeldung.mesos.executors.HelloWorldExecutor

Notably, this command considers HelloWorldExecutor as the main class. We'll implement this main method to initialize the MesosExecutorDriver that connects with Mesos agents to receive tasks and share other information like task status:

public class HelloWorldExecutor implements Executor { public static void main(String[] args) { MesosExecutorDriver driver = new MesosExecutorDriver(new HelloWorldExecutor()); System.exit(driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1); } }

The last thing to do now is to accept tasks from the framework and launch them on the agent. The information to launch any task is self-contained within the HelloWorldExecutor:

public void launchTask(ExecutorDriver driver, TaskInfo task) { Protos.TaskStatus status = Protos.TaskStatus.newBuilder() .setTaskId(task.getTaskId()) .setState(Protos.TaskState.TASK_RUNNING) .build(); driver.sendStatusUpdate(status); System.out.println("Execute Task!!!"); status = Protos.TaskStatus.newBuilder() .setTaskId(task.getTaskId()) .setState(Protos.TaskState.TASK_FINISHED) .build(); driver.sendStatusUpdate(status); }

Of course, this is just a simple implementation, but it explains how an executor shares task status with the master at every stage and then executes the task before sending a completion status.

In some cases, executors can also send data back to the scheduler:

String myStatus = "Hello Framework"; driver.sendFrameworkMessage(myStatus.getBytes());

6. Conclusion

In questo articolo, abbiamo discusso brevemente della condivisione delle risorse tra le applicazioni in esecuzione nello stesso cluster. Abbiamo anche discusso di come Apache Mesos aiuta le applicazioni a ottenere il massimo utilizzo con una visione astratta delle risorse del cluster come CPU e memoria.

Successivamente, abbiamo discusso dell'allocazione dinamica delle risorse tra le applicazioni sulla base di varie politiche e ruoli di equità. Mesos consente alle applicazioni di prendere decisioni di pianificazione in base alle offerte di risorse degli agenti Mesos nel cluster.

Infine, abbiamo visto un'implementazione del framework Mesos in Java.

Come al solito, tutti gli esempi sono disponibili su GitHub.