Processing Amazon Kinesis Stream Data with the KCL for Node.js

This article is a practical guide to using the Amazon Kinesis Client Library (KCL) in Node.js. The Node.js runtime, known for its event-driven, non-blocking I/O model, is well suited to lightweight, efficient applications that handle real-time data across distributed devices. With the rise of full-stack JavaScript development, the Amazon KCL for Node.js lets developers build end-to-end Amazon Kinesis applications entirely in JavaScript.

Amazon Kinesis is a fully managed, cloud-based service designed for processing real-time data from extensive, distributed streams. This service can aggregate data from various sources, including website interactions, IT logs, social media updates, billing transactions, and IoT device sensor readings. Once data is captured by Amazon Kinesis, it can be processed using the KCL for various purposes such as data analysis, real-time dashboards, and archival solutions. While direct processing through the Amazon Kinesis API is an option, the KCL simplifies many complexities involved in distributed data processing, allowing you to concentrate on the specifics of record processing. The KCL provides features like automatic load balancing, record checkpointing, and instance failure handling.

The Node.js implementation of the KCL operates through the MultiLangDaemon. The primary functionality (interfacing with Amazon Kinesis, managing load balance, and addressing instance failures) is handled in Java, while the Node.js KCL communicates with this Java daemon using a multi-language protocol.
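Concretely, the daemon is driven by a Java properties file that names the Node.js executable to launch and the stream to process. The sketch below is illustrative: the property names follow the KCL's sample.properties, but the values (executable path, stream name, application name) are placeholders you would replace with your own.

```properties
# Command the MultiLangDaemon runs to start each record processor.
executableName = node app/consumer.js
streamName = TestStream
applicationName = NodeKCLSample
# Credentials provider used by the Java daemon.
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
regionName = us-west-2
# Where to start reading when no checkpoint exists yet.
initialPositionInStream = TRIM_HORIZON
```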

For those eager to get hands-on, a comprehensive tutorial on integrating the Amazon KCL with Node.js is available on GitHub. The KCL is also available in other languages, including Java, Python, and Ruby.

Overview of Amazon Kinesis and KCL

A typical Amazon Kinesis application is built from the following components:

Producers, which can include EC2 instances, conventional servers, or user devices, push data into an Amazon Kinesis stream. The stream is composed of one or more shards, each providing a fixed unit of capacity (for writes, up to 1 MB or 1,000 records per second per shard). Consumers then read the data in real time through the Amazon Kinesis API or the KCL, and forward the processed results to other services for archival, querying, or batch processing as needed.

Creating a Producer for Amazon Kinesis Using Node.js

Before you can build a consumer, you need a producer that puts data into your stream. The AWS SDK for Node.js provides a straightforward API for this. Begin by installing the aws-sdk module via npm:

npm install aws-sdk

There are multiple methods to supply credential information to the AWS SDK, as detailed in the Getting Started guide for the AWS SDK in Node.js.

Once your setup is complete, you can begin crafting an Amazon Kinesis producer application to create a stream and populate it with records. Below is an example demonstrating how to establish a new stream:

var AWS = require('aws-sdk');

var kinesis = new AWS.Kinesis({region : 'us-west-2'});

function createStream(streamName, numberOfShards, callback) {
  var params = {
    ShardCount: numberOfShards,
    StreamName: streamName
  };

  // Create the new stream if it does not already exist.
  kinesis.createStream(params, function(err, data) {
    if (err && err.code !== 'ResourceInUseException') {
      callback(err);
      return;
    }
    // Ensure the stream is ACTIVE before pushing data.
    waitForStreamToBecomeActive(streamName, callback);
  });
}

function waitForStreamToBecomeActive(streamName, callback) {
  kinesis.describeStream({StreamName : streamName},
    function(err, data) {
      if (err) {
        callback(err);
        return;
      }

      if (data.StreamDescription.StreamStatus === 'ACTIVE') {
        callback();
      } else {
        setTimeout(function() {
          waitForStreamToBecomeActive(streamName, callback);
        }, 5000);
      }
    }
  );
}

After creating the stream, you can start inserting data into it. The following example demonstrates how to write random data into the stream:

function writeToKinesis(streamName) {
  var randomNumber = Math.floor(Math.random() * 100000);
  var data = 'data-' + randomNumber;
  var partitionKey = 'pk-' + randomNumber;
  var recordParams = {
    Data: data,
    PartitionKey: partitionKey,
    StreamName: streamName
  };

  kinesis.putRecord(recordParams, function(err, data) {
    if (err) {
      console.error(err);
    }
  });
}

Finally, to bring everything together, the following snippet calls the functions to create a new stream with two shards and populate it with data after confirming the stream status is active:

createStream('TestStream', 2, function(err) {
  if (err) {
    console.error('Error starting Kinesis producer: ' + err);
    return;
  }
  for (var i = 0; i < 10; ++i) {
    writeToKinesis('TestStream');
  }
});
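Writing records one at a time is simple but chatty; the SDK also offers a putRecords call that accepts up to 500 records per request, so larger batches need to be split before sending. A minimal chunking helper (the function name is our own):

```javascript
// putRecords accepts at most 500 records per request, so larger
// batches must be divided into chunks first.
function chunkRecords(records, maxBatchSize) {
  var batches = [];
  for (var i = 0; i < records.length; i += maxBatchSize) {
    batches.push(records.slice(i, i + maxBatchSize));
  }
  return batches;
}

// 1,200 records become three batches of 500, 500, and 200.
console.log(chunkRecords(new Array(1200).fill(0), 500).length); // 3
```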

Consuming Data from Amazon Kinesis Using KCL for Node.js

If you have at least one producer actively sending data to your stream, the next step is to leverage the Amazon KCL for Node.js to consume that data. Install the KCL module using npm:

npm install aws-kcl

Once the module is set up, you can create your KCL application by implementing a record processor. Each shard is managed by a single instance of a record processor, and the KCL automatically instantiates these for each shard. The record processor must implement three API functions:

  • initialize: Called once at the beginning of record processing for the shard. This is where you can set up resources, such as an S3 client, if you’re saving processed data in Amazon S3.
  • processRecords: Invoked periodically with new records from the stream. The KCL supports checkpointing, allowing the record processor to save its progress, so that in case of instance failure, processing can resume from the last checkpoint.
  • shutdown: Triggered when there are no remaining records to process or when the processor is no longer active. This is where you can implement cleanup logic for any resources created during initialization. If the shard has no more records (shutdown reason is TERMINATE), you should also checkpoint the last state to inform the KCL that processing is complete.
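One practical detail worth calling out: because records pass through the Java MultiLangDaemon, their payloads arrive in processRecords as base64-encoded strings and must be decoded before use. A small helper, with a hypothetical name:

```javascript
// Record payloads cross the MultiLangDaemon boundary base64-encoded;
// decode them back to the original string before processing.
function decodeRecordData(record) {
  return Buffer.from(record.data, 'base64').toString('utf8');
}

// Example: a payload written as 'data-42' round-trips intact.
var record = { data: Buffer.from('data-42', 'utf8').toString('base64') };
console.log(decodeRecordData(record)); // data-42
```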

Here is an example of a record processor implementation:

var recordProcessor = {
  initialize: function(initializeInput, completeCallback) {
    // Initialization logic specific to your application.

    // Call completeCallback once initialization is finished.
    completeCallback();
  },
  processRecords: function(processRecordsInput, completeCallback) {
    var records = processRecordsInput.records;
    var sequenceNumber;
    for (var i = 0; i < records.length; ++i) {
      // Record data arrives base64-encoded via the MultiLangDaemon.
      var data = Buffer.from(records[i].data, 'base64').toString();
      sequenceNumber = records[i].sequenceNumber;
      // Application-specific processing of the decoded data goes here.
    }
    if (!sequenceNumber) {
      completeCallback();
      return;
    }
    // Checkpoint progress so a restarted worker resumes from here.
    processRecordsInput.checkpointer.checkpoint(sequenceNumber,
      function(err) {
        completeCallback();
      });
  },
  shutdown: function(shutdownInput, completeCallback) {
    // Cleanup logic goes here.

    // Call completeCallback to indicate shutdown is complete.
    completeCallback();
  }
};
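With the record processor defined, the final step is handing it to the KCL. The snippet below follows the aws-kcl module's documented entry point; note that it does real work only when launched by the MultiLangDaemon, which drives the worker over stdin/stdout.

```javascript
var kcl = require('aws-kcl');

// Hand the record processor to the KCL; run() begins the protocol
// exchange with the MultiLangDaemon.
kcl(recordProcessor).run();
```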
