Introduzione a Spring Cloud Stream

1. Panoramica

Spring Cloud Stream è un framework basato su Spring Boot e Spring Integration che aiuta nella creazione di microservizi basati su eventi o messaggi .

In questo articolo, introdurremo concetti e costrutti di Spring Cloud Stream con alcuni semplici esempi.

2. Dipendenze di Maven

Per iniziare, dovremo aggiungere lo Spring Cloud Starter Stream con la dipendenza Maven del broker RabbitMQ come middleware di messaggistica al nostro pom.xml :

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

E aggiungeremo la dipendenza del modulo da Maven Central per abilitare anche il supporto JUnit:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Concetti principali

L'architettura dei microservizi segue il principio "endpoint intelligenti e dumb pipe". La comunicazione tra gli endpoint è guidata da parti di middleware di messaggistica come RabbitMQ o Apache Kafka. I servizi comunicano pubblicando eventi di dominio tramite questi endpoint o canali .

Esaminiamo i concetti che compongono il framework Spring Cloud Stream, insieme ai paradigmi essenziali di cui dobbiamo essere consapevoli per creare servizi basati sui messaggi.

3.1. Costruisce

Diamo un'occhiata a un semplice servizio in Spring Cloud Stream che ascolta l' associazione di input e invia una risposta all'associazione di output :

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }

L'annotazione @EnableBinding configura l'applicazione per associare i canali INPUT e OUTPUT definiti all'interno dell'interfaccia Processor . Entrambi i canali sono collegamenti che possono essere configurati per utilizzare un concreto middleware di messaggistica o un raccoglitore.

Diamo uno sguardo alla definizione di tutti questi concetti:

  • Binding : una raccolta di interfacce che identificano i canali di input e output in modo dichiarativo
  • Binder : implementazione middleware di messaggistica come Kafka o RabbitMQ
  • Canale : rappresenta il canale di comunicazione tra il middleware di messaggistica e l'applicazione
  • StreamListeners : metodi di gestione dei messaggi nei bean che verranno richiamati automaticamente su un messaggio dal canale dopo che MessageConverter ha eseguito la serializzazione / deserializzazione tra eventi specifici del middleware e tipi di oggetti di dominio / POJO
  • Mes salvia schemi - usato per la serializzazione e deserializzazione dei messaggi, questi schemi possono essere letti staticamente da una posizione o caricati dinamicamente, sostenendo l'evoluzione di tipi di oggetto dominio

3.2. Modelli di comunicazione

I messaggi designati per le destinazioni vengono recapitati dal modello di messaggistica Pubblica-Sottoscrivi . Gli editori classificano i messaggi in argomenti, ciascuno identificato da un nome. Gli abbonati esprimono interesse per uno o più argomenti. Il middleware filtra i messaggi, consegnando quelli degli argomenti interessanti agli abbonati.

Ora, gli abbonati potrebbero essere raggruppati. Un gruppo di consumatori è un insieme di sottoscrittori o consumatori, identificati da un ID gruppo , all'interno del quale i messaggi di un argomento o di una partizione dell'argomento vengono recapitati in modo bilanciato.

4. Modello di programmazione

Questa sezione descrive le basi per la creazione di applicazioni Spring Cloud Stream.

4.1. Test funzionali

Il supporto del test è un'implementazione del raccoglitore che consente di interagire con i canali e di ispezionare i messaggi.

Inviamo un messaggio al servizio arricchLogMessage di cui sopra e controlliamo se la risposta contiene il testo "[1]:" all'inizio del messaggio:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = MyLoggerServiceApplication.class) @DirtiesContext public class MyLoggerApplicationTests { @Autowired private Processor pipe; @Autowired private MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText() { pipe.input() .send(MessageBuilder.withPayload(new LogMessage("This is my message")) .build()); Object payload = messageCollector.forChannel(pipe.output()) .poll() .getPayload(); assertEquals("[1]: This is my message", payload.toString()); } }

