Spring Batch - Tasklet vs Chunks

1. Introduzione

Spring Batch fornisce due modi diversi per implementare un lavoro: utilizzando tasklet e blocchi .

In questo articolo impareremo come configurare e implementare entrambi i metodi usando un semplice esempio di vita reale.

2. Dipendenze

Iniziamo aggiungendo le dipendenze richieste :

 org.springframework.batch spring-batch-core 4.2.0.RELEASE   org.springframework.batch spring-batch-test 4.2.0.RELEASE test 

Per ottenere l'ultima versione di spring-batch-core e spring-batch-test, fare riferimento a Maven Central.

3. Il nostro caso d'uso

Consideriamo un file CSV con il seguente contenuto:

Mae Hodges,10/22/1972 Gary Potter,02/22/1953 Betty Wise,02/17/1968 Wayne Rose,04/06/1977 Adam Caldwell,09/27/1995 Lucille Phillips,05/14/1992

La prima posizione di ogni riga rappresenta il nome di una persona e la seconda posizione rappresenta la sua data di nascita .

Il nostro caso d'uso è generare un altro file CSV che contenga il nome e l'età di ogni persona :

Mae Hodges,45 Gary Potter,64 Betty Wise,49 Wayne Rose,40 Adam Caldwell,22 Lucille Phillips,25

Ora che il nostro dominio è chiaro, andiamo avanti e creiamo una soluzione utilizzando entrambi gli approcci. Inizieremo con tasklet.

4. Approccio Tasklets

4.1. Introduzione e design

I tasklet hanno lo scopo di eseguire una singola attività all'interno di un passaggio. Il nostro lavoro consisterà in diversi passaggi che vengono eseguiti uno dopo l'altro. Ogni passaggio dovrebbe eseguire solo un'attività definita .

Il nostro lavoro consisterà in tre passaggi:

  1. Leggi le righe dal file CSV di input.
  2. Calcola l'età per ogni persona nel file CSV di input.
  3. Scrivi il nome e l'età di ogni persona in un nuovo file CSV di output.

Ora che il quadro generale è pronto, creiamo una classe per passaggio.

LinesReader sarà incaricato di leggere i dati dal file di input:

