Elaborazione batch Java EE 7

1. Introduzione

Immagina di dover completare manualmente attività come l'elaborazione di buste paga, il calcolo degli interessi e la generazione di fatture. Diventerebbe piuttosto noioso, soggetto a errori e un elenco infinito di attività manuali!

In questo tutorial, daremo un'occhiata a Java Batch Processing (JSR 352), una parte della piattaforma Jakarta EE e un'ottima specifica per automatizzare attività come queste. Offre agli sviluppatori di applicazioni un modello per lo sviluppo di robusti sistemi di elaborazione batch in modo che possano concentrarsi sulla logica aziendale.

2. Dipendenze di Maven

Poiché JSR 352 è solo una specifica, dovremo includere la sua API e implementazione, come jberet :

 javax.batch javax.batch-api 1.0.1   org.jberet jberet-core 1.0.2.Final   org.jberet jberet-support 1.0.2.Final   org.jberet jberet-se 1.0.2.Final 

Aggiungeremo anche un database in memoria in modo da poter esaminare alcuni scenari più realistici.

3. Concetti chiave

JSR 352 introduce alcuni concetti, che possiamo guardare in questo modo:

Definiamo prima ogni pezzo:

  • Partendo da sinistra, abbiamo JobOperator . Esso gestisce tutti gli aspetti del lavoro di elaborazione come l'avvio, arresto e riavvio
  • Successivamente, abbiamo il lavoro . Un lavoro è una raccolta logica di passaggi; incapsula un intero processo batch
  • Un lavoro conterrà da 1 a n Step . Ogni fase è un'unità di lavoro sequenziale e indipendente. Un passaggio è composto dalla lettura dell'input, dall'elaborazione dell'input e dalla scrittura dell'output
  • Infine, ma non meno importante, abbiamo il JobRepository che memorizza le informazioni in esecuzione dei lavori. Aiuta a tenere traccia dei lavori, del loro stato e dei risultati di completamento

I passaggi hanno un po 'più di dettagli di questo, quindi diamo un'occhiata a quello successivo. Per prima cosa, esamineremo i passaggi di Chunk e poi i Batchlet .

4. Creazione di un blocco

Come affermato in precedenza, un blocco è una sorta di passaggio . Useremo spesso una parte per esprimere un'operazione che viene eseguita più e più volte, diciamo su una serie di elementi. È un po 'come le operazioni intermedie da Java Streams.

Quando si descrive un blocco, è necessario indicare da dove prendere gli elementi, come elaborarli e dove inviarli in seguito.

4.1. Articoli di lettura

Per leggere gli elementi, dobbiamo implementare ItemReader.

In questo caso, creeremo un lettore che emetterà semplicemente i numeri da 1 a 10:

@Named public class SimpleChunkItemReader extends AbstractItemReader { private Integer[] tokens; private Integer count; @Inject JobContext jobContext; @Override public Integer readItem() throws Exception { if (count >= tokens.length) { return null; } jobContext.setTransientUserData(count); return tokens[count++]; } @Override public void open(Serializable checkpoint) throws Exception { tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 }; count = 0; } }

Ora, stiamo solo leggendo dallo stato interno della classe qui. Ma, ovviamente, readItem potrebbe eseguire il pull da un database , dal file system o da qualche altra fonte esterna.

Notare che stiamo salvando parte di questo stato interno utilizzando JobContext # setTransientUserData () che tornerà utile in seguito.

Inoltre, nota il parametro checkpoint . Lo riprenderemo anche noi.

4.2. Articoli di elaborazione

Ovviamente, il motivo per cui stiamo dividendo è che vogliamo eseguire qualche tipo di operazione sui nostri articoli!

Ogni volta che restituiamo null da un elaboratore di articoli, eliminiamo quell'articolo dal batch.

Quindi, diciamo qui che vogliamo mantenere solo i numeri pari. Possiamo usare un ItemProcessor che rifiuta quelli dispari restituendo null :

@Named public class SimpleChunkItemProcessor implements ItemProcessor { @Override public Integer processItem(Object t) { Integer item = (Integer) t; return item % 2 == 0 ? item : null; } }

processItem verrà chiamato una volta per ogni elemento emesso dal nostro ItemReader .

4.3. Articoli di scrittura

Infine, il lavoro richiamerà ItemWriter in modo che possiamo scrivere i nostri elementi trasformati:

