WebSocket con Play Framework e Akka

1. Panoramica

Quando desideriamo che i nostri client Web mantengano un dialogo con il nostro server, WebSocket può essere una soluzione utile. I WebSocket mantengono una connessione full-duplex permanente. Questo ci dà la possibilità di inviare messaggi bidirezionali tra il nostro server e client.

In questo tutorial, impareremo come utilizzare WebSocket con Akka nel Play Framework.

2. Configurazione

Configuriamo una semplice applicazione di chat. L'utente invierà messaggi al server e il server risponderà con un messaggio da JSONPlaceholder.

2.1. Configurazione dell'applicazione Play Framework

Costruiremo questa applicazione utilizzando il Play Framework.

Seguiamo le istruzioni di Introduzione a Play in Java per configurare ed eseguire una semplice applicazione Play Framework.

2.2. Aggiunta dei file JavaScript necessari

Inoltre, dovremo lavorare con JavaScript per lo scripting lato client. Questo ci consentirà di ricevere nuovi messaggi inviati dal server. Useremo la libreria jQuery per questo.

Aggiungiamo jQuery in fondo al file app / views / i ndex.scala.html :

2.3. Configurazione di Akka

Infine, useremo Akka per gestire le connessioni WebSocket sul lato server.

Andiamo al file build.sbt e aggiungiamo le dipendenze.

Dobbiamo aggiungere le dipendenze akka-actor e akka-testkit :

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Abbiamo bisogno di questi per poter utilizzare e testare il codice Akka Framework.

Successivamente, utilizzeremo i flussi Akka. Quindi aggiungiamo la dipendenza akka-stream :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Infine, dobbiamo chiamare un endpoint di riposo da un attore Akka. Per questo, avremo bisogno della dipendenza akka-http . Quando lo facciamo, l'endpoint restituirà dati JSON che dovremo deserializzare, quindi dobbiamo aggiungere anche la dipendenza akka-http-jackson :

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

E ora siamo tutti pronti. Vediamo come far funzionare i WebSocket!

3. Gestione di WebSocket con Akka Actors

Il meccanismo di gestione WebSocket di Play è basato sui flussi Akka. Un WebSocket è modellato come Flow. Pertanto, i messaggi WebSocket in arrivo vengono inseriti nel flusso e i messaggi prodotti dal flusso vengono inviati al client.

Per gestire un WebSocket utilizzando un attore, avremo bisogno dell'utilità di riproduzione ActorFlow che converte un ActorRef in un flusso. Ciò richiede principalmente del codice Java, con una piccola configurazione.

3.1. Il metodo del controller WebSocket

Innanzitutto, abbiamo bisogno di un'istanza di Materializer . Materializer è una fabbrica per i motori di esecuzione del flusso.

Dobbiamo iniettare ActorSystem e Materializer nell'app controller / controller / HomeController.java :

private ActorSystem actorSystem; private Materializer materializer; @Inject public HomeController( ActorSystem actorSystem, Materializer materializer) { this.actorSystem = actorSystem; this.materializer = materializer; }

Aggiungiamo ora un metodo di controller socket:

public WebSocket socket() { return WebSocket.Json .acceptOrResult(this::createActorFlow); }

Qui stiamo chiamando la funzione acceptOrResult che accetta l'intestazione della richiesta e restituisce un futuro. Il futuro restituito è un flusso per gestire i messaggi WebSocket.

Possiamo, invece, rifiutare la richiesta e restituire un risultato di rifiuto.

Ora creiamo il flusso:

private CompletionStage
    
     > createActorFlow(Http.RequestHeader request) { return CompletableFuture.completedFuture( F.Either.Right(createFlowForActor())); }
    

La classe F in Play Framework definisce un set di helper dello stile di programmazione funzionale. In questo caso, stiamo usando F. O. Diritto di accettare la connessione e restituire il flusso.

Diciamo che volevamo rifiutare la connessione quando il client non è autenticato.

Per questo, potremmo controllare se un nome utente è impostato nella sessione. E se non lo è, rifiutiamo la connessione con HTTP 403 Forbidden:

private CompletionStage
    
     > createActorFlow2(Http.RequestHeader request) { return CompletableFuture.completedFuture( request.session() .getOptional("username") .map(username -> F.Either.
     
      Right( createFlowForActor())) .orElseGet(() -> F.Either.Left(forbidden()))); }
     
    

Usiamo F.Either.Right per rifiutare la connessione nello stesso modo in cui forniamo un flusso con F.Either.Right .

