Messaggistica affidabile con JGroups

1. Panoramica

JGroups è un'API Java per lo scambio di messaggi affidabile. È dotato di una semplice interfaccia che fornisce:

  • uno stack di protocolli flessibile, inclusi TCP e UDP
  • frammentazione e riassemblaggio di messaggi di grandi dimensioni
  • affidabile unicast e multicast
  • rilevamento dei guasti
  • controllo del flusso

Oltre a molte altre funzionalità.

In questo tutorial, creeremo una semplice applicazione per lo scambio di messaggi String tra le applicazioni e per fornire lo stato condiviso alle nuove applicazioni man mano che si uniscono alla rete.

2. Configurazione

2.1. Dipendenza da Maven

Dobbiamo aggiungere una singola dipendenza al nostro pom.xml :

 org.jgroups jgroups 4.0.10.Final  

L'ultima versione della libreria può essere verificata su Maven Central.

2.2. Networking

JGroups proverà a utilizzare IPV6 per impostazione predefinita. A seconda della configurazione del nostro sistema, ciò potrebbe impedire alle applicazioni di comunicare.

Per evitare ciò, imposteremo java.net.preferIPv4Stack sulla proprietà true durante l'esecuzione delle nostre applicazioni qui:

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

La nostra connessione a una rete JGroups è un JChannel. Il canale si unisce a un cluster e invia e riceve messaggi, nonché informazioni sullo stato della rete.

3.1. Creazione di un canale

Creiamo un JChannel con un percorso a un file di configurazione. Se omettiamo il nome del file, cercherà udp.xml nella directory di lavoro corrente.

Creeremo un canale con un file di configurazione dal nome esplicito:

JChannel channel = new JChannel("src/main/resources/udp.xml"); 

La configurazione di JGroups può essere molto complicata, ma le configurazioni UDP e TCP predefinite sono sufficienti per la maggior parte delle applicazioni. Abbiamo incluso il file per UDP nel nostro codice e lo useremo per questo tutorial.

Per ulteriori informazioni sulla configurazione del trasporto, consultare il manuale di JGroups qui.

3.2. Collegamento di un canale

Dopo aver creato il nostro canale, dobbiamo unirci a un cluster. Un cluster è un gruppo di nodi che scambiano messaggi.

L'adesione a un cluster richiede un nome cluster:

channel.connect("Baeldung"); 

Il primo nodo che tenta di unirsi a un cluster lo creerà se non esiste. Vedremo questo processo in azione di seguito.

3.3. Denominazione di un canale

I nodi sono identificati da un nome in modo che i peer possano inviare messaggi diretti e ricevere notifiche su chi sta entrando e uscendo dal cluster. JGroups assegnerà automaticamente un nome, oppure possiamo impostarne uno nostro:

channel.name("user1");

Useremo questi nomi di seguito, per tenere traccia di quando i nodi entrano ed escono dal cluster.

3.4. Chiusura di un canale

La pulizia del canale è essenziale se vogliamo che i peer ricevano una notifica tempestiva che siamo usciti.

Chiudiamo un JChannel con il suo metodo di chiusura:

channel.close()

4. Modifiche alla visualizzazione del cluster

Con un JChannel creato siamo ora pronti per vedere lo stato dei peer nel cluster e scambiare messaggi con loro.

JGroups mantiene lo stato del cluster all'interno della classe View . Ogni canale ha una singola visualizzazione della rete. Quando la visualizzazione cambia, viene fornita tramite il callback viewAccepted () .

Per questo tutorial, estenderemo la classe API ReceiverAdaptor che implementa tutti i metodi di interfaccia richiesti per un'applicazione.

È il modo consigliato per implementare i callback.

Aggiungiamo viewAccepted alla nostra applicazione:

public void viewAccepted(View newView) { private View lastView; if (lastView == null) { System.out.println("Received initial view:"); newView.forEach(System.out::println); } else { System.out.println("Received new view."); List newMembers = View.newMembers(lastView, newView); System.out.println("New members: "); newMembers.forEach(System.out::println); List exMembers = View.leftMembers(lastView, newView); System.out.println("Exited members:"); exMembers.forEach(System.out::println); } lastView = newView; } 