@Named public class SimpleChunkWriter extends AbstractItemWriter { List processed = new ArrayList(); @Override public void writeItems(List items) throws Exception { items.stream().map(Integer.class::cast).forEach(processed::add); } } 

Quanto durano gli articoli ? In un momento, definiremo la dimensione di un blocco, che determinerà la dimensione della lista che viene inviata a writeItems .

4.4. Definizione di un pezzo in un lavoro

Ora mettiamo tutto questo insieme in un file XML usando JSL o Job Specification Language. Nota che elencheremo il nostro lettore, processore, chunker e anche una dimensione del blocco:

La dimensione del blocco è la frequenza con cui l'avanzamento nel blocco viene trasferito al repository dei lavori , che è importante per garantire il completamento, in caso di guasto di una parte del sistema.

Avremo bisogno di inserire questo file in META-INF / batch-jobs per. jar e in WEB-INF / classes / META-INF / batch-jobs per i file .war .

Abbiamo assegnato al nostro lavoro l'id "simpleChunk", quindi proviamolo in uno unit test.

Ora, i lavori vengono eseguiti in modo asincrono, il che li rende difficili da testare. Nell'esempio, assicurati di controllare il nostro BatchTestHelper che esegue il polling e attende fino al completamento del lavoro:

@Test public void givenChunk_thenBatch_completesWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleChunk", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); } 

Quindi questo è ciò che sono i pezzi. Ora, diamo un'occhiata ai batchlet.

5. Creazione di un batchlet

Non tutto si adatta perfettamente a un modello iterativo. Ad esempio, potremmo avere un'attività che dobbiamo semplicemente richiamare una volta, eseguire fino al completamento e restituire uno stato di uscita.

Il contratto per un batchlet è abbastanza semplice:

@Named public class SimpleBatchLet extends AbstractBatchlet { @Override public String process() throws Exception { return BatchStatus.COMPLETED.toString(); } }

Come il JSL:

E possiamo testarlo usando lo stesso approccio di prima:

@Test public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

Quindi, abbiamo esaminato un paio di modi diversi per implementare i passaggi.

Vediamo ora i meccanismi per contrassegnare e garantire il progresso.

6. Checkpoint personalizzato

I fallimenti sono destinati a verificarsi nel bel mezzo di un lavoro. Dovremmo semplicemente ricominciare tutto o possiamo in qualche modo ricominciare da dove avevamo lasciato?

Come suggerisce il nome, i checkpoint ci aiutano a impostare periodicamente un segnalibro in caso di guasto.

Per impostazione predefinita, la fine dell'elaborazione del blocco è un checkpoint naturale .

Tuttavia, possiamo personalizzarlo con il nostro CheckpointAlgorithm :

@Named public class CustomCheckPoint extends AbstractCheckpointAlgorithm { @Inject JobContext jobContext; @Override public boolean isReadyToCheckpoint() throws Exception { int counterRead = (Integer) jobContext.getTransientUserData(); return counterRead % 5 == 0; } }

Ricordi il conteggio che abbiamo inserito in precedenza nei dati transitori? Qui, possiamo estrarlo con JobContext # getTransientUserDataper affermare che vogliamo impegnarci su ogni 5 numero elaborato.

Senza questo, un commit avverrebbe alla fine di ogni blocco o, nel nostro caso, ogni 3 numeri.

And then, we match that up with the checkout-algorithm directive in our XML underneath our chunk:

Let's test the code, again noting that some of the boilerplate steps are hidden away in BatchTestHelper:

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId) .stream() .map(BatchTestHelper::getCommitCount) .forEach(count -> assertEquals(3L, count.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

So, we might be expecting a commit count of 2 since we have ten items and configured the commits to be every 5th item. But, the framework does one more final read commit at the end to ensure everything has been processed, which is what brings us up to 3.

Next, let's look at how to handle errors.

7. Exception Handling

By default, the job operator will mark our job as FAILED in case of an exception.

Let's change our item reader to make sure that it fails:

@Override public Integer readItem() throws Exception { if (tokens.hasMoreTokens()) { String tempTokenize = tokens.nextToken(); throw new RuntimeException(); } return null; }

And then test:

@Test public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception { // ... start job and wait for completion assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED); }

But, we can override this default behavior in a number of ways:

  • skip-limit specifies the number of exceptions this step will ignore before failing
  • retry-limit specifies the number of times the job operator should retry the step before failing
  • skippable-exception-class specifies a set of exceptions that chunk processing will ignore

So, we can edit our job so that it ignores RuntimeException, as well as a few others, just for illustration:

And now our code will pass:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId).stream() .map(BatchTestHelper::getProcessSkipCount) .forEach(skipCount -> assertEquals(1L, skipCount.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8. Executing Multiple Steps

We mentioned earlier that a job can have any number of steps, so let's see that now.

8.1. Firing the Next Step

By default, each step is the last step in the job.

In order to execute the next step within a batch job, we'll have to explicitly specify by using the next attribute within the step definition:

If we forget this attribute, then the next step in sequence will not get executed.

And we can see what this looks like in the API:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(2 , jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.2. Flows

A sequence of steps can also be encapsulated into a flow. When the flow is finished, it is the entire flow that transitions to the execution element. Also, elements inside the flow can't transition to elements outside the flow.

We can, say, execute two steps inside a flow, and then have that flow transition to an isolated step:

And we can still see each step execution independently:

@Test public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(3, jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.3. Decisions

We also have if/else support in the form of decisions. Decisions provide a customized way of determining a sequence among steps, flows, and splits.

Like steps, it works on transition elements such as next which can direct or terminate job execution.

Let's see how the job can be configured:

Any decision element needs to be configured with a class that implements Decider. Its job is to return a decision as a String.

Each next inside decision is like a case in a switch statement.

8.4. Splits

Splits are handy since they allow us to execute flows concurrently:

Of course, this means that the order isn't guaranteed.

Let's confirm that they still all get run. The flow steps will be performed in an arbitrary order, but the isolated step will always be last:

@Test public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception { // ... start job and wait for completion List stepExecutions = jobOperator.getStepExecutions(executionId); assertEquals(3, stepExecutions.size()); assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

9. Partitioning a Job

We can also consume the batch properties within our Java code which have been defined in our job.

They can be scoped at three levels – the job, the step, and the batch-artifact.

Let's see some examples of how they consumed.

When we want to consume the properties at job level:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties(); ...

This can be consumed at a step level as well:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties(); ...

When we want to consume the properties at batch-artifact level:

@Inject @BatchProperty(name = "name") private String nameString;

This comes in handy with partitions.

See, with splits, we can run flows concurrently. But we can also partition a step into n sets of items or set separate inputs, allowing us another way to split up the work across multiple threads.

To comprehend the segment of work each partition should do, we can combine properties with partitions:

10. Stop and Restart

Now, that's it for defining jobs. Now let's talk for a minute about managing them.

We've already seen in our unit tests that we can get an instance of JobOperator from BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator();

And then, we can start the job:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

However, we can also stop the job:

jobOperator.stop(executionId);

And lastly, we can restart the job:

executionId = jobOperator.restart(executionId, new Properties());

Let's see how we can stop a running job:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobOperator.stop(executionId); jobExecution = BatchTestHelper.keepTestStopped(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); }

And if a batch is STOPPED, then we can restart it:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() { // ... start and stop the job assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties()); jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId)); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

11. Fetching Jobs

When a batch job is submitted then the batch runtime creates an instance of JobExecution to track it.

To obtain the JobExecution for an execution id, we can use the JobOperator#getJobExecution(executionId) method.

And, StepExecution provides helpful information for tracking a step's execution.

To obtain the StepExecution for an execution id, we can use the JobOperator#getStepExecutions(executionId) method.

And from that, we can get several metrics about the step via StepExecution#getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception { // ... start job and wait for completion assertTrue(jobOperator.getJobNames().contains("simpleChunk")); assertTrue(jobOperator.getParameters(executionId).isEmpty()); StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0); Map metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics()); assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue()); assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue()); // ... and many more! }

12. Disadvantages

JSR 352 is powerful, though it is lacking in a number of areas:

  • Sembra esserci mancanza di lettori e scrittori in grado di elaborare altri formati come JSON
  • Non c'è supporto per i generici
  • Il partizionamento supporta solo un singolo passaggio
  • L'API non offre nulla per supportare la pianificazione (sebbene J2EE abbia un modulo di pianificazione separato)
  • A causa della sua natura asincrona, i test possono essere una sfida
  • L'API è piuttosto prolissa

13. Conclusione

In questo articolo, abbiamo esaminato JSR 352 e imparato a conoscere blocchi, batchlet, suddivisioni, flussi e molto altro. Tuttavia, abbiamo appena scalfito la superficie.

Come sempre il codice demo può essere trovato su GitHub.