Infine, colleghiamo il flusso all'attore che gestirà i messaggi:

private Flow createFlowForActor() { return ActorFlow.actorRef(out -> Messenger.props(out), actorSystem, materializer); }

The ActorFlow.actorRef creates a flow that is handled by the Messenger actor.

3.2. The routes File

Now, let's add the routes definitions for the controller methods in conf/routes:

GET / controllers.HomeController.index(request: Request) GET /chat controllers.HomeController.socket GET /chat/with/streams controllers.HomeController.akkaStreamsSocket GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

These route definitions map incoming HTTP requests to controller action methods as explained in Routing in Play Applications in Java.

3.3. The Actor Implementation

The most important part of the actor class is the createReceive method which determines which messages the actor can handle:

@Override public Receive createReceive() { return receiveBuilder() .match(JsonNode.class, this::onSendMessage) .matchAny(o -> log.error("Received unknown message: {}", o.getClass())) .build(); }

The actor will forward all messages matching the JsonNode class to the onSendMessage handler method:

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); //.. processMessage(requestDTO); }

Then the handler will respond to every message using the processMessage method:

private void processMessage(RequestDTO requestDTO) { CompletionStage responseFuture = getRandomMessage(); responseFuture.thenCompose(this::consumeHttpResponse) .thenAccept(messageDTO -> out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf())); }

3.4. Consuming Rest API with Akka HTTP

We'll send HTTP requests to the dummy message generator at JSONPlaceholder Posts. When the response arrives, we send the response to the client by writing it out.

Let's have a method that calls the endpoint with a random post id:

private CompletionStage getRandomMessage() { int postId = ThreadLocalRandom.current().nextInt(0, 100); return Http.get(getContext().getSystem()) .singleRequest(HttpRequest.create( "//jsonplaceholder.typicode.com/posts/" + postId)); }

We're also processing the HttpResponse we get from calling the service in order to get the JSON response:

private CompletionStage consumeHttpResponse( HttpResponse httpResponse) { Materializer materializer = Materializer.matFromSystem(getContext().getSystem()); return Jackson.unmarshaller(MessageDTO.class) .unmarshal(httpResponse.entity(), materializer) .thenApply(messageDTO -> { log.info("Received message: {}", messageDTO); discardEntity(httpResponse, materializer); return messageDTO; }); }

The MessageConverter class is a utility for converting between JsonNode and the DTOs:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) { ObjectMapper mapper = new ObjectMapper(); return mapper.convertValue(jsonNode, MessageDTO.class); }

Next, we need to discard the entity. The discardEntityBytes convenience method serves the purpose of easily discarding the entity if it has no purpose for us.

Let's see how to discard the bytes:

private void discardEntity( HttpResponse httpResponse, Materializer materializer) { HttpMessage.DiscardedEntity discarded = httpResponse.discardEntityBytes(materializer); discarded.completionStage() .whenComplete((done, ex) -> log.info("Entity discarded completely!")); }

Now having done the handling of the WebSocket, let's see how we can set up a client for this using HTML5 WebSockets.

4. Setting up the WebSocket Client

For our client, let's build a simple web-based chat application.

4.1. The Controller Action

We need to define a controller action that renders the index page. We'll put this in the controller class app.controllers.HomeController:

public Result index(Http.Request request) { String url = routes.HomeController.socket() .webSocketURL(request); return ok(views.html.index.render(url)); } 

4.2. The Template Page

Now, let's head over to the app/views/ndex.scala.html page and add a container for the received messages and a form to capture a new message:

 F   Send 

We'll also need to pass in the URL for the WebSocket controller action by declaring this parameter at the top of the app/views/index.scala.htmlpage:

@(url: String)

4.3. WebSocket Event Handlers in JavaScript

And now, we can add the JavaScript to handle the WebSocket events. For simplicity, we'll add the JavaScript functions at the bottom of the app/views/index.scala.html page.

Let's declare the event handlers:

var webSocket; var messageInput; function init() { initWebSocket(); } function initWebSocket() { webSocket = new WebSocket("@url"); webSocket.onopen = onOpen; webSocket.onclose = onClose; webSocket.onmessage = onMessage; webSocket.onerror = onError; }

Let's add the handlers themselves:

function onOpen(evt) { writeToScreen("CONNECTED"); } function onClose(evt) { writeToScreen("DISCONNECTED"); } function onError(evt) { writeToScreen("ERROR: " + JSON.stringify(evt)); } function onMessage(evt) { var receivedData = JSON.parse(evt.data); appendMessageToView("Server", receivedData.body); }

