Posted on 24 Jul 19 by Jordan Huizenga - Full-stack Developer
A Guide to Automated Workflows with AWS CloudFormation and Glue
At KZN Group we are often tasked with processing large amounts of data, typically stored in some kind of data warehouse, lake or mart. The traditional name for this processing is Extract, Transform, and Load, or ETL. We also like to practice a DevOps culture when delivering software for clients, which for us means enabling everyone on the team to be involved in the entire software delivery process and using as much automation as possible so that we deliver quality code with confidence, in a timely manner. The problem we faced was: how can we deliver these data processing solutions for our clients while still getting the benefits of our DevOps approach to software?
The answer: AWS CloudFormation and AWS Glue
AWS Glue is a fully managed extract, transform, and load (ETL) service (for more, see https://aws.amazon.com/glue/). One of the core utilities in AWS Glue is AWS Glue Jobs. These jobs are Scala or Python scripts that are deployed and run on a highly scalable, fully managed EMR cluster, so developers get on-demand, pay-as-you-go access to high compute power without having to manage the underlying nodes themselves. While this all sounds good, like any AWS offering it comes with its subtle quirks and hidden problems. At KZN Group we have developed a simple workflow for developing, testing and deploying AWS Glue Jobs, with as much automation as possible, that we’d like to share.
For the purpose of this blog post, all examples and demonstrations use Python and pyspark, but the concepts should translate readily to Scala and Spark. A copy of the sample code can be found in the Github repository here: https://github.com/KZNGroup/glue-workflow-example.
In order to start writing our data transformations with AWS Glue, we first refer to the documentation (https://docs.aws.amazon.com/glue/latest/dg/author-job.html#author-job-workflow), where AWS outlines an example workflow for creating and using jobs to process data. Unfortunately, the actual script writing process, where most of our business logic lives, is described here (https://docs.aws.amazon.com/glue/latest/dg/console-edit-script.html) and gives us two choices: edit the script via the AWS console, or upload an existing script to S3. Despite AWS’ recommendation to use the console editor, we’d much rather have a standard git-based approach where code can be PR’d and reviewed prior to being deployed to production. To add to this, editing job scripts via the console automatically saves the code to S3, and while that is fine for experimentation in sandbox or development accounts, it’s not great for production environments. Since our goal is to deliver code with confidence and no unintended changes, our production deployments need to be as similar to our development and test deployments as possible, so we need some way to iteratively develop and deploy our glue scripts. Further, the developer feedback experience is horrible when editing and re-running AWS Glue jobs, since the turnaround time from code change to execution can be over 10 minutes if the job requires a cold start (https://stackoverflow.com/a/56229236).
The easiest way to develop and deploy iteratively is to do it locally. Developers can use whatever tools they’re comfortable with, Git can be used for version control, and automation can be set up around pull requests for deployment of our code. The problem with AWS Glue is that once deployed, it runs on a proprietary version of pyspark that we can’t run locally. And even if we were to run it locally, we’d probably want to be writing data transformations against samples or actual data that we’ll be using - but since the data is most likely sensitive and very large, it’s unlikely we’d be able to get a dump of it on our local machine.
Thankfully, AWS provides us with a solution - Glue Dev Endpoints: a remote machine running the exact same version of pyspark that glue runs. We can connect to the dev endpoint with Zeppelin, providing a developer friendly environment very similar to Jupyter Notebooks for developing our ETL logic. For more information about Zeppelin and what it is, see https://zeppelin.apache.org/.
We launch glue dev endpoints to perform some data investigation, work out what joins are required where, and then copy the code to our local machine so it can be tracked in Git. There are some subtle differences between code that runs on the dev endpoints and what you put in glue, including how arguments are passed to the glue job and the fact that glue jobs have an implicit spark context already created. To work around this we often have a simple _is_dev_endpoint flag in our glue jobs that is set to true when we’re developing against the dev endpoint, and run the bootstrapping required for glue jobs when the variable is false, as follows:
_is_dev_endpoint = False
try:
    # In dev there's no JOB_NAME in args, so code will raise an exception here
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'env', 'debug_mode'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
except Exception as e:
    print("Exception:", e)
    print("No job name, therefore we are running in dev environment...")
    _is_dev_endpoint = True
    # todo: move these default args to their own function or global,
    # and use the keys to pass to getResolvedOptions above to avoid duplication
    args = {
        'JOB_NAME': 'Not used during development',
        'env': 'dev',
        'debug_mode': 'true',
    }
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)

# Job code goes below here...
It’s also worth noting that if your glue job requires access to AWS resources located in private subnets, be prepared to set up a jump box and do some SSH port forwarding in order to access your dev endpoint.
Despite having access to glue specific functionality through the dev endpoints, we recommend keeping your code as “vanilla” pyspark as possible. One example of the difference between glue pyspark and vanilla pyspark is the way data is stored. Vanilla pyspark stores data in a construct called a DataFrame (a fancy word for a data structure that holds tabular data, i.e. named columns and rows), whereas the glue specific functionality encourages you to store your data in a DynamicFrame, an AWS specific concept which is essentially a wrapper around DataFrames with some common functionality added on top. Although useful when letting AWS Glue automatically generate scripts (https://docs.aws.amazon.com/glue/latest/dg/edit-script.html), the use of these glue specific constructs makes the code hard to test, since we can no longer reproduce the functionality locally, so their use should be kept to a minimum.
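To make that concrete, here is a minimal sketch of what we mean by keeping DynamicFrame usage at the edges of a job: read as a DynamicFrame, convert to a plain DataFrame for the business logic, and only convert back just before writing. The catalog database and table names below are placeholders, but toDF and DynamicFrame.fromDF are part of the awsglue library.

# Convert at the boundaries only; everything in between is vanilla pyspark.
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",    # placeholder: Data Catalog database name
    table_name="my_table")     # placeholder: Data Catalog table name

df = dynamic_frame.toDF()      # plain DataFrame from here on, so the logic is testable locally
df = df.where(df["age"] > 18)  # example business logic

output_frame = DynamicFrame.fromDF(df, glueContext, "output_frame")  # convert back for writing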
If we keep our complex business logic to vanilla DataFrame operations, we can easily create testable scenarios in our code. We achieve this by extracting the business logic into simple, DataFrame in, DataFrame out operations. As an example, if at some point in our job we had to filter out people 18 years old or younger, we might start with code that looks like this:
def etl_job():
    dynamicFrame = ... # read from S3
    dynamicFrame2 = Filter.apply(frame=dynamicFrame, f=lambda x: x["age"] > 18)
    # perform some other operations...
To make this testable, we convert the dynamic frame to a DataFrame and extract the reading logic and business logic into separate functions:
from pyspark.sql.functions import col

def filter_dataframe_for_people_older_than_18(df):
    return df.where(col('age') > 18)

def read_from_s3():
    dynamicFrame = ... # read from S3
    return dynamicFrame.toDF()

def etl_job():
    df = read_from_s3()
    df = filter_dataframe_for_people_older_than_18(df)
    # perform some other operations...
Now we can have standalone unit tests for the business logic function, filter_dataframe_for_people_older_than_18. There is one further change we need to make, and that is to ensure any glue specific imports that may be required for the reading and writing parts of our job are not imported when running our unit tests. To do this, we introduce another flag that indicates whether we are running in our unit test environment, and if so we refrain from importing anything glue related, like below:
import os
import sys

if os.environ.get('ENV') != 'test':
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.job import Job

from pyspark.sql.functions import to_timestamp, ...
# other imports ...
With the abstractions written correctly, we can combine parts of our script with a local version of pyspark (we are using a dockerized version of pyspark found here: https://github.com/CoorpAcademy/docker-pyspark) to test pyspark-specific business logic. With the correct test setup, our unit test now looks like:
@pytest.mark.usefixtures("spark_session", "spark_context")
def test_filter_dataframe_for_people_older_than_18(spark_session, spark_context):
    df = spark_context.parallelize([
        (0, "bob", "jane", 16),
        (1, "john", "smith", 18),
        (2, "tester", "mctestface", 20),
    ]).toDF(["id", "first_name", "last_name", "age"])
    result_df = job.filter_dataframe_for_people_older_than_18(df)
    assert result_df.count() == 1
For more information about the setup of the test suite, and how to run these tests, refer to the Github repository.
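As a rough indication of what that setup involves, a minimal sketch of a conftest.py providing the spark_session and spark_context fixtures might look like the following (the fixture scopes and session options here are assumptions, not the repository's exact code):

# conftest.py - minimal pytest fixtures for running pyspark-based unit tests locally.
import os
import pytest
from pyspark.sql import SparkSession

os.environ['ENV'] = 'test'  # ensure the glue-specific imports in the job module are skipped

@pytest.fixture(scope="session")
def spark_session():
    # A small local SparkSession is plenty for unit testing DataFrame logic.
    session = (SparkSession.builder
               .master("local[*]")
               .appName("glue-unit-tests")
               .getOrCreate())
    yield session
    session.stop()

@pytest.fixture(scope="session")
def spark_context(spark_session):
    return spark_session.sparkContext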
This opens up the ability to test our code locally, but most of the time when we are dealing with data transformations we want to run against a realistic set of data, or a sample of production data. In that scenario, we first need to deploy all our code to a test environment, run our job, inspect the results to ensure they are what we want, and then deploy the same thing, hopefully unchanged, to production. To get this right consistently we need some kind of automation, and luckily AWS provides a few tools to help us on the way.
For information about what CloudFormation is, I encourage you to do a google search, as there are probably thousands of blog posts on the subject. In order to use CloudFormation with our AWS glue scripts, we set up our repository with the following structure:
etl-jobs
├── etl-template.yml
├── my-awesome-etl/
│ ├── job.py
│ └── template.yml
The first thing to look at is the file named etl-template.yml in the etl-jobs folder. This contains a template that defines the common resources required for each glue job, and parameterizes the bits that will change between glue jobs, such as script location and schedule information.
GlueJob:
  Type: AWS::Glue::Job
  Properties:
    Role: !GetAtt GlueJobRole.Arn
    Command:
      Name: glueetl
      ScriptLocation:
        !Sub s3://${ScriptsBucketName}/reports/${ReportName}/job.py
    AllocatedCapacity: !Ref JobDPUs
    Name: !Sub ${JobNamePrefix}-job
    Connections:
      Connections:
        - !Ref ConnectionName
    DefaultArguments:
      "--job-bookmark-option": !If [UseJobBookmark, "job-bookmark-enable", "job-bookmark-disable"]
      "--temp_bucket": !Ref TempBucketName
      "--debug_mode": "false"
      "--connection_name": !Ref ConnectionName
      "--aws_account_id": !Ref AWS::AccountId
A Glue Job requires several things: an IAM Role that, at minimum, must include the AWS Glue service role managed policy (arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole); a Command parameter whose Name MUST be glueetl; and a ScriptLocation parameter pointing to the S3 location of the glue script to run. Most other options are optional. The Connections parameter is handy for giving Glue access to Redshift clusters or RDS instances without passing around credentials, and it also allows Glue to reach clusters and instances in private subnets. The DefaultArguments parameter lets us pass in arbitrary arguments, which is helpful for environment specific values such as a bucket name or account ID.
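As a rough illustration of how those DefaultArguments surface inside the job script itself, getResolvedOptions picks them up by name, without the leading --, and every value arrives as a string (the argument names below simply mirror the template above):

# Inside the glue job: resolve the arguments supplied via DefaultArguments (and at run time).
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'temp_bucket', 'debug_mode',
                                     'connection_name', 'aws_account_id'])
debug_mode = args['debug_mode'] == 'true'  # all values arrive as strings
temp_bucket = args['temp_bucket']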
In order to take advantage of the above template, we need to create another template that references it; in CloudFormation terms, we add it as a nested stack. Each ETL job has its own template containing a reference to this stack, as follows:
ETLStack:
  Type: AWS::CloudFormation::Stack
  Properties:
    TemplateURL: '../etl-template.yml'
    Parameters:
      TriggerSchedule: cron(0 21 ? * * *)
      ReportName: my-awesome-etl
      ConnectionName: my-awesome-etl-connection
      JobNamePrefix: !Ref AWS::StackName
      JobDPUs: 2
      UseJobBookmark: False
      TempBucketName: !Ref TempBucketName
      ScriptsBucketName: !Ref ScriptsBucketName
      SourceBucketName: !Ref SourceBucketName
This helps us reduce code duplication, but it should be used carefully if the jobs you are deploying require vastly different configurations. Some parameters will always differ between ETL jobs, and these we hardcode (such as the trigger schedule). Others differ between environments (i.e. development, test and production), so they must be passed in as stack parameters (such as ScriptsBucketName).
We store this template in its own folder under etl-jobs, alongside any other resources related only to that job.
The job file itself needs to be uploaded to S3 and its S3 location referenced in the template, so we have to script that upload ourselves. A recent update to the AWS CLI (https://github.com/aws/aws-cli/pull/4019) allows the template to reference the job file locally, with the package command handling the upload and reference replacement automatically. Since we already had the nested stack set up we couldn’t use this feature, but we plan to refactor our setup to take advantage of it in the future.
Once it’s all set up, deployment to any environment is just a matter of running the package and deploy commands, for example:
export SCRIPT_BUCKET= # Here we need to set the name of an S3 bucket where we can store our glue scripts
aws s3 cp my-awesome-etl/job.py s3://$SCRIPT_BUCKET/my-awesome-etl/job.py
aws cloudformation package --template-file my-awesome-etl/template.yml --output-template-file out.yml ...
aws cloudformation deploy --template-file out.yml --stack-name my-awesome-etl --parameter-overrides ScriptsBucketName=$SCRIPT_BUCKET ...
Once it is deployed to a non-production environment, testing can begin! If you inspect our sample repository, you’ll find a script that automatically handles the creation of the AWS resources required for the deployment to work, including the S3 bucket for storing the glue job scripts.
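For a sense of what that bootstrap step amounts to, a minimal sketch (the bucket name and region below are placeholders, not the repository's actual values) is simply creating the scripts bucket before the first deployment:

# bootstrap.py - create the bucket that will hold the glue job scripts.
import boto3

s3 = boto3.client('s3', region_name='ap-southeast-2')  # placeholder region
s3.create_bucket(
    Bucket='my-glue-scripts-bucket',                    # placeholder bucket name
    CreateBucketConfiguration={'LocationConstraint': 'ap-southeast-2'},
)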
Testing our ETL job once it is deployed is just a matter of getting some representative sample data into place, starting the job, waiting for it to finish, and checking the output is what we expect.
As an example, we have a job that reads from S3 via the AWS Glue Data Catalog, performs some transformations on the data, and writes the result to Redshift.
To test this, our integration test was just a python script that starts the job, waits for it to finish, and performs a few Redshift queries to ensure the data is there. The only manual steps involved are getting some sample data into S3 in the first place, and potentially performing some manual inspections on the resulting data that would otherwise be hard to automate (i.e. asking yourself, does the data look right?).
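A stripped-down sketch of that kind of integration script, using boto3 (the job name and polling interval are placeholders, and the Redshift checks are left as a comment), might look like:

# integration_test.py - start the glue job, wait for it to finish, then verify the output.
import time
import boto3

glue = boto3.client('glue')

run_id = glue.start_job_run(JobName='my-awesome-etl-job')['JobRunId']  # placeholder job name

while True:
    state = glue.get_job_run(JobName='my-awesome-etl-job', RunId=run_id)['JobRun']['JobRunState']
    if state in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT'):
        break
    time.sleep(30)  # poll while the job is still starting or running

assert state == 'SUCCEEDED', f"Glue job finished in state {state}"

# From here, connect to Redshift (e.g. via psycopg2) and run a few queries
# to confirm the expected rows landed in the target tables.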
AWS Glue is a great platform for deploying pyspark applications without having to worry about the underlying infrastructure. Automating our deployment and testing process is a little tricky, but we think we’ve come a long way towards making it as elegant as possible.
There are still some caveats to this approach to be aware of, such as the slow feedback loop when iterating on deployed jobs and the networking hoops required to reach dev endpoints that sit in private subnets.
There are also some cool things to look forward to that we haven’t quite had the chance to play with, including the new Workflows feature for AWS Glue (https://docs.aws.amazon.com/glue/latest/dg/orchestrate-using-workflows.html). While it looks promising, we are still waiting for proper CloudFormation support before we can incorporate it into our existing development and deployment workflow.
Once again, if you would like to check out a sample repository using this approach, it’s available on Github at https://github.com/KZNGroup/glue-workflow-example/. The example currently doesn’t work as-is, but it should give you a good idea of how to structure things.