Introduzione a MBassador

1. Panoramica

In poche parole, MBassador è un bus di eventi ad alte prestazioni che utilizza la semantica di pubblicazione-sottoscrizione.

I messaggi vengono trasmessi a uno o più peer senza la preventiva conoscenza del numero di iscritti o di come utilizzano il messaggio.

2. Dipendenza da Maven

Prima di poter utilizzare la libreria, dobbiamo aggiungere la dipendenza mbassador:

 net.engio mbassador 1.3.1 

3. Gestione degli eventi di base

3.1. Semplice esempio

Inizieremo con un semplice esempio di pubblicazione di un messaggio:

private MBassador dispatcher = new MBassador(); private String messageString; @Before public void prepareTests() { dispatcher.subscribe(this); } @Test public void whenStringDispatched_thenHandleString() { dispatcher.post("TestString").now(); assertNotNull(messageString); assertEquals("TestString", messageString); } @Handler public void handleString(String message) { messageString = message; } 

All'inizio di questa classe di test, vediamo la creazione di un MBassador con il suo costruttore predefinito. Successivamente, nel metodo @Before , chiamiamo subscribe () e passiamo un riferimento alla classe stessa.

In subscribe (), il dispatcher ispeziona l'abbonato per le annotazioni @Handler .

E, nel primo test, chiamiamo dispatcher.post (…) .now () per inviare il messaggio, il che si traduce nella chiamata di handleString () .

Questo test iniziale dimostra diversi concetti importanti. Qualsiasi oggetto può essere un abbonato, purché abbia uno o più metodi annotati con @Handler . Un abbonato può avere un numero qualsiasi di gestori.

Stiamo usando oggetti di test che sottoscrivono se stessi per motivi di semplicità, ma nella maggior parte degli scenari di produzione, i dispatcher di messaggi lo faranno in classi diverse dai consumatori.

I metodi del gestore hanno un solo parametro di input: il messaggio e non possono generare eccezioni verificate.

Simile al metodo subscribe () , il metodo post accetta qualsiasi Object . Questo oggetto viene consegnato agli abbonati.

Quando un messaggio viene pubblicato, viene consegnato a tutti gli ascoltatori che si sono iscritti al tipo di messaggio.

Aggiungiamo un altro gestore di messaggi e inviamo un diverso tipo di messaggio:

private Integer messageInteger; @Test public void whenIntegerDispatched_thenHandleInteger() { dispatcher.post(42).now(); assertNull(messageString); assertNotNull(messageInteger); assertTrue(42 == messageInteger); } @Handler public void handleInteger(Integer message) { messageInteger = message; } 

Come previsto, quando spediamoviene chiamato un numero intero , handleInteger () e handleString () non lo è. Un unico dispatcher può essere utilizzato per inviare più di un tipo di messaggio.

3.2. Messaggi morti

Allora dove va a finire un messaggio quando non c'è un gestore per esso? Aggiungiamo un nuovo gestore di eventi e quindi inviamo un terzo tipo di messaggio:

private Object deadEvent; @Test public void whenLongDispatched_thenDeadEvent() { dispatcher.post(42L).now(); assertNull(messageString); assertNull(messageInteger); assertNotNull(deadEvent); assertTrue(deadEvent instanceof Long); assertTrue(42L == (Long) deadEvent); } @Handler public void handleDeadEvent(DeadMessage message) { deadEvent = message.getMessage(); } 

In questo test, inviamo un Long invece di un Integer. handleInteger ()handleString () vengono chiamati, ma handleDeadEvent () lo è.

Quando non sono presenti gestori per un messaggio, viene inserito in un oggetto DeadMessage . Poiché abbiamo aggiunto un gestore per Deadmessage , lo acquisiamo .

DeadMessage può essere tranquillamente ignorato; se un'applicazione non ha bisogno di tenere traccia dei messaggi morti, può essere consentito loro di non andare da nessuna parte.

4. Utilizzo di una gerarchia di eventi

L'invio di eventi String e Integer è limitante. Creiamo alcune classi di messaggi:

public class Message {} public class AckMessage extends Message {} public class RejectMessage extends Message { int code; // setters and getters }

Abbiamo una semplice classe base e due classi che la estendono.

4.1. Invio di un messaggio di classe base

Inizieremo con gli eventi dei messaggi :

private MBassador dispatcher = new MBassador(); private Message message; private AckMessage ackMessage; private RejectMessage rejectMessage; @Before public void prepareTests() { dispatcher.subscribe(this); } @Test public void whenMessageDispatched_thenMessageHandled() { dispatcher.post(new Message()).now(); assertNotNull(message); assertNull(ackMessage); assertNull(rejectMessage); } @Handler public void handleMessage(Message message) { this.message = message; } @Handler public void handleRejectMessage(RejectMessage message) { rejectMessage = message; } @Handler public void handleAckMessage(AckMessage message) { ackMessage = message; }

Scopri MBassador, un autobus per eventi pub-sub ad alte prestazioni. Questo ci limita all'uso di Messaggi ma aggiunge un ulteriore livello di sicurezza dei tipi.

Quando inviamo un messaggio , handleMessage () lo riceve. Gli altri due gestori no.

4.2. Invio di un messaggio di sottoclasse

Inviamo un RejectMessage :

@Test public void whenRejectDispatched_thenMessageAndRejectHandled() { dispatcher.post(new RejectMessage()).now(); assertNotNull(message); assertNotNull(rejectMessage); assertNull(ackMessage); }

Quando inviamo un RejectMessage, sia handleRejectMessage () che handleMessage () lo ricevono.

Since RejectMessage extends Message, the Message handler received it, in addition to the RejectMessage handler.

Let's verify this behavior with an AckMessage:

@Test public void whenAckDispatched_thenMessageAndAckHandled() { dispatcher.post(new AckMessage()).now(); assertNotNull(message); assertNotNull(ackMessage); assertNull(rejectMessage); }

Just as we expected, when we send an AckMessage, both handleAckMessage() and handleMessage() receive it.

5. Filtering Messages

Organizing messages by type is already a powerful feature, but we can filter them even more.

5.1. Filter on Class and Subclass

When we posted a RejectMessage or AckMessage, we received the event in both the event handler for the particular type and in the base class.

We can solve this type hierarchy issue by making Message abstract and creating a class such as GenericMessage. But what if we don't have this luxury?

We can use message filters:

private Message baseMessage; private Message subMessage; @Test public void whenMessageDispatched_thenMessageFiltered() { dispatcher.post(new Message()).now(); assertNotNull(baseMessage); assertNull(subMessage); } @Test public void whenRejectDispatched_thenRejectFiltered() { dispatcher.post(new RejectMessage()).now(); assertNotNull(subMessage); assertNull(baseMessage); } @Handler(filters = { @Filter(Filters.RejectSubtypes.class) }) public void handleBaseMessage(Message message) { this.baseMessage = message; } @Handler(filters = { @Filter(Filters.SubtypesOnly.class) }) public void handleSubMessage(Message message) { this.subMessage = message; }

The filters parameter for the @Handler annotation accepts a Class that implements IMessageFilter. The library offers two examples:

The Filters.RejectSubtypes does as its name suggests: it will filter out any subtypes. In this case, we see that RejectMessage is not handled by handleBaseMessage().

The Filters.SubtypesOnly also does as its name suggests: it will filter out any base types. In this case, we see that Message is not handled by handleSubMessage().

5.2. IMessageFilter

The Filters.RejectSubtypes and the Filters.SubtypesOnly both implement IMessageFilter.

RejectSubTypes compares the class of the message to its defined message types and will only allow through messages that equal one of its types, as opposed to any subclasses.

5.3. Filter With Conditions

Fortunately, there is an easier way of filtering messages. MBassador supports a subset of Java EL expressions as conditions for filtering messages.

Let's filter a String message based on its length:

private String testString; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; }

The “foobar!” message is seven characters long and is filtered. Let's send a shorter String:

 @Test public void whenShortStringDispatched_thenStringHandled() { dispatcher.post("foobar").now(); assertNotNull(testString); }

Now, the “foobar” is only six characters long and is passed through.

Our RejectMessage contains a field with an accessor. Let's write a filter for that:

private RejectMessage rejectMessage; @Test public void whenWrongRejectDispatched_thenRejectFiltered() { RejectMessage testReject = new RejectMessage(); testReject.setCode(-1); dispatcher.post(testReject).now(); assertNull(rejectMessage); assertNotNull(subMessage); assertEquals(-1, ((RejectMessage) subMessage).getCode()); } @Handler(condition = "msg.getCode() != -1") public void handleRejectMessage(RejectMessage rejectMessage) { this.rejectMessage = rejectMessage; }

Here again, we can query a method on an object and either filter the message or not.

5.4. Capture Filtered Messages

Similar to DeadEvents, we may want to capture and process filtered messages. There is a dedicated mechanism for capturing filtered events too. Filtered events are treated differently from “dead” events.

Let's write a test that illustrates this:

private String testString; private FilteredMessage filteredMessage; private DeadMessage deadMessage; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); assertNotNull(filteredMessage); assertTrue(filteredMessage.getMessage() instanceof String); assertNull(deadMessage); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; } @Handler public void handleFilterMessage(FilteredMessage message) { this.filteredMessage = message; } @Handler public void handleDeadMessage(DeadMessage deadMessage) { this.deadMessage = deadMessage; } 

With the addition of a FilteredMessage handler, we can track Strings that are filtered because of their length. The filterMessage contains our too-long String while deadMessage remains null.

6. Asynchronous Message Dispatch and Handling

