Spring Batch utilizzando Partitioner

1. Panoramica

Nella nostra precedente introduzione a Spring Batch, abbiamo introdotto il framework come strumento di elaborazione batch. Abbiamo anche esaminato i dettagli di configurazione e l'implementazione per l'esecuzione di un processo a singolo thread e processo singolo.

Per implementare un lavoro con una elaborazione parallela, viene fornita una gamma di opzioni. A un livello superiore, ci sono due modalità di elaborazione parallela:

  1. Processo singolo, multi-thread
  2. Multi-processo

In questo rapido articolo, discuteremo del partizionamento di Step , che può essere implementato sia per processi a processo singolo che per processi multi-processo.

2. Partizionamento di un passaggio

Spring Batch con partizionamento ci fornisce la possibilità di dividere l'esecuzione di uno Step :

Panoramica sul partizionamento

L'immagine sopra mostra un'implementazione di un lavoro con un passo partizionato .

C'è uno Step chiamato “Master”, la cui esecuzione è divisa in alcuni step “Slave”. Questi schiavi possono prendere il posto di un padrone e il risultato rimarrà comunque invariato. Sia master che slave sono istanze di Step . Gli slave possono essere servizi remoti o semplicemente eseguire thread in locale.

Se necessario, possiamo passare i dati dal master allo slave. I metadati (cioè il JobRepository ), fanno in modo che ogni slave venga eseguito una sola volta in una singola esecuzione del Job.

Ecco il diagramma di sequenza che mostra come funziona tutto:

Fase di partizionamento

Come mostrato, PartitionStep guida l'esecuzione. Il PartitionHandler è responsabile della suddivisione del lavoro di "Master" in "Slaves". Lo Step più a destra è lo schiavo.

3. Il Maven POM

Le dipendenze Maven sono le stesse menzionate nel nostro articolo precedente. Cioè Spring Core, Spring Batch e la dipendenza per il database (nel nostro caso, SQLite ).

4. Configurazione

Nel nostro articolo introduttivo, abbiamo visto un esempio di conversione di alcuni dati finanziari da CSV a file XML. Estendiamo lo stesso esempio.

Qui, convertiremo le informazioni finanziarie da 5 file CSV in file XML corrispondenti, utilizzando un'implementazione multi-thread.

Possiamo ottenere ciò utilizzando un unico partizionamento Job e Step . Avremo cinque thread, uno per ciascuno dei file CSV.

Prima di tutto, creiamo un lavoro:

@Bean(name = "partitionerJob") public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { return jobs.get("partitioningJob") .start(partitionStep()) .build(); }

Come possiamo vedere, questo lavoro inizia con PartitioningStep . Questo è il nostro passaggio principale che sarà suddiviso in vari passaggi slave:

@Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep") .partitioner("slaveStep", partitioner()) .step(slaveStep()) .taskExecutor(taskExecutor()) .build(); }

Qui creeremo PartitioningStep usando StepBuilderFactory . Per questo, dobbiamo fornire le informazioni su SlaveSteps e Partitioner .

Il Partitioner è un'interfaccia che fornisce la possibilità di definire un insieme di valori di input per ciascuno degli slave. In altre parole, la logica per dividere le attività nei rispettivi thread va qui.

Creiamo un'implementazione di esso, chiamata CustomMultiResourcePartitioner , dove inseriremo i nomi dei file di input e output in ExecutionContext da trasmettere a ogni passaggio slave:

public class CustomMultiResourcePartitioner implements Partitioner { @Override public Map partition(int gridSize) { Map map = new HashMap(gridSize); int i = 0, k = 1; for (Resource resource : resources) { ExecutionContext context = new ExecutionContext(); Assert.state(resource.exists(), "Resource does not exist: " + resource); context.putString(keyName, resource.getFilename()); context.putString("opFileName", "output"+k+++".xml"); map.put(PARTITION_KEY + i, context); i++; } return map; } }

Creeremo anche il bean per questa classe, dove forniremo la directory di origine per i file di input:

@Bean public CustomMultiResourcePartitioner partitioner() { CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner(); Resource[] resources; try { resources = resoursePatternResolver .getResources("file:src/main/resources/input/*.csv"); } catch (IOException e) { throw new RuntimeException("I/O problems when resolving" + " the input file pattern.", e); } partitioner.setResources(resources); return partitioner; }

Definiremo il passaggio slave, proprio come qualsiasi altro passaggio con il lettore e lo scrittore. Il lettore e lo scrittore saranno gli stessi che abbiamo visto nel nostro esempio introduttivo, tranne che riceveranno il parametro del nome del file da StepExecutionContext.

Si noti che questi bean devono essere definiti come ambito in modo che possano ricevere i parametri stepExecutionContext , ad ogni passaggio. Se non vengono definiti con l'ambito del passaggio, i loro bean verranno creati inizialmente e non accetteranno i nomi dei file a livello di passaggio:

@StepScope @Bean public FlatFileItemReader itemReader( @Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { FlatFileItemReader reader = new FlatFileItemReader(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); reader.setResource(new ClassPathResource("input/" + filename)); DefaultLineMapper lineMapper = new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper); return reader; } 
@Bean @StepScope public ItemWriter itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException { StaxEventItemWriter itemWriter = new StaxEventItemWriter(); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(new ClassPathResource("xml/" + filename)); return itemWriter; }

Pur menzionando il lettore e lo scrittore nel passaggio slave, possiamo passare gli argomenti come null, perché questi nomi di file non verranno utilizzati, poiché riceveranno i nomi di file da stepExecutionContext :

@Bean public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("slaveStep").chunk(1) .reader(itemReader(null)) .writer(itemWriter(marshaller(), null)) .build(); }

5. conclusione

In questo tutorial, abbiamo discusso come implementare un lavoro con elaborazione parallela utilizzando Spring Batch.

Come sempre, l'implementazione completa per questo esempio è disponibile su GitHub.