Optimising Query Execution for Amazon Redshift Spectrum

Andrew Smith

The AWS Big Data Blog post Twelve Best Practices for Amazon Redshift Spectrum discusses a number of ways to optimise queries running on Redshift Spectrum.

Here I’m verifying ideas from that post, focusing in particular on using Redshift internals such as system views and query plans to confirm how changes in the query execution environment affect query performance, specifically how much data is read from S3 and how much is passed back to Redshift for further processing.

(1) The Test Environment

I’m starting with a single 111MB CSV file that I’ve uploaded to S3. I’ve created a new database called geographic_units in the AWS Glue catalogue and have run the following commands in Redshift to create an external schema and an external table for the file in Redshift Spectrum:

CREATE EXTERNAL SCHEMA geographic_units
FROM DATA CATALOG
DATABASE 'geographic_units'
IAM_ROLE 'arn:aws:iam::<my account number>:role/Redshift-S3-Glue';

CREATE EXTERNAL TABLE geographic_units.fact_2000_2019(
anzsic06 varchar(7),
area varchar(7),
year char(4),
geo_count int,
ec_count int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 's3://geographic-units-sample-data/'
TABLE PROPERTIES ('skip.header.line.count'='1');
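
As a quick sanity check (not strictly necessary, but I find it useful), the new external table and its S3 location can be confirmed by querying the SVV_EXTERNAL_TABLES system view, e.g.:

-- Confirm the external table is registered and points at the right S3 location
SELECT schemaname, tablename, location
FROM svv_external_tables
WHERE schemaname = 'geographic_units';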

(2) Comparing CSV and Partitioned Parquet Files

First I’m comparing queries running on two versions of the file – the CSV file already described and a partitioned Parquet version of the same file. I created the partitioned Parquet version using the Redshift UNLOAD command:

UNLOAD ('select anzsic06, area, year, geo_count, ec_count from geographic_units.fact_2000_2019')
TO 's3://geographic-units-parquet/partitioned/'
IAM_ROLE 'arn:aws:iam::<my account number>:role/Redshift-S3-Unload'
PARQUET
PARTITION BY (year);

I then created a Spectrum table for it:

CREATE EXTERNAL TABLE geographic_units.fact_2000_2019_parquet(
anzsic06 varchar(7),
area varchar(7),
geo_count int,
ec_count int)
PARTITIONED BY (year char(4))
STORED AS PARQUET
LOCATION 's3://geographic-units-parquet/partitioned/';

and added the 20 year partitions, e.g.:

ALTER TABLE geographic_units.fact_2000_2019_parquet ADD PARTITION (year='2000')
LOCATION 's3://geographic-units-parquet/partitioned/year=2000';
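
One way to check that all 20 partitions were registered is to list them from the SVV_EXTERNAL_PARTITIONS system view, with something like:

-- Each row corresponds to one registered partition (year=2000 .. year=2019)
SELECT location
FROM svv_external_partitions
WHERE schemaname = 'geographic_units'
AND tablename = 'fact_2000_2019_parquet'
ORDER BY location;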

The SVL_S3QUERY_SUMMARY Redshift system view can be queried to obtain query stats. The results from running a SELECT COUNT(*) FROM … query on each table are:

[Image: SVL_S3QUERY_SUMMARY stats for the COUNT(*) query on the CSV and Parquet tables]
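
For reference, figures like these can be pulled with a query along the following lines (using pg_last_query_id() is just one way to pick out the most recent query in the session):

-- Spectrum-level stats for the most recent query in this session
SELECT query, elapsed, s3_scanned_rows, s3_scanned_bytes,
       s3query_returned_rows, s3query_returned_bytes,
       files, avg_request_parallelism
FROM svl_s3query_summary
WHERE query = pg_last_query_id();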

The Parquet table had a slower execution time – likely because the partitioning created many files, all of which had to be scanned for this query. More parallelism was possible, though, again because of the additional files. Significantly, the Parquet query was cheaper to run, since Redshift Spectrum queries are billed by the number of bytes scanned: it scanned only 1.8% of the bytes that the text file query did. This could be reduced even further if compression were used – both UNLOAD and CREATE EXTERNAL TABLE support BZIP2 and GZIP compression.
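
As a sketch only (the target bucket here is made up for illustration), a GZIP-compressed text unload would look something like this:

-- Unload the same data as GZIP-compressed, comma-delimited text files
UNLOAD ('select anzsic06, area, year, geo_count, ec_count from geographic_units.fact_2000_2019')
TO 's3://geographic-units-csv-gzip/'
IAM_ROLE 'arn:aws:iam::<my account number>:role/Redshift-S3-Unload'
DELIMITER ','
GZIP;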

The next query I ran on each table could make use of the partitioning: SELECT COUNT(*) FROM … WHERE year='2012'

These are the resulting stats:

[Image: SVL_S3QUERY_SUMMARY stats for the partition-filtered COUNT(*) query on the CSV and Parquet tables]

The Parquet query was much faster than last time – only 1 file was scanned instead of the previous 21. The number of bytes scanned dropped significantly as well, making this query even cheaper to run.

The SVL_S3PARTITION Redshift system view can be used to confirm partition usage. When queried for the previous Parquet query we see:

[Image: SVL_S3PARTITION stats for the partition-filtered Parquet query]

which confirms that just 1 partition was scanned to resolve the query.
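
The partition figures come from a query along these lines (again, the pg_last_query_id() filter is just illustrative):

-- Partition pruning stats: partitions that existed, qualified, and were assigned for scanning
SELECT query, total_partitions, qualified_partitions, assigned_partitions
FROM svl_s3partition
WHERE query = pg_last_query_id();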

(3) Setting Table Statistics for External Tables

The AWS Redshift Spectrum documentation states that: “Amazon Redshift doesn’t analyze external tables to generate the table statistics that the query optimizer uses to generate a query plan. If table statistics aren’t set for an external table, Amazon Redshift generates a query execution plan. Amazon Redshift generates this plan based on the assumption that external tables are the larger tables and local tables are the smaller tables.”

For this example I’m joining the Parquet fact table created above with a much smaller dimension table that I’ve loaded into Redshift. The query is shown within this EXPLAIN statement:

EXPLAIN SELECT d.code, d.description, f.geo_count, f.ec_count
FROM geographic_units.fact_2000_2019_parquet f
INNER JOIN spectrum.public.dim_area d
ON f.area = d.code
WHERE f.year = '2012'
AND f.area = 'A152600';

No table statistics have been set for the Parquet table on S3, and this is the query plan from the above:

[Image: EXPLAIN output before setting table statistics, with the optimiser's estimated row counts for the Parquet table highlighted]

The row counts highlighted were estimated by the query optimiser for the Parquet table. I can set the table's row count ('numRows') either at creation time or afterwards, using:

ALTER TABLE geographic_units.fact_2000_2019_parquet SET TABLE PROPERTIES ('numRows'='5155186');
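
Alternatively, the same statistic could have been supplied when the table was created, by adding a TABLE PROPERTIES clause to the earlier CREATE EXTERNAL TABLE statement, roughly as follows:

CREATE EXTERNAL TABLE geographic_units.fact_2000_2019_parquet(
anzsic06 varchar(7),
area varchar(7),
geo_count int,
ec_count int)
PARTITIONED BY (year char(4))
STORED AS PARQUET
LOCATION 's3://geographic-units-parquet/partitioned/'
TABLE PROPERTIES ('numRows'='5155186');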

With the same query the EXPLAIN statement yields:

[Image: EXPLAIN output after setting numRows, showing revised row and cost estimates]

The query optimiser has clearly made use of the table statistic, estimating different numbers of rows and costs at various stages (in this particular case the structure of the overall plan has remained unchanged).

(4) Predicate Pushdown

From the AWS blog post I mentioned earlier: “The processing that is done in the Redshift Spectrum layer (the S3 scan, projection, filtering, and aggregation) is independent from any individual Amazon Redshift cluster. In general, any operations that can be pushed down to Redshift Spectrum experience a performance boost because of the powerful infrastructure that supports Redshift Spectrum. Using predicate pushdown also avoids consuming resources in the Amazon Redshift cluster.”

One SQL operation that doesn’t get pushed down to the Redshift Spectrum layer is DISTINCT. The following 2 queries are equivalent, one using DISTINCT and the other using GROUP BY:

SELECT DISTINCT anzsic06
FROM geographic_units.fact_2000_2019_parquet
WHERE year = '2012';

SELECT anzsic06
FROM geographic_units.fact_2000_2019_parquet
WHERE year = '2012'
GROUP BY anzsic06;

After running these queries, the SVL_S3QUERY_SUMMARY Redshift system view shows the following (the DISTINCT query is the first one):

[Image: SVL_S3QUERY_SUMMARY stats for the DISTINCT and GROUP BY queries]

Looking at the rows returned to Redshift for further processing, the DISTINCT query returned all 260,574 rows in the partition for Redshift to perform the DISTINCT operation, while the GROUP BY query returned just the 316 rows that resulted from the GROUP BY. So this confirms that the latter pushed its heavy lifting down to the Redshift Spectrum layer and the former did not.
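
The relevant columns here are s3_scanned_rows and s3query_returned_rows in SVL_S3QUERY_SUMMARY; a side-by-side comparison could be pulled with something like the following (the query IDs are illustrative placeholders):

-- Compare rows scanned in S3 vs rows returned to Redshift for the two queries
SELECT query, s3_scanned_rows, s3query_returned_rows
FROM svl_s3query_summary
WHERE query IN (1234, 1235)
ORDER BY query;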

Using EXPLAIN to show the query plans confirms the same. This is for the DISTINCT:

[Image: EXPLAIN output for the DISTINCT query]

and this is for the GROUP BY:

[Image: EXPLAIN output for the GROUP BY query, with the S3 HashAggregate step highlighted]

The S3 HashAggregate operation highlighted confirms that the GROUP BY was pushed down to the Redshift Spectrum layer.