public class LinesReader implements Tasklet { // ... }

LinesProcessor calcolerà l'età di ogni persona nel file:

public class LinesProcessor implements Tasklet { // ... }

Infine, LinesWriter avrà la responsabilità di scrivere nomi ed età in un file di output:

public class LinesWriter implements Tasklet { // ... }

A questo punto, tutti i nostri passaggi implementano l' interfaccia Tasklet . Questo ci costringerà a implementare il suo metodo di esecuzione :

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { // ... }

Questo metodo è dove aggiungeremo la logica per ogni passaggio. Prima di iniziare con quel codice, configuriamo il nostro lavoro.

4.2. Configurazione

Dobbiamo aggiungere alcune configurazioni al contesto dell'applicazione di Spring . Dopo aver aggiunto la dichiarazione del bean standard per le classi create nella sezione precedente, siamo pronti per creare la nostra definizione di lavoro:

@Configuration @EnableBatchProcessing public class TaskletsConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean protected Step readLines() { return steps .get("readLines") .tasklet(linesReader()) .build(); } @Bean protected Step processLines() { return steps .get("processLines") .tasklet(linesProcessor()) .build(); } @Bean protected Step writeLines() { return steps .get("writeLines") .tasklet(linesWriter()) .build(); } @Bean public Job job() { return jobs .get("taskletsJob") .start(readLines()) .next(processLines()) .next(writeLines()) .build(); } // ... }

Ciò significa che il nostro "taskletJob" consisterà in tre passaggi. Il primo ( readLines ) eseguirà il tasklet definito nel bean linesReader e passerà alla fase successiva: processLines. ProcessLines eseguirà il tasklet definito nel bean linesProcessor e andrà al passaggio finale: writeLines .

Il nostro flusso di lavoro è definito e siamo pronti per aggiungere un po 'di logica!

4.3. Modello e utensili

Dato che manipoleremo le linee in un file CSV, creeremo una classe Line:

public class Line implements Serializable { private String name; private LocalDate dob; private Long age; // standard constructor, getters, setters and toString implementation }

Si noti che Line implementa Serializable. Questo perché Line fungerà da DTO per trasferire i dati tra i passaggi. Secondo Spring Batch, gli oggetti trasferiti tra i passaggi devono essere serializzabili .

D'altra parte, possiamo iniziare a pensare a leggere e scrivere righe.

Per questo, utilizzeremo OpenCSV:

 com.opencsv opencsv 4.1 

Cerca l'ultima versione di OpenCSV in Maven Central.

Una volta incluso OpenCSV, creeremo anche una classe FileUtils . Fornirà metodi per leggere e scrivere righe CSV:

public class FileUtils { public Line readLine() throws Exception { if (CSVReader == null) initReader(); String[] line = CSVReader.readNext(); if (line == null) return null; return new Line( line[0], LocalDate.parse( line[1], DateTimeFormatter.ofPattern("MM/dd/yyyy"))); } public void writeLine(Line line) throws Exception { if (CSVWriter == null) initWriter(); String[] lineStr = new String[2]; lineStr[0] = line.getName(); lineStr[1] = line .getAge() .toString(); CSVWriter.writeNext(lineStr); } // ... }

Si noti che readLine funge da wrapper sul metodo readNext di OpenCSV e restituisce un oggetto Line .

Allo stesso modo, writeLine avvolge writeNext di OpenCSV ricevendo un oggetto Line . L'implementazione completa di questa classe può essere trovata nel progetto GitHub.

A questo punto, siamo tutti pronti per iniziare con l'implementazione di ogni passaggio.

4.4. LinesReader

Andiamo avanti e completiamo la nostra classe LinesReader :

public class LinesReader implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesReader.class); private List lines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { lines = new ArrayList(); fu = new FileUtils( "taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Lines Reader initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { Line line = fu.readLine(); while (line != null) { lines.add(line); logger.debug("Read line: " + line.toString()); line = fu.readLine(); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines); logger.debug("Lines Reader ended."); return ExitStatus.COMPLETED; } }

Il metodo di esecuzione di LinesReader crea un'istanza FileUtils sul percorso del file di input. Quindi, aggiunge righe a un elenco finché non ci sono più righe da leggere .

La nostra classe implementa anche StepExecutionListener che fornisce due metodi extra: beforeStep e afterStep . Useremo questi metodi per inizializzare e chiudere le cose prima e dopo l' esecuzione delle esecuzioni.

Se diamo uno sguardo al codice afterStep , noteremo la riga in cui l'elenco dei risultati ( righe) viene inserito nel contesto del lavoro per renderlo disponibile per il passaggio successivo:

stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines);

A questo punto, il nostro primo passo ha già adempiuto alla sua responsabilità: caricare le righe CSV in una Lista in memoria. Passiamo al secondo passaggio e li elaboriamo.

4.5. LinesProcessor

LinesProcessor implementerà anche StepExecutionListener e, naturalmente, Tasklet . Ciò significa che implementerà anche i metodi beforeStep , execute e afterStep :

public class LinesProcessor implements Tasklet, StepExecutionListener { private Logger logger = LoggerFactory.getLogger( LinesProcessor.class); private List lines; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List) executionContext.get("lines"); logger.debug("Lines Processor initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { long age = ChronoUnit.YEARS.between( line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Lines Processor ended."); return ExitStatus.COMPLETED; } }

È facile capire che carica l' elenco delle righe dal contesto del lavoro e calcola l'età di ogni persona .

Non è necessario inserire un altro elenco di risultati nel contesto poiché le modifiche avvengono sullo stesso oggetto che proviene dal passaggio precedente.

E siamo pronti per il nostro ultimo passaggio.

4.6. LinesWriter

Il compito di LinesWriter è quello di andare oltre l' elenco delle righe e scrivere nome ed età nel file di output :

public class LinesWriter implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private List lines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List) executionContext.get("lines"); fu = new FileUtils("output.csv"); logger.debug("Lines Writer initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Lines Writer ended."); return ExitStatus.COMPLETED; } }

We're done with our job's implementation! Let's create a test to run it and see the results.

4.7. Running the Job

To run the job, we'll create a test:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = TaskletsConfig.class) public class TaskletsTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenTaskletsJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }

ContextConfiguration annotation is pointing to the Spring context configuration class, that has our job definition.

We'll need to add a couple of extra beans before running the test:

@Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } @Bean public JobRepository jobRepository() throws Exception { MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(); factory.setTransactionManager(transactionManager()); return (JobRepository) factory.getObject(); } @Bean public PlatformTransactionManager transactionManager() { return new ResourcelessTransactionManager(); } @Bean public JobLauncher jobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); return jobLauncher; }

