Integrazione di Spring con AWS Kinesis

1. Introduzione

Kinesis è uno strumento per la raccolta, l'elaborazione e l'analisi dei flussi di dati in tempo reale, sviluppato in Amazon. Uno dei suoi principali vantaggi è che aiuta con lo sviluppo di applicazioni guidate dagli eventi.

In questo tutorial, esploreremo alcune librerie che consentono alla nostra applicazione Spring di produrre e consumare record da un flusso Kinesis . Gli esempi di codice mostreranno le funzionalità di base ma non rappresentano il codice pronto per la produzione.

2. Prerequisito

Prima di procedere, dobbiamo fare due cose.

Il primo è creare un progetto Spring, poiché l'obiettivo qui è interagire con Kinesis da un progetto Spring.

Il secondo è creare un Kinesis Data Stream. Possiamo farlo da un browser web nel nostro account AWS. Un'alternativa per i fan di AWS CLI tra di noi è usare la riga di comando. Poiché interagiremo con esso dal codice, dobbiamo anche avere a portata di mano le credenziali AWS IAM, la chiave di accesso e la chiave segreta e la regione.

Tutti i nostri produttori creeranno record di indirizzi IP fittizi, mentre i consumatori leggeranno quei valori e li elencheranno nella console dell'applicazione.

3. SDK AWS per Java

La primissima libreria che useremo è l'SDK AWS per Java. Il suo vantaggio è che ci consente di gestire molte parti del lavoro con Kinesis Data Streams. Possiamo leggere dati, produrre dati, creare flussi di dati e reshard flussi di dati . Lo svantaggio è che per avere codice pronto per la produzione, dovremo codificare aspetti come il resharding, la gestione degli errori o un demone per mantenere in vita il consumatore.

3.1. Dipendenza da Maven

La dipendenza amazon-kinesis-client Maven porterà tutto ciò di cui abbiamo bisogno per avere esempi funzionanti. Ora lo aggiungeremo al nostro file pom.xml :

 com.amazonaws amazon-kinesis-client 1.11.2 

3.2. Installazione primaverile

Riutilizziamo l' oggetto AmazonKinesis necessario per interagire con il nostro Kinesis Stream. Lo creeremo come @Bean all'interno della nostra classe @SpringBootApplication :

@Bean public AmazonKinesis buildAmazonKinesis() { BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); return AmazonKinesisClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) .withRegion(Regions.EU_CENTRAL_1) .build(); }

Successivamente, definiamo aws.access.key e aws.secret.key , necessari per la macchina locale, in application.properties :

aws.access.key=my-aws-access-key-goes-here aws.secret.key=my-aws-secret-key-goes-here

E li leggeremo usando l' annotazione @Value :

@Value("${aws.access.key}") private String accessKey; @Value("${aws.secret.key}") private String secretKey;

Per motivi di semplicità, faremo affidamento sui metodi @Scheduled per creare e utilizzare i record.

3.3. Consumatore

L' SDK AWS Kinesis Consumer utilizza un modello pull , il che significa che il nostro codice trarrà i record dai frammenti del flusso di dati Kinesis:

GetRecordsRequest recordsRequest = new GetRecordsRequest(); recordsRequest.setShardIterator(shardIterator.getShardIterator()); recordsRequest.setLimit(25); GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest); while (!recordsResult.getRecords().isEmpty()) { recordsResult.getRecords().stream() .map(record -> new String(record.getData().array())) .forEach(System.out::println); recordsRequest.setShardIterator(recordsResult.getNextShardIterator()); recordsResult = kinesis.getRecords(recordsRequest); }

L' oggetto GetRecordsRequest crea la richiesta di dati di flusso . Nel nostro esempio, abbiamo definito un limite di 25 record per richiesta e continuiamo a leggere finché non c'è più niente da leggere.

Possiamo anche notare che, per la nostra iterazione, abbiamo utilizzato un oggetto GetShardIteratorResult . Abbiamo creato questo oggetto all'interno di un metodo @PostConstruc t in modo da iniziare subito a tracciare i record:

private GetShardIteratorResult shardIterator; @PostConstruct private void buildShardIterator() { GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest(); readShardsRequest.setStreamName(IPS_STREAM); readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST); readShardsRequest.setShardId(IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator(readShardsRequest); }

3.4. Produttore

Vediamo ora come gestire la creazione di record per il nostro flusso di dati Kinesis .

Inseriamo i dati utilizzando un oggetto PutRecordsRequest . Per questo nuovo oggetto, aggiungiamo un elenco che comprende più oggetti PutRecordsRequestEntry :

List entries = IntStream.range(1, 200).mapToObj(ipSuffix -> { PutRecordsRequestEntry entry = new PutRecordsRequestEntry(); entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes())); entry.setPartitionKey(IPS_PARTITION_KEY); return entry; }).collect(Collectors.toList()); PutRecordsRequest createRecordsRequest = new PutRecordsRequest(); createRecordsRequest.setStreamName(IPS_STREAM); createRecordsRequest.setRecords(entries); kinesis.putRecords(createRecordsRequest);

We've created a basic consumer and a producer of simulated IP records. All that's left to do now is to run our Spring project and see IPs listed in our application console.

4. KCL and KPL

Kinesis Client Library (KCL) is a library that simplifies the consuming of records. It’s also a layer of abstraction over the AWS SDK Java APIs for Kinesis Data Streams. Behind the scenes, the library handles load balancing across many instances, responding to instance failures, checkpointing processed records, and reacting to resharding.

