Spring Integration Java DSL

1. Introduzione

In questo tutorial, impareremo a conoscere il DSL Java di Spring Integration per la creazione di integrazioni di applicazioni.

Prenderemo l'integrazione dello spostamento dei file che abbiamo creato in Introduzione a Spring Integration e utilizzeremo invece il DSL.

2. Dipendenze

Spring Integration Java DSL fa parte di Spring Integration Core.

Quindi, possiamo aggiungere quella dipendenza:

 org.springframework.integration spring-integration-core 5.0.6.RELEASE 

E per lavorare sulla nostra applicazione di spostamento di file, avremo anche bisogno di Spring Integration File:

 org.springframework.integration spring-integration-file 5.0.6.RELEASE 

3. Spring Integration Java DSL

Prima del Java DSL, gli utenti configuravano i componenti Spring Integration in XML.

Il DSL introduce alcuni builder fluenti da cui possiamo facilmente creare una pipeline completa di Spring Integration puramente in Java.

Quindi, supponiamo di voler creare un canale che metta in maiuscolo tutti i dati che provengono dalla pipe.

In passato, avremmo potuto fare:

E ora possiamo invece fare:

@Bean public IntegrationFlow upcaseFlow() { return IntegrationFlows.from("input") .transform(String::toUpperCase) .get(); }

4. L'app per lo spostamento dei file

Per iniziare la nostra integrazione per lo spostamento di file, avremo bisogno di alcuni semplici elementi costitutivi.

4.1. Flusso di integrazione

Il primo elemento costitutivo di cui abbiamo bisogno è un flusso di integrazione, che possiamo ottenere dal builder IntegrationFlows :

IntegrationFlows.from(...)

from può richiedere diversi tipi, ma in questo tutorial ne vedremo solo tre:

  • MessageSource s
  • MessageChannel s e
  • String s

Parleremo di tutti e tre a breve.

Dopo aver chiamato da , alcuni metodi di personalizzazione sono ora disponibili:

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory()) // add more components .get();

Alla fine, IntegrationFlows produrrà sempre un'istanza di IntegrationFlow, che è il prodotto finale di qualsiasi app Spring Integration.

Questo modello di prendere input, eseguire le trasformazioni appropriate ed emettere i risultati è fondamentale per tutte le app Spring Integration .

4.2. Descrivere una sorgente di input

Innanzitutto, per spostare i file, dovremo indicare al nostro flusso di integrazione dove dovrebbe cercarli e, per questo, abbiamo bisogno di un MessageSource:

@Bean public MessageSource sourceDirectory() { // .. create a message source }

In poche parole, MessageSource è un luogo da cui possono provenire messaggi esterni all'applicazione.

Più specificamente, abbiamo bisogno di qualcosa che possa adattare quella fonte esterna alla rappresentazione dei messaggi di Spring. E poiché questo adattamento è focalizzato sull'input , questi sono spesso chiamati adattatori del canale di ingresso.

La dipendenza spring-integration-file ci fornisce un adattatore del canale di input che è ottimo per il nostro caso d'uso: FileReadingMessageSource:

@Bean public MessageSource sourceDirectory() { FileReadingMessageSource messageSource = new FileReadingMessageSource(); messageSource.setDirectory(new File(INPUT_DIR)); return messageSource; }

Qui, il nostro FileReadingMessageSource leggerà una directory data da INPUT_DIR e creerà un MessageSource da esso.

Specifichiamo questo come nostra origine in una chiamata di IntegrationFlows.from :

IntegrationFlows.from(sourceDirectory());

4.3. Configurazione di una sorgente di ingresso

Ora, se stiamo pensando a questa come un'applicazione di lunga durata, probabilmente vorremo essere in grado di notare i file mentre arrivano , non solo spostare i file che sono già presenti all'avvio.

Per facilitare ciò, da può anche richiedere configuratori aggiuntivi come ulteriore personalizzazione della sorgente di ingresso:

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

In questo caso, possiamo rendere la nostra sorgente di input più resiliente dicendo a Spring Integration di eseguire il polling di quella sorgente, in questo caso il nostro filesystem, ogni 10 secondi.

E, naturalmente, questo non si applica solo alla nostra sorgente di input del file, potremmo aggiungere questo poller a qualsiasi MessageSource .

4.4. Filtro dei messaggi da una sorgente di input

Successivamente, supponiamo di volere che la nostra applicazione di spostamento file sposti solo file specifici, ad esempio file di immagine con estensione jpg .

Per questo, possiamo usare GenericSelector :

@Bean public GenericSelector onlyJpgs() { return new GenericSelector() { @Override public boolean accept(File source) { return source.getName().endsWith(".jpg"); } }; }

Quindi, aggiorniamo nuovamente il nostro flusso di integrazione:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs());

