Introduzione a Project Reactor Bus

1. Panoramica

In questo rapido articolo, introdurremo il bus del reattore configurando uno scenario di vita reale per un'applicazione reattiva e guidata dagli eventi.

2. Le basi del progetto Reactor

2.1. Perché Reactor?

Le applicazioni moderne devono gestire un numero enorme di richieste simultanee ed elaborare una quantità significativa di dati. Il codice di blocco standard non è più sufficiente per soddisfare questi requisiti.

Il modello di progettazione reattiva è un approccio architettonico basato su eventi per la gestione asincrona di un grande volume di richieste di servizio simultanee provenienti da uno o più gestori di servizi.

Il Project Reactor si basa su questo modello e ha un obiettivo chiaro e ambizioso di creare applicazioni reattive e non bloccanti sulla JVM .

2.2. Scenari di esempio

Prima di iniziare, ecco alcuni scenari interessanti in cui avrebbe senso sfruttare lo stile architettonico reattivo, solo per avere un'idea di dove potremmo applicarlo:

  • Servizi di notifica per una grande piattaforma di shopping online come Amazon
  • Enormi servizi di elaborazione delle transazioni per il settore bancario
  • Attività di negoziazione di azioni in cui i prezzi delle azioni cambiano simultaneamente

3. Dipendenze di Maven

Cominciamo a usare Project Reactor Bus aggiungendo la seguente dipendenza nel nostro pom.xml:

 io.projectreactor reactor-bus 2.0.8.RELEASE 

Possiamo controllare l'ultima versione del bus-reattore a Maven Central.

4. Creazione di un'applicazione demo

Per comprendere meglio i vantaggi dell'approccio basato sul reattore, diamo un'occhiata a un esempio pratico.

Costruiremo una semplice applicazione responsabile dell'invio di notifiche agli utenti di una piattaforma di shopping online. Ad esempio, se un utente effettua un nuovo ordine, l'app invia una conferma dell'ordine tramite e-mail o SMS.

Una tipica implementazione sincrona sarebbe naturalmente limitata dal throughput del servizio di posta elettronica o SMS. Pertanto, i picchi di traffico, come le vacanze, sarebbero generalmente problematici.

Con un approccio reattivo, possiamo progettare il nostro sistema in modo che sia più flessibile e per adattarsi meglio a guasti o timeout che possono verificarsi nei sistemi esterni, come i server gateway.

Diamo uno sguardo all'applicazione, partendo dagli aspetti più tradizionali e passando ai costrutti più reattivi.

4.1. POJO semplice

Innanzitutto, creiamo una classe POJO per rappresentare i dati di notifica:

public class NotificationData { private long id; private String name; private String email; private String mobile; // getter and setter methods }

4.2. Il livello di servizio

Definiamo ora un semplice livello di servizio:

public interface NotificationService { void initiateNotification(NotificationData notificationData) throws InterruptedException; }

E l'implementazione, simulando un'operazione di lunga durata:

@Service public class NotificationServiceimpl implements NotificationService { @Override public void initiateNotification(NotificationData notificationData) throws InterruptedException { System.out.println("Notification service started for " + "Notification ID: " + notificationData.getId()); Thread.sleep(5000); System.out.println("Notification service ended for " + "Notification ID: " + notificationData.getId()); } }

Si noti che per illustrare uno scenario di vita reale di invio di messaggi tramite un SMS o un gateway di posta elettronica, stiamo intenzionalmente introducendo un ritardo di cinque secondi nel metodo initiateNotification con Thread.sleep (5000).

Di conseguenza, quando un thread raggiunge il servizio, verrà bloccato per cinque secondi.

4.3. Il consumatore

Passiamo ora agli aspetti più reattivi della nostra applicazione e implementiamo un consumatore, che poi mapperemo sul bus degli eventi del reattore:

@Service public class NotificationConsumer implements Consumer
    
