Apache Airflow is a phenomenal tool for building data flows. It's reliable, well-maintained, well-documented, and has plugins for just about everything. It's easy to extend, has pre-built deployment patterns, and is readily available in a variety of cloud environments.
However, it's also an aging tool with a few quirks that hearken back to an earlier time in its development. Some issues include rigid DAG-centric dataflows, confusing differences between logical runtime and wall-clock execution time, and significant limitations on the size of individual DAGs.
Several tools have arisen to address these shortcomings (among which Prefect is a personal favorite). However, there's something to be said for building on the rock-solid foundation that is Airflow, and at Rearc Data, we also have a substantial investment in making Airflow an efficient platform for our data flows. So what can we do within that environment to streamline the process of building maintainable dataflows?
One pattern we've started using is data-driven DAG definitions using operator factories. This is not necessarily an alternative to the traditional way of defining DAGs, nor does it exclude using Airflow's newer TaskFlow API. While this new pattern doesn't fundamentally alter the limitations of Airflow, it does dramatically reduce the burden of developing new dataflows. Let's dig in using an example to motivate the discussion.
As a motivating example, let's say we want to print "hello world" in a Docker container (I'm going to use AWS-managed containers via ECS). Fortunately, there's a pre-built operator that we can use:
hello_world = EcsOperator(
    task_id="hello_world",
    cluster="existing_cluster_name",
    task_definition="existing_task_definition_name",
    launch_type="FARGATE",
    aws_conn_id="aws_ecs",
    overrides={
        "containerOverrides": [
            {
                "name": "hello-world-container",
                "command": ["echo", "hello", "world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "securityGroups": [os.environ.get("SECURITY_GROUP_ID", "sg-123abc")],
            "subnets": [os.environ.get("SUBNET_ID", "subnet-123456ab")],
        },
    },
    awslogs_group="/ecs/hello-world",
    awslogs_stream_prefix="prefix_b/hello-world-container",
)
That's a lot of code to say "run echo hello world in a container on ECS". Write that a few times, and very quickly you'll decide you're fed up with all the boilerplate. What do we do when we see repetitive code? Make it a function, of course:
def make_ecs_operator(task_id, task_definition, command):
    return EcsOperator(
        ...
    )
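Filling in the body is mostly a matter of deciding which arguments vary between tasks and which stay hardcoded. Here's a minimal sketch, keeping the same cluster, networking, and container name as the example above (and assuming the Amazon provider's EcsOperator import path):

import os

from airflow.providers.amazon.aws.operators.ecs import EcsOperator


def make_ecs_operator(task_id, task_definition, command):
    # Only the task-specific pieces are parameters; the cluster, networking,
    # and logging boilerplate lives in one place.
    return EcsOperator(
        task_id=task_id,
        cluster="existing_cluster_name",
        task_definition=task_definition,
        launch_type="FARGATE",
        aws_conn_id="aws_ecs",
        overrides={
            "containerOverrides": [
                {"name": "hello-world-container", "command": list(command)},
            ],
        },
        network_configuration={
            "awsvpcConfiguration": {
                "securityGroups": [os.environ.get("SECURITY_GROUP_ID", "sg-123abc")],
                "subnets": [os.environ.get("SUBNET_ID", "subnet-123456ab")],
            },
        },
        awslogs_group="/ecs/hello-world",
        awslogs_stream_prefix="prefix_b/hello-world-container",
    )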
So now we can get our operator just by calling that function:
hello_world_operator = make_ecs_operator(
    task_id="hello_world",
    task_definition="existing_task_definition_name",
    command="echo hello world".split(),
)
Phew, that's so much better. This might even tide you over for a while.
A few months later, someone comes along and asks you to run a command in a different docker image. They want to use:
hello_world_operator = make_ecs_operator(
    task_id="hello_world",
    docker_image="alpine:latest",  # <-- Docker image, not task definition
    command="echo hello world".split(),
)
To avoid a detour into discussing ECS, it's sufficient to summarize this change as turning our problem into a two-step task: first register an ECS task definition that points at the requested Docker image, then run an ECS task using that task definition.
Ok, so if we want to update make_ecs_operator to be able to run arbitrary Docker images, we have two options. The first is to write a custom Airflow operator that pre-builds the task definition before running the ECS job:
.------------------------------.
| Custom Ecs Operator          |
|   1. Create task definition  |
|   2. Run ECS task            |
'------------------------------'
Wrapping everything into a single operator, however, reduces visibility, fails to re-use standard operators (including future upgrades to those operators), and increases our technical debt. There are cases where custom operators are the best solution, but this isn't one of them.
The alternative is to write a pattern of operators where the first one creates the task definition and the second is a standard ECS operator that runs that task definition, something like:
.------------------------.                .--------------.
| create task definition | -------------> | run ecs task |
'------------------------'                '--------------'
Ok, that's not so bad. We can whip up the first as a Python Operator, and use the regular ECS Operator for the second node. We get code that looks something like the following:
def create_task_definition(docker_image):
    # ...
    return task_definition_arn


def make_ecs_operator(task_id, docker_image, command):
    create_task_definition_operator = PythonOperator(
        # ...
        task_id=f"{task_id}_create",
        python_callable=create_task_definition,
        op_args=(docker_image,),
        # ...
    )

    run_ecs_task_operator = EcsOperator(
        # ...
        task_id=f"{task_id}_run",
        task_definition=XComArg(create_task_definition_operator, "return_value"),
    )

    create_task_definition_operator >> run_ecs_task_operator

    return ...  # which one?
The last improvement we'll make here is to wrap these operators into a Task Group. Visually, this will make these two operators collapse into one node in the Airflow UI, since logically we're doing just one thing here; functionally, it'll also allow us to sequence this pair of operators as if they were a single operator, so our caller code doesn't have to know about the guts of how we use ECS.
def make_ecs_operator(task_id, docker_image, command):
    with TaskGroup(group_id=task_id) as group:
        create_task_definition_operator = PythonOperator(
            # ...
            task_id="create",  # the task_id prefix is auto-added
        )
        run_ecs_task_operator = EcsOperator(
            # ...
            task_id="run",
        )
        create_task_definition_operator >> run_ecs_task_operator
    return group
.--------------------------------.
|           <task_id>            |
|  .--------.          .-----.   |
|  | create | -------> | run |   |
|  '--------'          '-----'   |
'--------------------------------'
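In case you're wondering what the create step involves, here's a rough sketch of create_task_definition using boto3 directly. The family name, container name, and CPU/memory values below are placeholder assumptions, and execution roles and log configuration are omitted for brevity:

import boto3


def create_task_definition(docker_image):
    # Register a minimal Fargate-compatible task definition for the requested
    # image; the returned ARN flows to the EcsOperator via XCom.
    ecs = boto3.client("ecs")
    response = ecs.register_task_definition(
        family="adhoc-docker-images",  # placeholder family name
        requiresCompatibilities=["FARGATE"],
        networkMode="awsvpc",
        cpu="256",
        memory="512",
        containerDefinitions=[
            {
                "name": "main",  # must match the name used in containerOverrides
                "image": docker_image,
                "essential": True,
            },
        ],
    )
    return response["taskDefinition"]["taskDefinitionArn"]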
The functional pattern above is fine, but can start to get convoluted when producing large or complicated patterns. You might end up with lots of function factories being called within other function factories, and passing arguments all over the place. At this point, taking an object-oriented approach begins to simplify things.
Let's shift the example. Let's say the ECS job we ran generated some data file, and now we want to publish that data to a Redshift SQL cluster. We'll need to manage some administrivia along the way, and then do a variety of things with each table. Let's say we end up wanting a pattern like the following:
                                    Handle a bunch of administrivia...                             Load the data
.-----------------.                                                                     .----------------.      .-----------.
| Validate file 1 | -.                                                              .-> | Recreate Table | ---> | Load Data |
'-----------------'   \                                                            /    '----------------'      '-----------'
                       \                                                          /
.-----------------.     \     .----------------.      .----------------------.   /      .----------------.      .-----------.
| Validate file 2 | -----+--> | Resume cluster | ---> | Ensure schema exists | -+-----> | Recreate Table | ---> | Load Data |
'-----------------'     /     '----------------'      '----------------------'   \      '----------------'      '-----------'
                       /                                                          \
.-----------------.   /                                                            \    .----------------.      .-----------.
| Validate file 3 | -'                                                              '-> | Recreate Table | ---> | Load Data |
'-----------------'                                                                     '----------------'      '-----------'
Let's just assume we're going to need to do all this more than once, and we're going to want another abstraction for this pattern. We could try to define this factory as a function:
def make_db_upload_operators(
    task_id,
    cluster_name,
    schema,
    tables_and_files,
):
    ...
but clearly this is going to become a complicated function, or will depend on a lot of helper functions. You know what we call a pile of related functions that depend on the same data? A class!
class Table:
    # Tracks the table name, its column structure, and the file it's sourced from

    def make_validate_operator(self):
        ...

    def make_recreate_operator(self):
        ...

    def make_load_data_operator(self):
        ...


class DbUploadFactory:
    def __init__(self, task_id, cluster_name, schema, tables: List[Table]):
        ...

    def make_resume_cluster_operator(self):
        ...

    def make_ensure_schema_exists_operator(self):
        ...

    def make_operator(self):
        with TaskGroup(group_id=self.task_id) as group_that_contains_everything:
            # Call the various methods of this class and the Tables
            # Hook those operators up correctly
            ...
        return group_that_contains_everything
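For concreteness, here's a minimal sketch of how make_operator might wire those pieces together into the shape shown above, using the method names declared in the skeleton (the method bodies themselves are still up to you):

def make_operator(self):
    with TaskGroup(group_id=self.task_id) as group:
        resume_cluster = self.make_resume_cluster_operator()
        ensure_schema = self.make_ensure_schema_exists_operator()
        resume_cluster >> ensure_schema

        for table in self.tables:
            # Every file is validated before we touch the cluster, then each
            # table is rebuilt and loaded once the schema is known to exist.
            validate = table.make_validate_operator()
            recreate = table.make_recreate_operator()
            load = table.make_load_data_operator()

            validate >> resume_cluster
            ensure_schema >> recreate >> load

    return group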
This is much more maintainable than a pile of loosely related functions. If we later want to restructure how we interact with our database, we need only find the object responsible for that part of our DAG and update it accordingly. All of our operators are neatly wrapped up in little groups, so nothing outside the factory (in general) needs to know how the internals are structured.
This pattern decouples the complexity of defining the DAG from the complexity of the resulting dataflow that Airflow sees, and thus also decouples the top-level DAG definition from the challenges of maintaining complicated DAG logic. The following simple DAG definition might result in a data flow with advanced, precisely architected logic:
with DAG(...) as dag:
    generate_data_operator = ...

    upload_operator = DbUploadFactory(
        ...,
        tables=[
            Table(...),
            ...
        ],
    )

    generate_data_operator >> upload_operator.make_operator()
It is possible to eliminate the need to explicitly call .make_operator, but I leave that as an exercise for the reader.
This pattern, for the most part, results in classes that contain some data and then convert that data into an Airflow operator (or group of operators). These classes aren't operators in their own right, and probably (in general) shouldn't support mutation. They simply convert metadata into a DAG sub-structure. Seems like a perfect candidate for a dataclass:
@dataclass
class Table:
    name: str
    columns: List[str]
    source_file: str
    ...


@dataclass
class DbUploadFactory:
    task_id: str
    cluster_name: str
    schema: str
    tables: List[Table]
    ...
At first glance, this doesn't seem much simpler than the original class. Why bother with dataclasses? Well, some possible advantages include:
- Typed, declarative fields that can be checked with standard tooling (e.g. mypy or pydantic)
- Straightforward construction from plain dictionaries and other serialized metadata (e.g. using dacite)

This last bullet is something we have found particularly useful, so I'll dig into that a bit further.
Dataclasses are so tight and well-defined that they're pretty easy to directly serialize and deserialize into common markup formats, like JSON or YAML. Since each class is merely a mapping from metadata into a part of an Airflow DAG, it's not hard to build objects that represent complete, functional DAGs.
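As a quick sketch of what that looks like in practice, assuming Table has only the name, columns, and source_file fields shown earlier (the values below are made up):

from dataclasses import asdict

import dacite

metadata = {
    "name": "users",
    "columns": ["id", "email"],
    "source_file": "path/to/users.csv",
}

# Build a typed Table from untyped metadata, then turn it back into a dict.
table = dacite.from_dict(data_class=Table, data=metadata)
assert asdict(table) == metadata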
I'll note at this point that any additional classes shown below are just arbitrary abstractions; which abstractions are helpful for you will depend on the kinds of data flows you typically write. These particular abstractions are unlikely to be useful to you as-is, but the pattern they follow probably will be.
At Rearc Data, we have a lot of data flows that look vaguely like:
.-----------.      .----------.      .--------------.
| Do lots   |      | Validate |      | Publish data |
| of custom | ---> | output   | ---> | to various   |
| stuff     |      | files    |      | outputs      |
'-----------'      '----------'      '--------------'
Great, let's define a top-level DAG object that looks like that:
@dataclass
class TypicalDag:
    jobs: JobCollection
    validators: ValidationCollection
    publishers: PublishCollection

    def make_dag(self) -> DAG:
        with DAG(...) as dag:
            jobs_op = self.jobs.make_operator()
            validators_op = self.validators.make_operator()
            publishers_op = self.publishers.make_operator()

            jobs_op >> validators_op >> publishers_op

        return dag
For the sake of brevity, I'll leave the definition of the various Collection classes to your imagination.
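Purely for illustration, here is one hypothetical shape a JobCollection could take, mirroring the YAML example further below and reusing the make_ecs_operator factory from earlier; every field name here is an assumption rather than a prescribed schema:

from dataclasses import dataclass, field
from typing import List

from airflow.utils.task_group import TaskGroup


@dataclass
class JobDefinition:
    name: str
    docker_image: str
    command: str
    depends_on: List[str] = field(default_factory=list)


@dataclass
class JobCollection:
    job_definitions: List[JobDefinition]

    def make_operator(self):
        with TaskGroup(group_id="jobs") as group:
            # One ECS task group per job definition...
            operators = {
                job.name: make_ecs_operator(
                    task_id=job.name,
                    docker_image=job.docker_image,
                    command=job.command.split(),
                )
                for job in self.job_definitions
            }
            # ...wired together according to the declared dependencies.
            for job in self.job_definitions:
                for upstream in job.depends_on:
                    operators[upstream] >> operators[job.name]
        return group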
Note that the above is general-purpose and isn't specific to any single data flow. We might have a bunch of data flows that all "Run some code, generate some files, validate them, then publish them", just maybe from different data sources or with differences in how we process the data. We can then write an adapter DAG file that loads metadata from a whole pile of metadata files and generates a DAG for each one. The following is a simple example of such an adapter:
# dags/load_yml_files.py
from pathlib import Path

from airflow import DAG
import yaml
import dacite

from our_own_code import TypicalDag

dag_dir = Path(__file__).parent

# For each YAML file in a particular directory...
for yaml_file_path in dag_dir.glob('typical_dags/*.yml'):
    with open(yaml_file_path) as f:
        dag_metadata = yaml.safe_load(f)

    # ... generate a DAG from that metadata
    dag_metadata_obj = dacite.from_dict(TypicalDag, dag_metadata)
    dag = dag_metadata_obj.make_dag()

    # See https://www.astronomer.io/guides/dynamically-generating-dags/
    dag_name = yaml_file_path.with_suffix('').name
    globals()[dag_name] = dag
That's it! Well, almost. Let's make an example YAML file (I'm just going to make stuff up for what goes into each Collection object):
# dags/typical_dags/sample_dag.yml
jobs:
  job_definitions:
    - name: download_data
      docker_image: my_docker_image
      command: "python download_data.py"
    - name: clean_data
      depends_on: [download_data]
      docker_image: my_docker_image
      command: "python clean_data.py"

validators:
  files:
    - path: path/to/output/file.csv
      validation_suite: file_expectations.json

publishers:
  sftp:
    host: sftp://my.file.server
    connection_id: sftp_default
    files:
      - path/to/output/file.csv
  s3:
    bucket: my_public_bucket
    prefix: some/prefix/
    files:
      - path/to/output/file.csv
.---------------------------------------.      .--------------.      .----------.
|                 JOBS                  |      |   VALIDATE   |      | PUBLISH  |
| .---------------.      .------------. |      | .----------. |      | .------. |
| | download data | ---> | clean data | | ---> | | file.csv | | ---> | | sftp | |
| '---------------'      '------------' |      | '----------' |      | '------' |
'---------------------------------------'      '--------------'      | .------. |
                                                                      | | s3   | |
                                                                      | '------' |
                                                                      '----------'
Cool, whatever all that means. But here's the interesting part: that YAML file represents a no-code DAG. Instead of building each DAG as custom Python code, however simple that code might be, we now have an entire DAG without any DAG-specific code. We've entirely decoupled the process of writing new dataflows from the task of maintaining useful, functional abstractions. We've also accomplished that without writing any custom Airflow Operators or losing any visibility into our dataflow from the Airflow UI.
Critically, we've done all this without any loss of functionality: nothing about this approach prevents you from writing DAGs in Python, or even writing DAGs that bypass your abstractions altogether. We need very few additional dependencies, no plugins, and no alteration to our underlying data platform.
We can upgrade every single DAG by updating our code in one location. Our DAG definition files store all information that differentiates our DAGs in one location, while everything they share in common is elsewhere; that makes these data files extremely descriptive and compact, and makes our code easier to maintain.
And that's not even to mention the utility of having your DAGs defined as data instead of as code. Lots of maintenance and self-analysis chores become simple when your dataflows are defined, themselves, as data rather than as arbitrary Python code.
In this article, we've discussed how to build maintainable DAGs, starting with simple factory functions, then with factory classes, then by simplifying those classes into Dataclasses, then finally by eliminating the need for custom DAG code altogether. The result is a pattern for supporting low-code or no-code DAGs in Airflow, making each individual DAG nothing more than its essential metadata.
At Rearc Data, we've leveraged this pattern to dramatically reduce development time and maintenance overhead for dozens of DAGs, allowing us to focus on the challenging parts of data engineering rather than re-writing boilerplate DAG code each time. We've been able to upgrade our DAGs in bulk, eliminating huge swaths of technical debt from the challenge of maintaining a large number of unrelated data flows. This has become a critical part of how we produce the 440+ data products we make available on AWS Data Exchange. If you're more interested in getting data than in engineering your own data flows, and none of our existing products match exactly what you need, reach out to us at data@rearc.io; we would love to work with you.