4.2. Canali personalizzati

Nell'esempio sopra, abbiamo utilizzato l' interfaccia del processore fornita da Spring Cloud, che ha solo un canale di input e un canale di output.

Se abbiamo bisogno di qualcosa di diverso, come un ingresso e due canali di uscita, possiamo creare un processore personalizzato:

public interface MyProcessor { String INPUT = "myInput"; @Input SubscribableChannel myInput(); @Output("myOutput") MessageChannel anOutput(); @Output MessageChannel anotherOutput(); }

Spring ci fornirà la corretta implementazione di questa interfaccia. I nomi dei canali possono essere impostati utilizzando annotazioni come in @Output ("myOutput") .

Altrimenti, Spring utilizzerà i nomi dei metodi come nomi dei canali. Pertanto, abbiamo tre canali chiamati myInput , myOutput e anotherOutput .

Ora, immaginiamo di voler instradare i messaggi a un output se il valore è inferiore a 10 e in un altro output il valore è maggiore o uguale a 10:

@Autowired private MyProcessor processor; @StreamListener(MyProcessor.INPUT) public void routeValues(Integer val) { if (val < 10) { processor.anOutput().send(message(val)); } else { processor.anotherOutput().send(message(val)); } } private static final  Message message(T val) { return MessageBuilder.withPayload(val).build(); }

4.3. Dispacciamento condizionale

Utilizzando l' annotazione @StreamListener , possiamo anche filtrare i messaggi che ci aspettiamo nel consumatore utilizzando qualsiasi condizione che definiamo con le espressioni SpEL.

Ad esempio, potremmo usare l'invio condizionale come un altro approccio per instradare i messaggi in diversi output:

@Autowired private MyProcessor processor; @StreamListener( target = MyProcessor.INPUT, condition = "payload = 10") public void routeValuesToAnotherOutput(Integer val) { processor.anotherOutput().send(message(val)); }

L'unico limite di questo approccio è che questi metodi non devono restituire un valore.

5. Configurazione

Let's set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

We can configure our application to use the default binder implementation via META-INF/spring.binders:

rabbit:\ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Or we can add the binder library for RabbitMQ to the classpath by including this dependency:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

If no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host:  port: 5672 username:  password:  virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange queue.pretty.log.messages. Both bindings will use the binder called local_rabbit.

Note that we don't need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange queue.log.messages, we need to enter the request in JSON format.

5.3. Customizing Message Conversion

Spring Cloud Stream allows us to apply message conversion for specific content types. In the above example, instead of using JSON format, we want to provide plain text.

To do this, we'll to apply a custom transformation to LogMessage using a MessageConverter:

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { //... @Bean public MessageConverter providesTextPlainMessageConverter() { return new TextPlainMessageConverter(); } //... }
public class TextPlainMessageConverter extends AbstractMessageConverter { public TextPlainMessageConverter() { super(new MimeType("text", "plain")); } @Override protected boolean supports(Class clazz) { return (LogMessage.class == clazz); } @Override protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { Object payload = message.getPayload(); String text = payload instanceof String ? (String) payload : new String((byte[]) payload); return new LogMessage(text); } }

After applying these changes, going back to the Publish Message panel, if we set the header “contentTypes” to “text/plain” and the payload to “Hello World“, it should work as before.

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings..group property to specify a group name:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit group: logMessageConsumers ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it's important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we've deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be Partitioned messages. This helps when we are scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let's say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups

Sometimes the expression to partition is too complex to write it in only one line. For these cases, we can write our custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at //:/health.

7. Conclusion

In questo tutorial, abbiamo presentato i concetti principali di Spring Cloud Stream e mostrato come usarlo attraverso alcuni semplici esempi su RabbitMQ. Ulteriori informazioni su Spring Cloud Stream sono disponibili qui.

Il codice sorgente di questo articolo può essere trovato su GitHub.