Apache RocketMQ con Spring Boot

1. Introduzione

In questo tutorial, creeremo un produttore e consumatore di messaggi utilizzando Spring Boot e Apache RocketMQ, una piattaforma di messaggistica e streaming di dati distribuita open source.

2. Dipendenze

Per i progetti Maven, dobbiamo aggiungere la dipendenza RocketMQ Spring Boot Starter:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Produzione di messaggi

Per il nostro esempio, creeremo un produttore di messaggi di base che invierà eventi ogni volta che l'utente aggiunge o rimuove un articolo dal carrello.

Per prima cosa, impostiamo la posizione del nostro server e il nome del gruppo nella nostra application.properties :

rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=cart-producer-group

Nota che se avessimo più di un name server, potremmo elencarli come host: port; host: port .

Ora, per mantenerlo semplice, creeremo un'applicazione CommandLineRunner e genereremo alcuni eventi durante l'avvio dell'applicazione:

@SpringBootApplication public class CartEventProducer implements CommandLineRunner { @Autowired private RocketMQTemplate rocketMQTemplate; public static void main(String[] args) { SpringApplication.run(CartEventProducer.class, args); } public void run(String... args) throws Exception { rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1)); rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2)); rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); } }

Il CartItemEvent è costituito da due sole proprietà: l'ID dell'articolo e una quantità:

class CartItemEvent { private String itemId; private int quantity; // constructor, getters and setters }

Nell'esempio precedente, utilizziamo il metodo convertAndSend () , un metodo generico definito dalla classe astratta AbstractMessageSendingTemplate , per inviare i nostri eventi del carrello. Richiede due parametri: una destinazione, che nel nostro caso è il nome di un argomento e un payload del messaggio.

4. Consumatore di messaggi

Consumare messaggi RocketMQ è semplice come creare un componente Spring annotato con @RocketMQMessageListener e implementare l' interfaccia RocketMQListener :

@SpringBootApplication public class CartEventConsumer { public static void main(String[] args) { SpringApplication.run(CartEventConsumer.class, args); } @Service @RocketMQMessageListener( topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic" ) public class CardItemAddConsumer implements RocketMQListener { public void onMessage(CartItemEvent addItemEvent) { log.info("Adding item: {}", addItemEvent); // additional logic } } @Service @RocketMQMessageListener( topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic" ) public class CardItemRemoveConsumer implements RocketMQListener { public void onMessage(CartItemEvent removeItemEvent) { log.info("Removing item: {}", removeItemEvent); // additional logic } } }

Dobbiamo creare un componente separato per ogni argomento del messaggio che stiamo ascoltando. In ciascuno di questi listener, definiamo il nome dell'argomento e il nome del gruppo di consumatori tramite l' annotazione @ RocketMQMessageListener .

5. Trasmissione sincrona e asincrona

Negli esempi precedenti, abbiamo utilizzato il metodo convertAndSend per inviare i nostri messaggi. Tuttavia, abbiamo altre opzioni.

Potremmo, ad esempio, chiamare syncSend che è diverso da convertAndSend perché restituisce l' oggetto SendResult .

Può essere utilizzato, ad esempio, per verificare se il nostro messaggio è stato inviato con successo o per ottenere il suo id:

public void run(String... args) throws Exception { SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); }

Come convertAndSend, questo metodo viene restituito solo al termine della procedura di invio.

Dovremmo utilizzare la trasmissione sincrona nei casi che richiedono un'elevata affidabilità, come messaggi di notifica importanti o notifiche SMS.

D'altra parte, potremmo invece voler inviare il messaggio in modo asincrono e ricevere una notifica quando l'invio è completo.

Possiamo farlo con asyncSend , che accetta un SendCallback come parametro e restituisce immediatamente:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.error("Successfully sent cart item"); } @Override public void onException(Throwable throwable) { log.error("Exception during cart item sending", throwable); } });

Utilizziamo la trasmissione asincrona nei casi che richiedono un throughput elevato.

Infine, per gli scenari in cui abbiamo requisiti di velocità effettiva molto elevati, possiamo usare sendOneWay invece di asyncSend . sendOneWay è diverso da asyncSend in quanto non garantisce che il messaggio venga inviato.

La trasmissione unidirezionale può essere utilizzata anche per casi di affidabilità ordinaria, come la raccolta di log.

6. Invio di messaggi in transazione

RocketMQ ci offre la possibilità di inviare messaggi all'interno di una transazione. Possiamo farlo utilizzando il metodo sendInTransaction () :

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build(); rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

Inoltre, dobbiamo implementare un'interfaccia RocketMQLocalTransactionListener :

@RocketMQTransactionListener(txProducerGroup="test-transaction") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.COMMIT; } }

In sendMessageInTransaction () , il primo parametro è il nome della transazione. Deve essere lo stesso del @RocketMQTransactionListener campo membro s' txProducerGroup.

7. Configurazione del produttore di messaggi

Possiamo anche configurare aspetti del produttore del messaggio stesso:

  • rocketmq.producer.send-message-timeout : il timeout di invio del messaggio in millisecondi - il valore predefinito è 3000
  • rocketmq.producer.compress-message-body-threshold : soglia oltre la quale, RocketMQ comprimerà i messaggi - il valore predefinito è 1024.
  • rocketmq.producer.max-message-size : la dimensione massima del messaggio in byte: il valore predefinito è 4096.
  • rocketmq.producer.retry-times-when-send-async-failed : il numero massimo di tentativi da eseguire internamente in modalità asincrona prima di inviare un errore - il valore predefinito è 2.
  • rocketmq.producer.retry-next-server : indica se ritentare un altro broker in caso di errore di invio interno - il valore predefinito è false .
  • rocketmq.producer.retry-times-when-send-failed : il numero massimo di tentativi da eseguire internamente in modalità asincrona prima di un errore di invio - il valore predefinito è 2.

8. Conclusione

In questo articolo abbiamo imparato come inviare e consumare messaggi utilizzando Apache RocketMQ e Spring Boot. Come sempre tutto il codice sorgente è disponibile su GitHub.