Kinesis Producer Library (KPL) is a library useful for writing to a Kinesis data stream. It also provides a layer of abstraction that sits over the AWS SDK Java APIs for Kinesis Data Streams. For better performance, the library automatically handles batching, multi-threading, and retry logic.

KCL and KPL both have the main advantage that they're easy to use so that we can focus on producing and consuming records.

4.1. Maven Dependencies

The two libraries can be brought separately in our project if needed. To include KPL and KCL in our Maven project, we need to update our pom.xml file:

 com.amazonaws amazon-kinesis-producer 0.13.1   com.amazonaws amazon-kinesis-client 1.11.2 

4.2. Spring Setup

The only Spring preparation we need is to make sure we have the IAM credentials at hand. The values for aws.access.key and aws.secret.key are defined in our application.properties file so we can read them with @Value when needed.

4.3. Consumer

First, we'll create a class that implements the IRecordProcessor interface and defines our logic for how to handle Kinesis data stream records, which is to print them in the console:

public class IpProcessor implements IRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { processRecordsInput.getRecords() .forEach(record -> System.out.println(new String(record.getData().array()))); } @Override public void shutdown(ShutdownInput shutdownInput) { } }

The next step is to define a factory class that implements the IRecordProcessorFactory interface and returns a previously created IpProcessor object:

public class IpProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new IpProcessor(); } }

And now for the final step, we’ll use a Worker object to define our consumer pipeline. We need a KinesisClientLibConfiguration object that will define, if needed, the IAM Credentials and AWS Region.

We’ll pass the KinesisClientLibConfiguration, and our IpProcessorFactory object, to our Worker and then start it in a separate thread. We keep this logic of consuming records always alive with the use of the Worker class, so we’re continuously reading new records now:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration( APP_NAME, IPS_STREAM, new AWSStaticCredentialsProvider(awsCredentials), IPS_WORKER) .withRegionName(Regions.EU_CENTRAL_1.getName()); final Worker worker = new Worker.Builder() .recordProcessorFactory(new IpProcessorFactory()) .config(consumerConfig) .build(); CompletableFuture.runAsync(worker.run());

4.4. Producer

Let's now define the KinesisProducerConfiguration object, adding the IAM Credentials and the AWS Region:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration() .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials)) .setVerifyCertificate(false) .setRegion(Regions.EU_CENTRAL_1.getName()); this.kinesisProducer = new KinesisProducer(producerConfig);

We'll include the kinesisProducer object previously created in a @Scheduled job and produce records for our Kinesis data stream continuously:

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes())) .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Spring Cloud Stream Binder Kinesis

We’ve already seen two libraries, both created outside of the Spring ecosystem. We’ll now see how the Spring Cloud Stream Binder Kinesis can simplify our life further while building on top of Spring Cloud Stream.

5.1. Maven Dependency

The Maven dependency we need to define in our application for the Spring Cloud Stream Binder Kinesis is:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE 

5.2. Spring Setup

When running on EC2, the required AWS properties are automatically discovered, so there is no need to define them. Since we're running our examples on a local machine, we need to define our IAM access key, secret key, and region for our AWS account. We've also disabled the automatic CloudFormation stack name detection for the application:

cloud.aws.credentials.access-key=my-aws-access-key cloud.aws.credentials.secret-key=my-aws-secret-key cloud.aws.region.static=eu-central-1 cloud.aws.stack.auto=false

Spring Cloud Stream is bundled with three interfaces that we can use in our stream binding:

  • The Sink is for data ingestion
  • The Source is used for publishing records
  • The Processor is a combination of both

We can also define our own interfaces if we need to.

5.3. Consumer

Defining a consumer is a two-part job. First, we'll define, in the application.properties, the data stream from which we'll consume:

spring.cloud.stream.bindings.input.destination=live-ips spring.cloud.stream.bindings.input.group=live-ips-group spring.cloud.stream.bindings.input.content-type=text/plain

And next, let's define a Spring @Component class. The annotation @EnableBinding(Sink.class) will allow us to read from the Kinesis stream using the method annotated with @StreamListener(Sink.INPUT):

@EnableBinding(Sink.class) public class IpConsumer { @StreamListener(Sink.INPUT) public void consume(String ip) { System.out.println(ip); } }

5.4. Producer

The producer can also be split in two. First, we have to define our stream properties inside application.properties:

spring.cloud.stream.bindings.output.destination=live-ips spring.cloud.stream.bindings.output.content-type=text/plain

And then we add @EnableBinding(Source.class) on a Spring @Component and create new test messages every few seconds:

@Component @EnableBinding(Source.class) public class IpProducer { @Autowired private Source source; @Scheduled(fixedDelay = 3000L) private void produce() { IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix) .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build())); } }

Questo è tutto ciò di cui abbiamo bisogno per far funzionare Spring Cloud Stream Binder Kinesis. Possiamo semplicemente avviare l'applicazione ora.

6. Conclusione

In questo articolo, abbiamo visto come integrare il nostro progetto Spring con due librerie AWS per interagire con un Kinesis Data Stream. Abbiamo anche visto come utilizzare la libreria Spring Cloud Stream Binder Kinesis per rendere l'implementazione ancora più semplice.

Il codice sorgente di questo articolo può essere trovato su Github.