Client MQTT in Java

1. Panoramica

In questo tutorial, vedremo come aggiungere la messaggistica MQTT in un progetto Java utilizzando le librerie fornite dal progetto Eclipse Paho.

2. Primer MQTT

MQTT (MQ Telemetry Transport) è un protocollo di messaggistica creato per soddisfare la necessità di un metodo semplice e leggero per trasferire dati a / da dispositivi a bassa potenza, come quelli utilizzati nelle applicazioni industriali.

Con la crescente popolarità dei dispositivi IoT (Internet of Things), MQTT ha visto un maggiore utilizzo, portando alla sua standardizzazione da parte di OASIS e ISO.

Il protocollo supporta un unico modello di messaggistica, ovvero il modello Pubblica-Sottoscrivi: ogni messaggio inviato da un client contiene un “argomento” associato che viene utilizzato dal broker per instradarlo ai client sottoscritti. I nomi degli argomenti possono essere semplici stringhe come " oiltemp " o una stringa simile a un percorso " motore / 1 / rpm ".

Per ricevere messaggi, un client si iscrive a uno o più argomenti utilizzando il suo nome esatto o una stringa contenente uno dei caratteri jolly supportati ("#" per argomenti multilivello e "+" per singolo livello ").

3. Configurazione del progetto

Per includere la libreria Paho in un progetto Maven, dobbiamo aggiungere la seguente dipendenza:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

L'ultima versione del modulo della libreria Java Eclipse Paho può essere scaricata da Maven Central.

4. Configurazione del client

Quando si utilizza la libreria Paho, la prima cosa che dobbiamo fare per inviare e / o ricevere messaggi da un broker MQTT è ottenere un'implementazione dell'interfaccia IMqttClient . Questa interfaccia contiene tutti i metodi richiesti da un'applicazione per stabilire una connessione al server, inviare e ricevere messaggi.

Paho esce dagli schemi con due implementazioni di questa interfaccia, una asincrona ( MqttAsyncClient ) e una sincrona ( MqttClient ).Nel nostro caso, ci concentreremo sulla versione sincrona, che ha una semantica più semplice.

La configurazione stessa è un processo in due fasi: prima creiamo un'istanza della classe MqttClient e poi la colleghiamo al nostro server. La sottosezione seguente descrive in dettaglio questi passaggi.

4.1. Creazione di una nuova istanza IMqttClient

Il frammento di codice seguente mostra come creare una nuova istanza sincrona IMqttClient :

String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

In questo caso, stiamo usando il costruttore più semplice disponibile, che prende l'indirizzo endpoint del nostro broker MQTT e un identificatore client , che identifica in modo univoco il nostro cliente.

Nel nostro caso, abbiamo utilizzato un UUID casuale, quindi a ogni esecuzione verrà generato un nuovo identificatore client.

Paho fornisce anche costruttori aggiuntivi che possiamo utilizzare per personalizzare il meccanismo di persistenza utilizzato per archiviare messaggi non riconosciuti e / o ScheduledExecutorService utilizzato per eseguire attività in background richieste dall'implementazione del motore di protocollo.

L'endpoint del server che stiamo utilizzando è un broker MQTT pubblico ospitato dal progetto Paho , che consente a chiunque disponga di una connessione Internet di testare i client senza la necessità di alcuna autenticazione.

4.2. Connessione al server

La nostra istanza MqttClient appena creata non è connessa al server. Lo facciamo chiamando il suo metodo connect () , opzionalmente passando un'istanza MqttConnectOptions che ci permette di personalizzare alcuni aspetti del protocollo.

In particolare, possiamo utilizzare queste opzioni per passare informazioni aggiuntive come credenziali di sicurezza, modalità di ripristino della sessione, modalità di riconnessione e così via.

La classe MqttConnectionOptions espone queste opzioni come proprietà semplici che possiamo impostare utilizzando i normali metodi setter. Dobbiamo solo impostare le proprietà richieste per il nostro scenario: le restanti assumeranno valori predefiniti.

Il codice utilizzato per stabilire una connessione al server in genere ha questo aspetto:

MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setConnectionTimeout(10); publisher.connect(options);

Qui definiamo le nostre opzioni di connessione in modo che:

  • La libreria tenterà automaticamente di riconnettersi al server in caso di errore di rete
  • Eliminerà i messaggi non inviati da un'esecuzione precedente
  • Il timeout di connessione è impostato su 10 secondi

5. Invio di messaggi

L'invio di messaggi utilizzando un MqttClient già connesso è molto semplice. Utilizziamo una delle varianti del metodo publish () per inviare il payload, che è sempre un array di byte, a un determinato argomento , utilizzando una delle seguenti opzioni di qualità del servizio:

  • 0 - semantica "al massimo una volta", nota anche come "spara e dimentica". Utilizzare questa opzione quando la perdita di messaggi è accettabile, poiché non richiede alcun tipo di riconoscimento o persistenza
  • 1 - semantica “almeno una volta”. Utilizza questa opzione quando la perdita di messaggi non è accettabile e i tuoi abbonati possono gestire i duplicati
  • 2 - semantica "esattamente una volta". Utilizzare questa opzione quando la perdita di messaggi non è accettabile e gli iscritti non possono gestire i duplicati

Nel nostro progetto di esempio, la classe EngineTemperatureSensor svolge il ruolo di un sensore fittizio che produce una nuova lettura della temperatura ogni volta che invochiamo il suo metodo call () .

Questa classe implementa l' interfaccia Callable in modo che possiamo facilmente usarla con una delle implementazioni ExecutorService disponibili nel pacchetto java.util.concurrent :

public class EngineTemperatureSensor implements Callable { // ... private members omitted public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } MqttMessage msg = readEngineTemp(); msg.setQos(0); msg.setRetained(true); client.publish(TOPIC,msg); return null; } private MqttMessage readEngineTemp() { double temp = 80 + rnd.nextDouble() * 20.0; byte[] payload = String.format("T:%04.2f",temp) .getBytes(); return new MqttMessage(payload); } }

Il MqttMessage incapsula il payload stesso, la qualità del servizio richiesta e anche il flag mantenuto per il messaggio. Questo flag indica al broker che deve conservare questo messaggio fino a quando non viene utilizzato da un abbonato.

Possiamo utilizzare questa funzione per implementare un comportamento "ultimo valido noto", così quando un nuovo abbonato si connette al server, riceverà immediatamente il messaggio conservato.

6. Ricezione di messaggi

Per ricevere messaggi dal broker MQTT, dobbiamo utilizzare una delle varianti del metodo subscribe () , che ci consentono di specificare:

  • Uno o più filtri argomento per i messaggi che vogliamo ricevere
  • Il QoS associato
  • Il gestore di callback per elaborare i messaggi ricevuti

Nell'esempio seguente, viene mostrato come aggiungere un listener di messaggi a un'istanza IMqttClient esistente per ricevere messaggi da un determinato argomento. Usiamo un CountDownLatch come meccanismo di sincronizzazione tra il nostro callback e il thread di esecuzione principale, decrementandolo ogni volta che arriva un nuovo messaggio.

Nel codice di esempio, abbiamo usato un'istanza IMqttClient diversa per ricevere i messaggi. Lo abbiamo fatto solo per rendere più chiaro quale client fa cosa, ma questa non è una limitazione di Paho: se vuoi, puoi usare lo stesso client per pubblicare e ricevere messaggi:

CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);

La variante subscribe () usata sopra accetta un'istanza IMqttMessageListener come secondo argomento.

Nel nostro caso, utilizziamo una semplice funzione lambda che elabora il payload e decrementa un contatore. Se non arrivano abbastanza messaggi nella finestra temporale specificata (1 minuto), il metodo await () genererà un'eccezione.

When using Paho, we don't need to explicitly acknowledge message receipt. If the callback returns normally, Paho assumes it a successful consumption and sends an acknowledgment to the server.

If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.

Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.

7. Conclusion

In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.

Questa libreria gestisce tutti i dettagli del protocollo di basso livello, permettendoci di concentrarci su altri aspetti della nostra soluzione, lasciando un buon spazio per personalizzare aspetti importanti delle sue caratteristiche interne, come la persistenza del messaggio.

Il codice mostrato in questo articolo è disponibile su GitHub.