Then, to present the output, we'll use the functions appendMessageToView and writeToScreen:

function appendMessageToView(title, message) { $("#messageContent").append("

" + title + ": " + message + "

"); } function writeToScreen(message) { console.log("New message: ", message); }

4.4. Running and Testing the Application

We're ready to test the application, so let's run it:

cd websockets sbt run

With the application running, we can chat with the server by visiting //localhost:9000:

Every time we type a message and hit Send the server will immediately respond with some lorem ipsum from the JSON Placeholder service.

5. Handling WebSockets Directly with Akka Streams

If we are processing a stream of events from a source and sending these to the client, then we can model this around Akka streams.

Let's see how we can use Akka streams in an example where the server sends messages every two seconds.

We'll start with the WebSocket action in the HomeController:

public WebSocket akkaStreamsSocket() { return WebSocket.Json.accept(request -> { Sink in = Sink.foreach(System.out::println); MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body"); Source out = Source.tick( Duration.ofSeconds(2), Duration.ofSeconds(2), MessageConverter.messageToJsonNode(messageDTO) ); return Flow.fromSinkAndSource(in, out); }); }

The Source#tick method takes three parameters. The first is the initial delay before the first tick is processed, and the second is the interval between successive ticks. We've set both values to two seconds in the above snippet. The third parameter is an object that should be returned on each tick.

To see this in action, we need to modify the URL in the index action and make it point to the akkaStreamsSocket endpoint:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

And now refreshing the page, we'll see a new entry every two seconds:

6. Terminating the Actor

At some point, we'll need to shut down the chat, either through a user request or through a timeout.

6.1. Handling Actor Termination

How do we detect when a WebSocket has been closed?

Play will automatically close the WebSocket when the actor that handles the WebSocket terminates. So we can handle this scenario by implementing the Actor#postStop method:

@Override public void postStop() throws Exception { log.info("Messenger actor stopped at {}", OffsetDateTime.now() .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); }

6.2. Manually Terminating the Actor

Further, if we must stop the actor, we can send a PoisonPill to the actor. In our example application, we should be able to handle a “stop” request.

Let's see how to do this in the onSendMessage method:

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); if("stop".equals(message)) { MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor"); out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()); self().tell(PoisonPill.getInstance(), getSelf()); } else { log.info("Actor received. {}", requestDTO); processMessage(requestDTO); } }

When we receive a message, we check if it's a stop request. If it is, we send the PoisonPill. Otherwise, we process the request.

7. Configuration Options

We can configure several options in terms of how the WebSocket should be handled. Let's look at a few.

7.1. WebSocket Frame Length

WebSocket communication involves the exchange of data frames.

The WebSocket frame length is configurable. We have the option to adjust the frame length to our application requirements.

Configuring a shorter frame length may help reduce denial of service attacks that use long data frames. We can change the frame length for the application by specifying the max length in application.conf:

play.server.websocket.frame.maxLength = 64k

We can also set this configuration option by specifying the max length as a command-line parameter:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. Connection Idle Timeout

By default, the actor we use to handle the WebSocket is terminated after one minute. This is because the Play server in which our application is running has a default idle timeout of 60 seconds. This means that all connections that do not receive a request in sixty seconds are closed automatically.

We can change this through configuration options. Let's head over to our application.conf and change the server to have no idle timeout:

play.server.http.idleTimeout = "infinite"

Or we can pass in the option as command-line arguments:

sbt -Dhttp.idleTimeout=infinite run

We can also configure this by specifying devSettings in build.sbt.

Config options specified in build.sbt are only used in development, they will be ignored in production:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

Se eseguiamo nuovamente l'applicazione, l'attore non verrà terminato.

Possiamo cambiare il valore in secondi:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

Possiamo trovare ulteriori informazioni sulle opzioni di configurazione disponibili nella documentazione di Play Framework.

8. Conclusione

In questo tutorial, abbiamo implementato WebSocket in Play Framework con attori Akka e Akka Streams.

Abbiamo quindi esaminato come utilizzare direttamente gli attori Akka e poi abbiamo visto come è possibile configurare Akka Streams per gestire la connessione WebSocket.

Sul lato client, abbiamo utilizzato JavaScript per gestire i nostri eventi WebSocket.

Infine, abbiamo esaminato alcune opzioni di configurazione che possiamo utilizzare.

Come al solito, il codice sorgente di questo tutorial è disponibile su GitHub.