Everything is ready! Go ahead and run the test!

After the job has finished, output.csv has the expected content and logs show the execution flow:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized. [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended. [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized. [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended. [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized. [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

That's it for Tasklets. Now we can move on to the Chunks approach.

5. Chunks Approach

5.1. Introduction and Design

As the name suggests, this approach performs actions over chunks of data. That is, instead of reading, processing and writing all the lines at once, it'll read, process and write a fixed amount of records (chunk) at a time.

Then, it'll repeat the cycle until there's no more data in the file.

As a result, the flow will be slightly different:

  1. While there're lines:
    • Do for X amount of lines:
      • Read one line
      • Process one line
    • Write X amount of lines.

So, we also need to create three beans for chunk oriented approach:

public class LineReader { // ... }
public class LineProcessor { // ... }
public class LinesWriter { // ... }

Before moving to implementation, let's configure our job.

5.2. Configuration

The job definition will also look different:

@Configuration @EnableBatchProcessing public class ChunksConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public ItemReader itemReader() { return new LineReader(); } @Bean public ItemProcessor itemProcessor() { return new LineProcessor(); } @Bean public ItemWriter itemWriter() { return new LinesWriter(); } @Bean protected Step processLines(ItemReader reader, ItemProcessor processor, ItemWriter writer) { return steps.get("processLines"). chunk(2) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public Job job() { return jobs .get("chunksJob") .start(processLines(itemReader(), itemProcessor(), itemWriter())) .build(); } }

In this case, there's only one step performing only one tasklet.

However, that tasklet defines a reader, a writer and a processor that will act over chunks of data.

Note that the commit interval indicates the amount of data to be processed in one chunk. Our job will read, process and write two lines at a time.

Now we're ready to add our chunk logic!

5.3. LineReader

LineReader will be in charge of reading one record and returning a Line instance with its content.

To become a reader, our class has to implement ItemReader interface:

public class LineReader implements ItemReader { @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } }

The code is straightforward, it just reads one line and returns it. We'll also implement StepExecutionListener for the final version of this class:

public class LineReader implements ItemReader, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LineReader.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Line Reader initialized."); } @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); logger.debug("Line Reader ended."); return ExitStatus.COMPLETED; } }

It should be noticed that beforeStep and afterStep execute before and after the whole step respectively.

5.4. LineProcessor

LineProcessor follows pretty much the same logic than LineReader.

However, in this case, we'll implement ItemProcessor and its method process():

public class LineProcessor implements ItemProcessor { private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } }

The process() method takes an input line, processes it and returns an output line. Again, we'll also implement StepExecutionListener:

public class LineProcessor implements ItemProcessor, StepExecutionListener { private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public void beforeStep(StepExecution stepExecution) { logger.debug("Line Processor initialized."); } @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug( "Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Line Processor ended."); return ExitStatus.COMPLETED; } }

5.5. LinesWriter

Unlike reader and processor, LinesWriter will write an entire chunk of lines so that it receives a List of Lines:

public class LinesWriter implements ItemWriter, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("output.csv"); logger.debug("Line Writer initialized."); } @Override public void write(List lines) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Line Writer ended."); return ExitStatus.COMPLETED; } }

LinesWriter code speaks for itself. And again, we're ready to test our job.

5.6. Running the Job

We'll create a new test, same as the one we created for the tasklets approach:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = ChunksConfig.class) public class ChunksTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenChunksJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }

After configuring ChunksConfig as explained above for TaskletsConfig, we're all set to run the test!

Once the job is done, we can see that output.csv contains the expected result again, and the logs describe the flow:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized. [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized. [main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended. [main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

We have the same result and a different flow. Logs make evident how the job executes following this approach.

6. Conclusion

Contesti diversi mostreranno la necessità di un approccio o dell'altro. Mentre i tasklet sembrano più naturali per gli scenari "un'attività dopo l'altra", i blocchi forniscono una soluzione semplice per gestire letture impaginate o situazioni in cui non si desidera conservare una quantità significativa di dati in memoria.

L'implementazione completa di questo esempio può essere trovata nel progetto GitHub .