Running ETL Jobs using Amazon Athena

Andrew Smith

Amazon Athena supports the 'CREATE TABLE AS' and 'INSERT INTO' statements, which can be used to transform data between a source and a destination.

In this post I’m using them to optimise the storage of data that is received into S3 as files of JSON objects. Specifically, I’m using CREATE TABLE AS (CTAS) to create a table for the JSON data that is:

  • in Parquet format
  • compressed
  • partitioned

The benefits that result from this transformation are that:

  • the data is in a columnar format, which results in less data being read for some queries
  • the compression results in less data being read for queries
  • the partitioning results in less data being read for queries that are selective by partition

Reading less data is valuable when using Athena, since the service is billed by the amount of data scanned.

Note that Amazon Kinesis Firehose could be used as an alternative mechanism for generating partitioned, compressed Parquet files in S3 from JSON sources, in the case where the JSON data is naturally streaming. The mechanism described in this post is more generally applicable, however, as many other types of transformation can also be implemented in the CTAS and INSERT INTO statements, and it is suitable for non-streaming sources too.

After the initial CTAS table creation, I then have an Athena INSERT INTO statement that runs once per hour, that loads newly arrived JSON files into the optimised table. The hourly ETL is illustrated as follows:

[Diagram: hourly ETL flow between the messages, staging and processed buckets]

The construction of this process is described below, and consists of the following steps:

  • Create the S3 buckets and set up the data feed
  • Create the Athena source and staging tables
  • Use the Athena CTAS statement to create the optimised destination table
  • Create the Lambda function that runs the hourly INSERT INTO update
  • Set up the hourly EventBridge schedule

The use of a staging bucket means that we have a static data set to load from, and to delete or archive once we know it’s loaded OK. (The messages bucket where the JSON files arrive is frequently being updated with new files.)

1) Create the S3 buckets and set up the data feed

I used the S3 management console to create 3 buckets with these names:
iot-sensordata-messages
iot-sensordata-staging
iot-sensordata-processed

A top-level folder called partitioned/ was also created in the iot-sensordata-processed bucket, since Athena partitions can only be written to a folder and not to the root of a bucket.

I then set up a data feed into the iot-sensordata-messages bucket that created files of JSON objects, with each file containing multiple objects separated by newlines (as required by Athena):

{"deviceid": "1", "datetime": "2020-05-12 09:36:36", "temperature": "4", "humidity": "81", "winddirection": "45", "windintensity": "77", "rainheight": "26"}
{"deviceid": "1", "datetime": "2020-05-12 09:36:48", "temperature": "20", "humidity": "80", "winddirection": "199", "windintensity": "57", "rainheight": "22"}
{"deviceid": "1", "datetime": "2020-05-12 09:37:00", "temperature": "14", "humidity": "88", "winddirection": "280", "windintensity": "96", "rainheight": "17"}
{"deviceid": "1", "datetime": "2020-05-12 09:37:12", "temperature": "43", "humidity": "25", "winddirection": "80", "windintensity": "16", "rainheight": "15"}
{"deviceid": "1", "datetime": "2020-05-12 09:37:24", "temperature": "43", "humidity": "24", "winddirection": "285", "windintensity": "13", "rainheight": "25"}

When the hourly ETL job runs, everything in this bucket is moved to iot-sensordata-staging, from where it is loaded into the target table. At the start of the next hourly run, everything in staging is then deleted if the prior load was successful.

2) Create the Athena source and staging tables

The Athena source table that reads the JSON encoded files from the iot-sensordata-messages bucket is created using the following Hive DDL:

CREATE EXTERNAL TABLE iot_sensor_data.iot_data_messages
(
deviceid int,
datetime timestamp,
temperature smallint,
humidity smallint,
winddirection smallint,
windintensity smallint,
rainheight smallint
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://iot-sensordata-messages/';

A JSON SerDe library is specified here to handle the conversion of the JSON into table rows. Almost identical DDL is used to create the table that maps to the staging bucket, changing only the table name to iot_sensor_data.iot_data_staging and the bucket to s3://iot-sensordata-staging/. Prior to creating both tables, the iot_sensor_data database referred to above was created in the AWS Glue catalogue. The tables appear there too, as well as being visible in the Athena console.

3) Run the Athena CTAS statement to create the optimised destination table

The Athena CTAS statement for the target table creation is:

CREATE TABLE iot_sensor_data.iot_data_processed
WITH
(
external_location = 's3://iot-sensordata-processed/partitioned/',
format = 'PARQUET',
parquet_compression = 'SNAPPY',
partitioned_by = ARRAY['day']
)
AS SELECT deviceid, datetime, temperature, humidity, winddirection, windintensity, rainheight, date_format(datetime, '%Y-%m-%d') as day
FROM "iot_data_messages";

The Parquet format and Snappy compression can be seen there, as well as the partitioning, which has been chosen to be by day. Note that the partition column(s) must appear at the end of the SELECT list.

4) Create the Lambda function that runs the hourly INSERT INTO update

Now the Lambda function is created, which uses a Python 3 script (and the Lambda Python 3.8 runtime) to run the hourly ETL. The script is in my GitHub repository and is explained in sections below.

A) Setup

As part of the general initialisation below, the Athena INSERT INTO statement can be seen, again deriving a partition column in the same way as the CTAS statement above.

[Screenshot: script setup]
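The exact setup code is in the screenshot and the GitHub repository; as an indication, a minimal sketch of it might look like the following. The query output location, the state-table name (iot_etl_state) and the NOT_LOADED sentinel value are assumptions made for illustration.

import boto3

# Buckets used by the hourly ETL
MESSAGES_BUCKET = "iot-sensordata-messages"
STAGING_BUCKET = "iot-sensordata-staging"

# Athena details; the query output location is a placeholder
DATABASE = "iot_sensor_data"
OUTPUT_LOCATION = "s3://iot-sensordata-processed/query-results/"

# DynamoDB table (hypothetical name) holding the query id of the last INSERT INTO,
# plus a sentinel value written once files have been staged but not yet loaded
STATE_TABLE = "iot_etl_state"
NOT_LOADED = "staged-not-loaded"

# The incremental load: same column list and derived 'day' partition column as the CTAS
INSERT_SQL = """
INSERT INTO iot_sensor_data.iot_data_processed
SELECT deviceid, datetime, temperature, humidity, winddirection,
       windintensity, rainheight, date_format(datetime, '%Y-%m-%d') AS day
FROM iot_sensor_data.iot_data_staging
"""

s3 = boto3.client("s3")
athena = boto3.client("athena")
state_table = boto3.resource("dynamodb").Table(STATE_TABLE)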

B) Lambda Handler

The Lambda handler function is next, which just contains the high level logic for the ETL. This is:
– check last hourly run status
– if it was successful then delete files from staging
– move all files from messages bucket to staging bucket
– load data from the staging bucket into the optimised table

[Screenshot: Lambda handler]
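A sketch of the handler, following the four steps above and using the names from the setup sketch (the functions are sketched in the sections that follow). The sentinel write just after staging is my reading of the retry-safety logic described later in the post, rather than the exact code.

def lambda_handler(event, context):
    # 1) Check how the previous hourly INSERT INTO finished
    # 2) Only clear staging if its contents were definitely loaded
    if previous_run_succeeded():
        empty_staging_bucket()

    # 3) Move newly arrived JSON files from the messages bucket into staging,
    #    then record that they are staged but not yet loaded (retry safety)
    move_messages_to_staging()
    save_query_id(NOT_LOADED)

    # 4) Kick off the incremental load; its query id is checked by the next run
    load_staging_data()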

C) Check last ETL run status

The status of the last ETL run is checked next. The Athena query id of the prior run's INSERT INTO is stored in a DynamoDB table; it is retrieved here and used to obtain the status of that run. (The status couldn't be checked at the time the query was invoked, because Boto3 runs Athena queries asynchronously, and the script could exit before the query completed.)

[Screenshot: checking the last run status]
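A sketch of this check, assuming a single state item keyed by a hypothetical id of 'last_run':

def previous_run_succeeded():
    """Return True only if the last INSERT INTO is known to have succeeded."""
    item = state_table.get_item(Key={"id": "last_run"}).get("Item")
    if item is None or item["query_id"] == NOT_LOADED:
        # Either the first ever run, or a retry where files were staged but not loaded
        return False

    # The query ran asynchronously, so its final state is checked here, one run later
    execution = athena.get_query_execution(QueryExecutionId=item["query_id"])
    return execution["QueryExecution"]["Status"]["State"] == "SUCCEEDED"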

D) Empty the staging bucket

This step deletes all files in the staging bucket as they were loaded in the last run. As an alternative to deletion they could be moved to an archive bucket instead and kept for a limited time. This would be more suited to a production environment as they could be accessed and (re)loaded in the case of any script problems.

[Screenshot: emptying the staging bucket]
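A sketch of the deletion, paging through the staging bucket and deleting in batches:

def empty_staging_bucket():
    """Delete everything in staging; it was loaded by the previous run."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=STAGING_BUCKET):
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3.delete_objects(Bucket=STAGING_BUCKET, Delete={"Objects": keys})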

E) Move new message files into staging

The move is achieved via a copy and a delete:

[Screenshot: moving message files into staging]
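A sketch of the move; S3 has no native move operation, so each new file is copied into staging and the original is then deleted:

def move_messages_to_staging():
    """Copy each newly arrived file into staging, then delete the original."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=MESSAGES_BUCKET):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            s3.copy_object(
                Bucket=STAGING_BUCKET,
                Key=key,
                CopySource={"Bucket": MESSAGES_BUCKET, "Key": key},
            )
            s3.delete_object(Bucket=MESSAGES_BUCKET, Key=key)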

F) Load data from staging

The INSERT INTO statement is run here, to do the incremental load. The Athena query id is then saved so we can check its status in the next run.

[Screenshot: running the INSERT INTO]
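A sketch of the load, starting the INSERT INTO defined in the setup sketch and saving its query id via the helper below:

def load_staging_data():
    """Run the incremental INSERT INTO and save its query id for the next run to check."""
    response = athena.start_query_execution(
        QueryString=INSERT_SQL,
        QueryExecutionContext={"Database": DATABASE},
        ResultConfiguration={"OutputLocation": OUTPUT_LOCATION},
    )
    save_query_id(response["QueryExecutionId"])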

G) Helper function

Finally there’s a little DynamoDB helper function, used because we need to do the write from two places:

[Screenshot: DynamoDB helper function]
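A sketch of the helper, again assuming the single 'last_run' state item:

def save_query_id(query_id):
    """Overwrite the state item with the latest query id (or the staged-not-loaded sentinel)."""
    state_table.put_item(Item={"id": "last_run", "query_id": query_id})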

The memory for the Lambda function was left at the default of 128MB. Runtime statistics showed a maximum of 81MB being used. The timeout was set to 1 minute, to allow for the processing of many S3 files and for the subsequent load.

5) Set up the hourly EventBridge schedule

The final step is to use EventBridge to schedule the Lambda function to run once per hour. A rule is set up with the Lambda function as a target and with a schedule set as follows:

[Screenshot: EventBridge rule schedule configuration]
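The rule was created in the console; for reference, a roughly equivalent Boto3 sketch is below. The rule name, target id and function ARN are placeholders, and rate(1 hour) is just one way of expressing an hourly schedule.

import boto3

events = boto3.client("events")

# Fire the ETL once per hour
events.put_rule(Name="iot-etl-hourly", ScheduleExpression="rate(1 hour)")
events.put_targets(
    Rule="iot-etl-hourly",
    Targets=[{
        "Id": "iot-etl-lambda",
        "Arn": "arn:aws:lambda:eu-west-1:123456789012:function:iot-etl",
    }],
)
# Note: adding the target in the console grants EventBridge permission to invoke the
# Lambda function automatically; via the API this must be done with lambda add-permission.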

Error handling and retries

The Python script uses no exception handlers, because any errors that could occur will be unplanned and should fail the function. Transient errors then have the chance to be resolved by Lambda's retry policy. The function is called asynchronously by EventBridge, so in the case of a failure Lambda will retry up to two more times (the default setting).

The function has been written so that it is safe to retry, with key logic occurring just after the copying of message files into staging. This logic will prevent the function deleting these files from staging on any retry, in the case where they have not yet been loaded.

One limitation of the function is that if two instances of it run concurrently then, depending on the exact timing, one could delete the files just staged by the other before they are loaded. A workaround, should this ever happen, would be for the staged files to be archived by the function rather than deleted, so that they can be loaded once the problem is diagnosed.

An enhancement to the ETL that would mitigate both of the last two points would be to move the deletion/archiving of previously loaded staged files into a separate Lambda function that runs before this one.

The Lambda function execution role

The Lambda function needs to run against an execution role, which gives it the permissions to access various AWS services and resources. I initially based the role on the AWSLambdaBasicExecutionRole managed policy, which gives it access to CloudWatch logs. I then added the following policies:

(A) Access to the two Athena actions required, for the relevant workgroup

[Screenshot: Athena policy statement]

(B) Access to the two DynamoDB actions required, for the relevant table

[Screenshot: DynamoDB policy statement]

(C) Access to the three S3 buckets

Note that for the iot-sensordata-processed bucket there are actions that aren’t defined for the other two. This is the bucket that Athena writes to via the INSERT INTO statement, and without these additional actions I was receiving an “Unable to verify/create output bucket” error. This support article from AWS describes the S3 permissions that Athena needs.

[Screenshot: S3 policy statements]

(D) Access to the Glue Catalog

This last section shows permissions required for Athena to access table definitions in the Glue catalog. The motivation for the first section below isn’t obvious just from looking at the code above, but the reason for its inclusion is documented by AWS here.

[Screenshot: Glue policy statements]
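The exact statements are in the screenshots above, but as a rough, combined sketch, the four groups might be expressed along the following lines, for example as a single inline policy attached with Boto3. The specific actions, resource ARNs, role name and policy name here are assumptions based on the descriptions above, not the exact policy from the post.

import json
import boto3

iam = boto3.client("iam")

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # (A) The two Athena calls the script makes, limited to one workgroup
            "Effect": "Allow",
            "Action": ["athena:StartQueryExecution", "athena:GetQueryExecution"],
            "Resource": "arn:aws:athena:eu-west-1:123456789012:workgroup/primary",
        },
        {   # (B) The two DynamoDB calls, limited to the state table
            "Effect": "Allow",
            "Action": ["dynamodb:GetItem", "dynamodb:PutItem"],
            "Resource": "arn:aws:dynamodb:eu-west-1:123456789012:table/iot_etl_state",
        },
        {   # (C) Object handling in the messages and staging buckets
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": [
                "arn:aws:s3:::iot-sensordata-messages", "arn:aws:s3:::iot-sensordata-messages/*",
                "arn:aws:s3:::iot-sensordata-staging", "arn:aws:s3:::iot-sensordata-staging/*",
            ],
        },
        {   # (C) The processed bucket also needs the extra actions Athena requires
            #     to verify and write to its output location
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetObject", "s3:PutObject",
                       "s3:GetBucketLocation", "s3:AbortMultipartUpload",
                       "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts"],
            "Resource": ["arn:aws:s3:::iot-sensordata-processed",
                         "arn:aws:s3:::iot-sensordata-processed/*"],
        },
        {   # (D) Glue catalogue access so Athena can resolve databases, tables and partitions
            "Effect": "Allow",
            "Action": ["glue:GetDatabase", "glue:GetTable", "glue:GetPartitions",
                       "glue:BatchCreatePartition"],
            "Resource": "*",
        },
    ],
}

iam.put_role_policy(
    RoleName="iot-etl-lambda-role",       # hypothetical role name
    PolicyName="iot-etl-permissions",     # hypothetical policy name
    PolicyDocument=json.dumps(policy),
)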