Oppure, poiché questo filtro è così semplice, avremmo potuto invece definirlo usando un lambda :

IntegrationFlows.from(sourceDirectory()) .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. Gestione dei messaggi con attivatori di servizi

Ora che abbiamo un elenco filtrato di file, dobbiamo scriverli in una nuova posizione.

Servizio Activator s sono ciò che ci rivolgiamo quando stiamo pensando di uscite in Integrazione primavera.

Usiamo l' attivatore del servizio FileWritingMessageHandler dal file spring-integration :

@Bean public MessageHandler targetDirectory() { FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR)); handler.setFileExistsMode(FileExistsMode.REPLACE); handler.setExpectReply(false); return handler; }

Here, our FileWritingMessageHandler will write each Message payload it receives to OUTPUT_DIR.

Again, let's update:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory());

And notice, by the way, the usage of setExpectReply. Because integration flows can bebidirectional, this invocation indicates that this particular pipe is one way.

4.6. Activating Our Integration Flow

When we have added all our components we need to register our IntegrationFlow as a bean to activate it:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000))) .filter(onlyJpgs()) .handle(targetDirectory()) .get(); }

The get method extracts an IntegrationFlow instance that we need to register as a Spring Bean.

As soon as our application context loads, all our components contained in our IntegrationFlow gets activated.

And now, our application will start moving files from the source directory to target directory.

5. Additional Components

In our DSL-based file-moving application, we created an Inbound Channel Adapter, a Message Filter, and a Service Activator.

Let's look at a few other common Spring Integration components and see how we might use them.

5.1. Message Channels

As mentioned earlier, a Message Channel is another way to initialize a flow:

IntegrationFlows.from("anyChannel")

We can read this as “please find or create a channel bean called anyChannel. Then, read any data that is fed into anyChannel from other flows.”

But, really it is more general-purpose than that.

Simply put, a channel abstracts away producers from consumers, and we can think of it as a Java Queue. A channel can be inserted at any point in the flow.

Let's say, for example, that we want to prioritize the files as they get moved from one directory to the next:

@Bean public PriorityChannel alphabetically() { return new PriorityChannel(1000, (left, right) -> ((File)left.getPayload()).getName().compareTo( ((File)right.getPayload()).getName())); }

Then, we can insert an invocation to channel in between our flow:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("alphabetically") .handle(targetDirectory()) .get(); }

There are dozens of channels to pick from, some of the more handy ones being for concurrency, auditing, or intermediate persistence (think Kafka or JMS buffers).

Also, channels can be powerful when combined with Bridges.

5.2. Bridge

When we want to combine two channels, we use a Bridge.

Let's imagine that instead of writing directly to an output directory, we instead had our file-moving app write to another channel:

@Bean public IntegrationFlow fileReader() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("holdingTank") .get(); }

Now, because we've simply written it to a channel, we can bridge from there to other flows.

Let's create a bridge that polls our holding tank for messages and writes them to a destination:

@Bean public IntegrationFlow fileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20))) .handle(targetDirectory()) .get(); }

Again, because we wrote to an intermediate channel, now we can add another flow that takes these same files and writes them at a different rate:

@Bean public IntegrationFlow anotherFileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10))) .handle(anotherTargetDirectory()) .get(); }

As we can see, individual bridges can control the polling configuration for different handlers.

As soon as our application context is loaded, we now have a more complex app in action that will start moving files from the source directory to two target directories.

6. Conclusion

In questo articolo, abbiamo visto vari modi per utilizzare Spring Integration Java DSL per creare diverse pipeline di integrazione.

Essenzialmente, siamo stati in grado di ricreare l'applicazione per lo spostamento di file da un tutorial precedente, questa volta utilizzando java puro.

Inoltre, abbiamo esaminato alcuni altri componenti come canali e ponti.

Il codice sorgente completo utilizzato in questo tutorial è disponibile su Github.