Invio di messaggi RabbitMQ con Spring AMQP

1. Introduzione

In questo tutorial, esploreremo il concetto di fanout e scambi di argomenti con Spring AMQP e RabbitMQ.

Ad alto livello, gli scambi fanout sarà trasmesso lo stesso messaggio a tutte le code rilegati , mentre gli scambi argomento utilizzano una chiave di routing per passare messaggi a una determinata coda associato o code .

Per questo tutorial è consigliata la lettura preliminare di Messaging With Spring AMQP.

2. Configurazione di uno scambio Fanout

Impostiamo uno scambio fanout con due code ad esso associate. Quando inviamo un messaggio a questo scambio, entrambe le code riceveranno il messaggio. Il nostro scambio fanout ignora qualsiasi chiave di instradamento inclusa nel messaggio.

Spring AMQP ci consente di aggregare tutte le dichiarazioni di code, scambi e associazioni in un oggetto Dichiarabili :

@Bean public Declarables fanoutBindings() { Queue fanoutQueue1 = new Queue("fanout.queue1", false); Queue fanoutQueue2 = new Queue("fanout.queue2", false); FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); return new Declarables( fanoutQueue1, fanoutQueue2, fanoutExchange, bind(fanoutQueue1).to(fanoutExchange), BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)); }

3. Impostazione di uno scambio di argomenti

Ora, imposteremo anche uno scambio di argomenti con due code, ciascuna con un diverso modello di associazione:

@Bean public Declarables topicBindings() { Queue topicQueue1 = new Queue(topicQueue1Name, false); Queue topicQueue2 = new Queue(topicQueue2Name, false); TopicExchange topicExchange = new TopicExchange(topicExchangeName); return new Declarables( topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind(topicQueue1) .to(topicExchange).with("*.important.*"), BindingBuilder .bind(topicQueue2) .to(topicExchange).with("#.error")); }

Uno scambio di argomenti ci consente di associare le code ad esso con diversi modelli di chiavi. Questo è molto flessibile e ci consente di associare più code con lo stesso modello o anche più modelli alla stessa coda.

Quando la chiave di instradamento del messaggio corrisponde al modello, verrà inserita nella coda. Se una coda ha più bind che corrispondono alla chiave di instradamento del messaggio, solo una copia del messaggio viene inserita nella coda.

I nostri schemi di rilegatura possono utilizzare un asterisco ("*") per trovare una parola in una posizione specifica o un cancelletto ("#") per trovare zero o più parole.

Quindi, il nostro topicQueue1 riceverà messaggi che hanno chiavi di routing con uno schema di tre parole con la parola centrale "important", ad esempio: "user.important.error" o "blog.important.notification".

Inoltre , il nostro topicQueue2 riceverà messaggi con chiavi di routing che terminano con la parola error; esempi corrispondenti sono "error" , "user.important.error" o "blog.post.save.error".

4. Creazione di un produttore

Useremo il metodo convertAndSend del RabbitTemplate per inviare i nostri messaggi di esempio:

 String message = " payload is broadcast"; return args -> { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); };

Il RabbitTemplate fornisce molte sovraccarico convertAndSend () metodi per i tipi di cambio diversi.

Quando inviamo un messaggio a uno scambio fanout, la chiave di instradamento viene ignorata e il messaggio viene passato a tutte le code associate.

Quando inviamo un messaggio allo scambio di argomenti, dobbiamo passare una chiave di instradamento. In base a questa chiave di instradamento, il messaggio verrà consegnato a code specifiche.

5. Configurazione dei consumatori

Infine, impostiamo quattro consumatori - uno per ogni coda - per prelevare i messaggi prodotti:

 @RabbitListener(queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1(String message) { System.out.println("Received fanout 1 message: " + message); } @RabbitListener(queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2(String message) { System.out.println("Received fanout 2 message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1(String message) { System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2(String message) { System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); }

Configuriamo i consumatori utilizzando l' annotazione @RabbitListener . L'unico argomento passato qui è il nome delle code. I consumatori non sono a conoscenza di scambi o chiavi di instradamento.

6. Esecuzione dell'esempio

Il nostro progetto di esempio è un'applicazione Spring Boot, quindi inizializzerà l'applicazione insieme a una connessione a RabbitMQ e configurerà tutte le code, gli scambi e le associazioni.

Per impostazione predefinita, la nostra applicazione prevede un'istanza RabbitMQ in esecuzione sull'host locale sulla porta 5672. Possiamo modificare questa e altre impostazioni predefinite in application.yaml .

Il nostro progetto espone l'endpoint HTTP sull'URI - / broadcast - che accetta POST con un messaggio nel corpo della richiesta.

Quando inviamo una richiesta a questo URI con corpo "Test" dovremmo vedere qualcosa di simile a questo nell'output:

Received fanout 1 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important warn payload is broadcast Received topic 2 (#.error) message: topic important error payload is broadcast Received fanout 2 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important error payload is broadcast

L'ordine in cui vedremo questi messaggi, ovviamente, non è garantito.

7. Conclusione

In questo breve tutorial, abbiamo trattato fanout e scambi di argomenti con Spring AMQP e RabbitMQ.

Il codice sorgente completo e tutti i frammenti di codice per questo tutorial sono disponibili nel repository GitHub.