So far all of our examples have used synchronous message dispatch; when we called post.now() the messages were delivered to each handler in the same thread we called post() from.

6.1. Asynchronous Dispatch

The MBassador.post() returns a SyncAsyncPostCommand. This class offers several methods, including:

  • now() – dispatch messages synchronously; the call will block until all messages have been delivered
  • asynchronously() – executes the message publication asynchronously

Let's use asynchronous dispatch in a sample class. We'll use Awaitility in these tests to simplify the code:

private MBassador dispatcher = new MBassador(); private String testString; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenAsyncDispatched_thenMessageReceived() { dispatcher.post("foobar").asynchronously(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testString); } @Handler public void handleStringMessage(String message) { this.testString = message; ready.set(true); }

We call asynchronously() in this test, and use an AtomicBoolean as a flag with await() to wait for the delivery thread to deliver the message.

If we comment out the call to await(), we risk the test failing, because we check testString before the delivery thread completes.

6.2. Asynchronous Handler Invocation

Asynchronous dispatch allows the message provider to return to message processing before the messages are delivered to each handler, but it still calls each handler in order, and each handler has to wait for the previous to finish.

This can lead to problems if one handler performs an expensive operation.

MBassador provides a mechanism for asynchronous handler invocation. Handlers configured for this receive messages in their thread:

private Integer testInteger; private String invocationThreadName; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenHandlerAsync_thenHandled() { dispatcher.post(42).now(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testInteger); assertFalse(Thread.currentThread().getName().equals(invocationThreadName)); } @Handler(delivery = Invoke.Asynchronously) public void handleIntegerMessage(Integer message) { this.invocationThreadName = Thread.currentThread().getName(); this.testInteger = message; ready.set(true); }

Handlers can request asynchronous invocation with the delivery = Invoke.Asynchronously property on the Handler annotation. We verify this in our test by comparing the Thread names in the dispatching method and the handler.

7. Customizing MBassador

So far we've been using an instance of MBassador with its default configuration. The dispatcher's behavior can be modified with annotations, similar to those we have seen so far; we'll cover a few more to finish this tutorial.

7.1. Exception Handling

Handlers cannot define checked exceptions. Instead, the dispatcher can be provided with an IPublicationErrorHandler as an argument to its constructor:

public class MBassadorConfigurationTest implements IPublicationErrorHandler { private MBassador dispatcher; private String messageString; private Throwable errorCause; @Before public void prepareTests() { dispatcher = new MBassador(this); dispatcher.subscribe(this); } @Test public void whenErrorOccurs_thenErrorHandler() { dispatcher.post("Error").now(); assertNull(messageString); assertNotNull(errorCause); } @Test public void whenNoErrorOccurs_thenStringHandler() { dispatcher.post("Error").now(); assertNull(errorCause); assertNotNull(messageString); } @Handler public void handleString(String message) { if ("Error".equals(message)) { throw new Error("BOOM"); } messageString = message; } @Override public void handleError(PublicationError error) { errorCause = error.getCause().getCause(); } }

When handleString() throws an Error, it is saved to errorCause.

7.2. Handler Priority

Handlers are called in reverse order of how they are added, but this isn't behavior we want to rely on. Even with the ability to call handlers in their threads, we may still need to know what order they will be called in.

We can set handler priority explicitly:

private LinkedList list = new LinkedList(); @Test public void whenRejectDispatched_thenPriorityHandled() { dispatcher.post(new RejectMessage()).now(); // Items should pop() off in reverse priority order assertTrue(1 == list.pop()); assertTrue(3 == list.pop()); assertTrue(5 == list.pop()); } @Handler(priority = 5) public void handleRejectMessage5(RejectMessage rejectMessage) { list.push(5); } @Handler(priority = 3) public void handleRejectMessage3(RejectMessage rejectMessage) { list.push(3); } @Handler(priority = 2, rejectSubtypes = true) public void handleMessage(Message rejectMessage) logger.error("Reject handler #3"); list.push(3); } @Handler(priority = 0) public void handleRejectMessage0(RejectMessage rejectMessage) { list.push(1); } 

Handlers are called from highest priority to lowest. Handlers with the default priority, which is zero, are called last. We see that the handler numbers pop() off in reverse order.

7.3. Reject Subtypes, the Easy Way

What happened to handleMessage() in the test above? We don't have to use RejectSubTypes.class to filter our sub types.

RejectSubTypes is a boolean flag that provides the same filtering as the class, but with better performance than the IMessageFilter implementation.

We still need to use the filter-based implementation for accepting subtypes only, though.

8. Conclusion

MBassador è una libreria semplice e diretta per il passaggio di messaggi tra oggetti. I messaggi possono essere organizzati in vari modi e possono essere inviati in modo sincrono o asincrono.

E, come sempre, l'esempio è disponibile in questo progetto GitHub.