      { @Autowired private NotificationService notificationService; @Override public void accept(Event notificationDataEvent) { NotificationData notificationData = notificationDataEvent.getData(); try { notificationService.initiateNotification(notificationData); } catch (InterruptedException e) { // ignore } } }
    

Come possiamo vedere, il consumatore che abbiamo creato implementa l' interfaccia del consumatore . La logica principale risiede nel metodo di accettazione .

Questo è un approccio simile che possiamo incontrare in una tipica implementazione dell'ascoltatore Spring.

4.4. Il controller

Infine, ora che siamo in grado di consumare gli eventi, generiamoli anche.

Lo faremo in un semplice controller:

@Controller public class NotificationController { @Autowired private EventBus eventBus; @GetMapping("/startNotification/{param}") public void startNotification(@PathVariable Integer param) { for (int i = 0; i < param; i++) { NotificationData data = new NotificationData(); data.setId(i); eventBus.notify("notificationConsumer", Event.wrap(data)); System.out.println( "Notification " + i + ": notification task submitted successfully"); } } }

Questo è abbastanza autoesplicativo: qui stiamo emettendo eventi tramite EventBus .

Ad esempio, se un client raggiunge l'URL con un valore di parametro di dieci, verranno inviati dieci eventi tramite il bus degli eventi.

4.5. Il Java Config

Mettiamo ora tutto insieme e creiamo una semplice applicazione Spring Boot.

Innanzitutto, dobbiamo configurare EventBus e i bean di ambiente :

@Configuration public class Config { @Bean public Environment env() { return Environment.initializeIfEmpty().assignErrorJournal(); } @Bean public EventBus createEventBus(Environment env) { return EventBus.create(env, Environment.THREAD_POOL); } }

Nel nostro caso, stiamo istanziando EventBus con un pool di thread predefinito disponibile nell'ambiente .

In alternativa, possiamo utilizzare un'istanza di Dispatcher personalizzata :

EventBus evBus = EventBus.create( env, Environment.newDispatcher( REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));

Ora siamo pronti per creare un codice dell'applicazione principale:

import static reactor.bus.selector.Selectors.$; @SpringBootApplication public class NotificationApplication implements CommandLineRunner { @Autowired private EventBus eventBus; @Autowired private NotificationConsumer notificationConsumer; @Override public void run(String... args) throws Exception { eventBus.on($("notificationConsumer"), notificationConsumer); } public static void main(String[] args) { SpringApplication.run(NotificationApplication.class, args); } }

Nel nostro metodo di esecuzione stiamo registrando il notificationConsumer da attivare quando la notifica corrisponde a un determinato selettore .

Notate come stiamo usando l'importazione statica dell'attributo $ per creare un oggetto Selector .

5. Testare l'applicazione

Creiamo ora un test per vedere la nostra NotificationApplication in azione:

@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class NotificationApplicationIntegrationTest { @LocalServerPort private int port; @Test public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getForObject("//localhost:" + port + "/startNotification/10", String.class); } }

Come possiamo vedere, non appena la richiesta viene eseguita, tutte e dieci le attività vengono inviate istantaneamente senza creare alcun blocco . E una volta inviati, gli eventi di notifica vengono elaborati in parallelo.

Notification 0: notification task submitted successfully Notification 1: notification task submitted successfully Notification 2: notification task submitted successfully Notification 3: notification task submitted successfully Notification 4: notification task submitted successfully Notification 5: notification task submitted successfully Notification 6: notification task submitted successfully Notification 7: notification task submitted successfully Notification 8: notification task submitted successfully Notification 9: notification task submitted successfully Notification service started for Notification ID: 1 Notification service started for Notification ID: 2 Notification service started for Notification ID: 3 Notification service started for Notification ID: 0 Notification service ended for Notification ID: 1 Notification service ended for Notification ID: 0 Notification service started for Notification ID: 4 Notification service ended for Notification ID: 3 Notification service ended for Notification ID: 2 Notification service started for Notification ID: 6 Notification service started for Notification ID: 5 Notification service started for Notification ID: 7 Notification service ended for Notification ID: 4 Notification service started for Notification ID: 8 Notification service ended for Notification ID: 6 Notification service ended for Notification ID: 5 Notification service started for Notification ID: 9 Notification service ended for Notification ID: 7 Notification service ended for Notification ID: 8 Notification service ended for Notification ID: 9

È importante tenere presente che nel nostro scenario non è necessario elaborare questi eventi in un ordine particolare.

6. Conclusione

In questo breve tutorial, abbiamo creato una semplice applicazione basata sugli eventi . Abbiamo anche visto come iniziare a scrivere un codice più reattivo e non bloccante.

Tuttavia, questo scenario graffia solo la superficie del soggetto e rappresenta solo una buona base per iniziare a sperimentare il paradigma reattivo .

Come sempre, il codice sorgente è disponibile su GitHub.