Processing IoT Data using Amazon Kinesis

Andrew Smith

In this post I’m using Kinesis Data Streams, Kinesis Data Firehose and Kinesis Data Analytics to process data received from an IoT sensor.

The inbound IoT data stream is written to Redshift, has rolling averages calculated over it (which are also written to Redshift), and is filtered to detect readings that exceed an alert threshold.

The top level architecture is:

KinesisIoTSchematic

In summary:

  • AWS IoT Core is configured with 2 actions, one that writes IoT messages to a Kinesis Firehose stream that then writes them to Redshift, and another that writes the same messages to a Kinesis data stream for processing by Kinesis Analytics
  • Kinesis Analytics joins the IoT message data with static lookup data that’s read from S3, and then calculates a rolling average of a subset of this data which is written to a Kinesis Firehose stream for routing to Redshift
  • Kinesis Analytics detects data that is over a given threshold and writes it to a Kinesis data stream. A Lambda function receives data from this stream and formats it into an alert message that is sent to an SNS topic, e.g. for emailing to subscribers

The steps to achieve this are described below.

1. Creating the Kinesis Data and Firehose Streams

Kinesis data streams

Two Kinesis data streams are used in the architecture – one from IoT Core to Kinesis Analytics and one from Kinesis Analytics to Lambda. For this example both were created with a single shard, since data volumes will be very light.

Kinesis Firehose stream from IoT Core to Redshift

Kinesis Firehose streams with Redshift destinations write their data first to an intermediate S3 bucket. I created a new bucket and set the Firehose buffering so that the stream would write every 1MB or every 60 seconds (in practice here the 60-second condition will generally be the one that flushes the buffer, since the small JSON messages arriving every 12 seconds come nowhere near 1MB in a minute).

The IoT messages that travel the stream use JSON and look like this:

{"deviceid": "1", "datetime": "2020-05-11 09:42:05", "temperature": "33", "humidity": "85", "winddirection": "304", "windintensity": "75", "rainheight": "29"}

I created a table in a new Redshift database to mirror this structure:

create table iotsensordata.public.sensordata
(
deviceid int,
datetime timestamp,
temperature int,
humidity int,
winddirection int,
windintensity int,
rainheight int
);

The Firehose COPY command configured for the stream then looks as follows:

COPY sensordata (deviceid,datetime,temperature,humidity,winddirection,windintensity,rainheight) FROM 's3://iot-kinesis-fh-redshift-sensordata-msgs/' CREDENTIALS 'aws_iam_role=arn:aws:iam:::role/' MANIFEST JSON 'auto';

The JSON 'auto' property handles the conversion from JSON into the SQL table and relies on column names in the destination table matching JSON names in the source object (not necessarily in the same order).

Kinesis Firehose stream from Kinesis Analytics to Redshift

Similar to the previous stream I created an intermediate S3 bucket for this second Firehose stream and set the S3 buffering so that the stream would write every 1MB or every 60 seconds. The stream outputs rows that require this table schema in Redshift:

create table iotsensordata.public.sensordata_avgs
(
deviceid int,
weatherstationname varchar(64),
min_datetime timestamp,
avg_temperature decimal(4,1),
avg_humidity decimal(4,1),
msg_count int
);

The Firehose COPY command configured for the stream is as follows:

COPY sensordata_avgs (deviceid,weatherstationname,min_datetime,avg_temperature,avg_humidity,msg_count) FROM 's3://iot-kinesis-fh-redshift-sensordata-avgs/' CREDENTIALS 'aws_iam_role=arn:aws:iam:::role/' MANIFEST JSON 'auto';

Kinesis Firehose stream from Kinesis Analytics to S3

This stream is used to route any errors occurring in the default Kinesis Analytics error stream to an S3 bucket. I created the stream using the default configuration values for streams with S3 destinations, with the exception that buffering was changed to 1MB or every 60 seconds, as with the previous 2 streams.

2. Configuring AWS IoT Core

I’m using the IoT infrastructure that I set up in a previous post, where an IoT device emulated by a Python script sends environmental sensor data via MQTT to AWS IoT Core.

I’ve changed the IoT Core rule used in that solution to send the IoT messages received to 2 of the streams created above:

KinesisIoTRule
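The rule's query statement simply selects every message published to the device's MQTT topic. As a minimal sketch (the topic name here is an assumption for illustration, not necessarily the exact topic from the earlier post):

SELECT * FROM 'iot/weathersensor'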

Both actions configured here share an IAM service role that allows IoT Core to write data to the streams.

3. Creating the Kinesis Analytics Application

A Kinesis Analytics application is made up of streaming sources, destinations and application SQL. These are described below.

Kinesis Analytics Sources

The Analytics application was set up with these sources:

KinesisAppSources

The first is the stream that IoT Core writes to and the second is a text file stored in S3 that contains lookup reference data:

deviceid,weatherstationname
1,UK-Worthing
2,UK-Southampton
3,UK-Reading
4,UK-Northampton
5,UK-Manchester

The first column is the device ID that’s received within IoT messages and the second maps the device ID to the name of the environmental sensor that it refers to.

Kinesis Analytics Destinations

The following are the destinations set up for the Analytics application:

KinesisAppDestinations

The IoT_Analysis_Alerts_Stream Kinesis stream carries alert data to a Lambda function, the IoT_Avgs_Stream Firehose stream writes rolling average data to Redshift, and the IoT_Analytics_Errors_Stream Firehose stream writes any Analytics application errors to an S3 bucket.

Kinesis Analytics SQL

The SQL written for the Kinesis Analytics application is described in the sections below.

First the weather station lookup data is joined with the inbound IoT message data. This joined data is used by all subsequent queries:

KinesisIoTScript1
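As a minimal sketch of what this join can look like in Kinesis Analytics SQL (not necessarily the exact script in the screenshot) – the in-application stream, pump and reference table names ("ENRICHED_STREAM", "ENRICHED_PUMP", "WeatherStations") are assumptions, and the column types assume the schema discovered for the source stream matches the JSON keys:

-- In-application stream holding IoT messages enriched with the weather station name
CREATE OR REPLACE STREAM "ENRICHED_STREAM" (
    "deviceid"           INTEGER,
    "weatherstationname" VARCHAR(64),
    "datetime"           TIMESTAMP,
    "temperature"        INTEGER,
    "humidity"           INTEGER
);

-- Pump that joins the inbound stream with the S3 reference data and feeds the enriched stream
CREATE OR REPLACE PUMP "ENRICHED_PUMP" AS
INSERT INTO "ENRICHED_STREAM"
SELECT STREAM
    "SOURCE_SQL_STREAM_001"."deviceid",
    "r"."weatherstationname",
    "SOURCE_SQL_STREAM_001"."datetime",
    "SOURCE_SQL_STREAM_001"."temperature",
    "SOURCE_SQL_STREAM_001"."humidity"
FROM "SOURCE_SQL_STREAM_001"
LEFT JOIN "WeatherStations" AS "r"
    ON "SOURCE_SQL_STREAM_001"."deviceid" = "r"."deviceid";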

Next rolling averages are created for the temperature and humidity data:

KinesisIoTScript2

These averages always look back 1 minute from the latest message received. The IoT device sends 1 message every 12 seconds, so there will typically be 5 messages per minute.

Note that a sliding window is used here, which is an overlapping window, i.e. an output is emitted for every record that arrives on the source stream, so the same record can appear in more than one window. This is in contrast to the stagger windows and tumbling windows that are also supported by Kinesis Analytics.
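A minimal sketch of such a sliding-window query, reusing the assumed stream names from the join sketch above:

-- In-application stream holding the rolling averages destined for Redshift
CREATE OR REPLACE STREAM "AVGS_STREAM" (
    "deviceid"           INTEGER,
    "weatherstationname" VARCHAR(64),
    "min_datetime"       TIMESTAMP,
    "avg_temperature"    DECIMAL(4,1),
    "avg_humidity"       DECIMAL(4,1),
    "msg_count"          INTEGER
);

-- Pump calculating averages over a 1-minute sliding window per device
CREATE OR REPLACE PUMP "AVGS_PUMP" AS
INSERT INTO "AVGS_STREAM"
SELECT STREAM
    "deviceid",
    "weatherstationname",
    MIN("datetime")    OVER W1 AS "min_datetime",
    AVG("temperature") OVER W1 AS "avg_temperature",
    AVG("humidity")    OVER W1 AS "avg_humidity",
    COUNT(*)           OVER W1 AS "msg_count"
FROM "ENRICHED_STREAM"
WINDOW W1 AS (
    PARTITION BY "deviceid"
    RANGE INTERVAL '1' MINUTE PRECEDING
);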

Finally we have the alerting logic which filters messages that include a temperature of more than 40 degrees:

KinesisIoTScript3

Note that a stagger window is used here, to consolidate alerts that occur within a 2 minute window for a single weather station into a single output row.
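A minimal sketch of the alerting query, again using the assumed stream names from above, with the 2 minute stagger window partitioned per device:

-- In-application stream of consolidated alerts destined for the alerts Kinesis stream
CREATE OR REPLACE STREAM "ALERTS_STREAM" (
    "deviceid"           INTEGER,
    "weatherstationname" VARCHAR(64),
    "alert_count"        INTEGER,
    "min_temperature"    INTEGER,
    "max_temperature"    INTEGER,
    "min_datetime"       TIMESTAMP,
    "max_datetime"       TIMESTAMP
);

-- Pump that filters readings over 40 degrees and consolidates them into one row per window
CREATE OR REPLACE PUMP "ALERTS_PUMP" AS
INSERT INTO "ALERTS_STREAM"
SELECT STREAM
    "deviceid",
    "weatherstationname",
    COUNT(*)           AS "alert_count",
    MIN("temperature") AS "min_temperature",
    MAX("temperature") AS "max_temperature",
    MIN("datetime")    AS "min_datetime",
    MAX("datetime")    AS "max_datetime"
FROM "ENRICHED_STREAM"
WHERE "temperature" > 40
WINDOWED BY STAGGER (
    PARTITION BY "deviceid", "weatherstationname"
    RANGE INTERVAL '2' MINUTE
);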

The script above and the Lambda function below can be found in my GitHub repository.

4. Creating the Lambda Function for publishing alert messages to SNS

I created a Lambda function that uses the Python 3.8 runtime and runs the following script:

KinesisIoTLambda

I set the IoT_Analysis_Alerts_Stream Kinesis stream defined earlier as a trigger, where the event source mapping that Lambda implements for Kinesis polls this stream for messages. The poller can deliver a batch of messages to the function (which is invoked synchronously), hence the need for the loop seen above that iterates through the one or more messages received.

I set the event mapping batch size to deliver at most 100 messages, and to wait a maximum of 60 seconds when gathering messages. I also set the starting position to TRIM_HORIZON, which reads the stream from the oldest messages onwards. I left retry attempts set to the default of 10, and didn’t create an on-failure destination (an SQS queue or SNS topic) for this example, but in production use this would be valuable.

Aside from the event mapping, I set the Lambda function to have a timeout of 20 seconds and to run with 128MB of memory.

Finally, the execution role associated with the Lambda function is given permissions to read from the specific Kinesis stream configured for its trigger, to publish to the specific SNS topic used for the alerts, and to write to CloudWatch.

5. Results

Using SQL Workbench/J to query Redshift, example data resulting from the Kinesis Firehose stream from IoT Core to Redshift is:

KinesisRedshift1

Example data from the Kinesis Analytics rolling averages calculation is:

KinesisRedshift2

The msg_count column here is interesting because it shows the Kinesis Analytics sliding window in operation. The window looks back one minute, emitting an output for every row that arrives in its source stream. So the first row in the table is the average of just that row, since there are no other rows in the minute preceding it. New rows arrive into the source stream every 12 seconds, so successive rolling average calculations run over more and more rows. Once the first minute of operation has elapsed, the output rows then alternate between averaging 5 and 6 source rows. This reflects the fact that 12 seconds divides exactly into 1 minute: with rows occurring at 0, 12, 24, 36, 48 and 60 seconds, the row at 60 seconds may or may not fall into the same minute as the row at 0 seconds, subject to the exact timing.
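This alternation can be seen by querying the averages table directly, for example:

select deviceid, weatherstationname, min_datetime, msg_count
from iotsensordata.public.sensordata_avgs
order by deviceid, min_datetime;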

Finally, a typical alert email received by subscribing to the SNS topic published to by the Lambda function shown previously is:

Weather station “UK-Worthing” reported 3 alert(s) between 43°C and 46°C from 2020-05-20 14:58:28 to 2020-05-20 14:59:52.

So this shows how the stagger window used for alert generation in Kinesis Analytics consolidates multiple alerts that occur within a 2 minute window.