Ogni vista contiene un elenco di oggetti indirizzo , che rappresenta ogni membro del cluster. JGroups offre metodi convenienti per confrontare una visualizzazione con un'altra, che utilizziamo per rilevare membri nuovi o usciti dal cluster.

5. Invio di messaggi

La gestione dei messaggi in JGroups è semplice. Un messaggio contiene una matrice di byte e oggetti Indirizzo corrispondenti al mittente e al destinatario.

Per questo tutorial stiamo usando le stringhe lette dalla riga di comando, ma è facile vedere come un'applicazione potrebbe scambiare altri tipi di dati.

5.1. Broadcast Messages

A Message is created with a destination and a byte array; JChannel sets the sender for us. If the target is null, the entire cluster will receive the message.

We'll accept text from the command line and send it to the cluster:

System.out.print("Enter a message: "); String line = in.readLine().toLowerCase(); Message message = new Message(null, line.getBytes()); channel.send(message); 

If we run multiple instances of our program and send this message (after we implement the receive() method below), all of them would receive it, including the sender.

5.2. Blocking Our Messages

If we don't want to see our messages, we can set a property for that:

channel.setDiscardOwnMessages(true); 

When we run the previous test, the message sender does not receive its broadcast message.

5.3. Direct Messages

Sending a direct message requires a valid Address. If we're referring to nodes by name, we need a way to look up an Address. Fortunately, we have the View for that.

The current View is always available from the JChannel:

private Optional getAddress(String name) { View view = channel.view(); return view.getMembers().stream() .filter(address -> name.equals(address.toString())) .findAny(); } 

Address names are available via the class toString() method, so we merely search the List of cluster members for the name we want.

So we can accept a name on from the console, find the associated destination, and send a direct message:

Address destination = null; System.out.print("Enter a destination: "); String destinationName = in.readLine().toLowerCase(); destination = getAddress(destinationName) .orElseThrow(() -> new Exception("Destination not found"); Message message = new Message(destination, "Hi there!"); channel.send(message); 

6. Receiving Messages

We can send messages, now let's add try to receive them now.

Let's override ReceiverAdaptor's empty receive method:

public void receive(Message message) { String line = Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); } 

Since we know the message contains a String, we can safely pass getObject() to System.out.

7. State Exchange

When a node enters the network, it may need to retrieve state information about the cluster. JGroups provides a state transfer mechanism for this.

When a node joins the cluster, it simply calls getState(). The cluster usually retrieves the state from the oldest member in the group – the coordinator.

Let's add a broadcast message count to our application. We'll add a new member variable and increment it inside receive():

private Integer messageCount = 0; public void receive(Message message) { String line = "Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); if (message.getDest() == null) { messageCount++; System.out.println("Message count: " + messageCount); } } 

We check for a null destination because if we count direct messages, each node will have a different number.

Next, we override two more methods in ReceiverAdaptor:

public void setState(InputStream input) { try { messageCount = Util.objectFromStream(new DataInputStream(input)); } catch (Exception e) { System.out.println("Error deserialing state!"); } System.out.println(messageCount + " is the current messagecount."); } public void getState(OutputStream output) throws Exception { Util.objectToStream(messageCount, new DataOutputStream(output)); } 

Similar to messages, JGroups transfers state as an array of bytes.

JGroups supplies an InputStream to the coordinator to write the state to, and an OutputStream for the new node to read. The API provides convenience classes for serializing and deserializing the data.

Note that in production code access to state information must be thread-safe.

Finally, we add the call to getState() to our startup, after we connect to the cluster:

channel.connect(clusterName); channel.getState(null, 0); 

getState() accepts a destination from which to request the state and a timeout in milliseconds. A null destination indicates the coordinator and 0 means do not timeout.

Quando eseguiamo questa app con una coppia di nodi e scambiamo messaggi di trasmissione, vediamo l'incremento del conteggio dei messaggi.

Quindi se aggiungiamo un terzo client o ne interrompiamo e ne avviamo uno, vedremo il nodo appena connesso stampare il conteggio dei messaggi corretto.

8. Conclusione

In questo tutorial, abbiamo utilizzato JGroups per creare un'applicazione per lo scambio di messaggi. Abbiamo utilizzato l'API per monitorare quali nodi si sono connessi e hanno lasciato il cluster e anche per trasferire lo stato del cluster a un nuovo nodo quando è entrato.

Esempi di codice, come sempre, possono essere trovati su GitHub.