# Using Apache Airflow and the Snowflake Data Warehouse to ingest Flume S3 data

Do you use Apache Flume to stage event-based log files in Amazon S3 before ingesting them into your database? Have you noticed .tmp files scattered throughout S3? Have you wondered what they are and how to deal with them? This article describes a simple solution to this common problem, using the Apache Airflow workflow manager and the Snowflake Data Warehouse.

## Goal: Data Integrity

Your goal is to ingest each event exactly once into your analytic database during ETL (extract-transform-load). You do not want to leave any events behind, nor do you want to ingest any event more than once. Otherwise, your event counts will be wrong. If we assume that any particular event is in exactly one log file, the goal becomes ingesting each log file exactly once. At Sharethrough, we have seen that this data integrity goal cannot be met without dealing with those darn .tmp files.

## Problem: duplicated .tmp files from Flume/S3 anomalies

In our EC2/Flume/S3 environment, we see a few dozen .tmp files in every hourly batch of several thousand files. In our case, some of these .tmp files are duplicates of other, normal, files without that extension. But sometimes these .tmp files do not have a corresponding file without the extension.

The following directory listing shows normal, regular log files, created by adserver-A through adserver-D. The log file from adserver-E has a .tmp file that duplicates the normal file. The log file from adserver-F exists only as a .tmp file, without a corresponding normal file.

```
adserver-A.log
adserver-B.log
adserver-C.log
adserver-D.log
adserver-E.log
adserver-E.log.tmp
adserver-F.log.tmp
```

As far as we know, this .tmp file anomaly originates in the way Apache Flume writes files. Flume assumes a normal filesystem, in which a file is visible to other processes as it gets written, and renaming a file is atomic. Under these assumptions, Flume writes to a .tmp file, then renames it once the file is fully written. In that world, one would simply skip the growing .tmp files and ingest only the regular ones.

But S3 semantics are different: a file is already fully written by the time it becomes visible to other processes, and a file is “renamed” by first copying the .tmp file to another file without that extension, then deleting the source .tmp file.

Since this “rename” is not atomic, the process can fail. If the copy fails, only the original .tmp version remains (adserver-F above). If the delete fails, both copies remain (adserver-E above).
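The two failure modes can be sketched with a toy simulation, using a plain dict as a stand-in for an S3 bucket (the fail_* flags are artificial fault injection for illustration, not real S3 behavior):

```python
def s3_rename(bucket, src, dst, fail_copy=False, fail_delete=False):
    """Simulate S3's non-atomic 'rename': copy the object to its new
    key, then delete the source key. Flags inject the two failures."""
    if not fail_copy:
        bucket[dst] = bucket[src]  # copy .tmp -> normal name
        if not fail_delete:
            del bucket[src]        # remove the .tmp original
    return bucket

# Happy path: only the normal file remains.
s3_rename({'a.log.tmp': 'events'}, 'a.log.tmp', 'a.log')
# Failed delete: both files remain (the adserver-E case).
s3_rename({'e.log.tmp': 'events'}, 'e.log.tmp', 'e.log', fail_delete=True)
# Failed copy: only the .tmp file remains (the adserver-F case).
s3_rename({'f.log.tmp': 'events'}, 'f.log.tmp', 'f.log', fail_copy=True)
```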

This situation poses a threat to data integrity. If we ingested all files in this directory, our event counts would be too high (we would duplicate events from adserver-E). But if we ignored all .tmp files, our event counts would be too low (we would miss events from adserver-F). We need a simple way to ingest .tmp files, but only when we lack a corresponding normal, non-tmp file. This is where Apache Airflow can help.

## Apache Airflow solution

Apache Airflow is a workflow manager very well-suited to ETL. Airflow uses hooks to manage basic connectivity to data sources, and operators to perform dynamic data processing.

For example, you can store encrypted S3 credentials in the Airflow backend CONNECTION table. Airflow’s S3Hook can access those credentials, and the Airflow S3KeySensor operator can use that S3Hook to continually poll S3 looking for a certain file, waiting until it appears before continuing the ETL.

Airflow comes with a full suite of hooks and operators for most data systems. But Airflow really shines when you create your own operators and hooks by inheriting from the Airflow abstract base classes.

We used Airflow’s extensibility to create an operator that solves this Flume S3 .tmp file problem. The operator, SnowflakeFlumeS3Copy(), implements the following logic to ingest each log file exactly once, despite occasional duplicated .tmp files.

1. Ingest all normal files (i.e. files that lack the .tmp extension)
2. Using boto, a common Python interface to Amazon S3 (already included in Apache Airflow), get a list of .tmp files; this list usually contains only a couple dozen entries.
3. Iterate through the list of .tmp files: for each file, trim the .tmp extension from the filename and use boto to see if the non-tmp version of that file exists. If it does exist, then it must have already been ingested during Step 1, so we do not want to also ingest this duplicative .tmp file version.
4. Ingest all of the .tmp files that lack a non-tmp version.
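The heart of steps 2–4 reduces to a small piece of pure logic. Here is a minimal sketch (the key listing itself would come from boto against your bucket; this function only decides which .tmp files still need to be ingested):

```python
def tmp_files_to_ingest(keys):
    """Given all keys in an hourly batch, return the .tmp files whose
    normal (non-tmp) counterpart is missing -- only those need to be
    ingested, since all normal files were covered in step 1."""
    key_set = set(keys)
    return sorted(k for k in keys
                  if k.endswith('.tmp') and k[:-len('.tmp')] not in key_set)

batch = ['adserver-E.log', 'adserver-E.log.tmp', 'adserver-F.log.tmp']
print(tmp_files_to_ingest(batch))  # ['adserver-F.log.tmp']
```

adserver-E.log.tmp is skipped because its normal counterpart exists (and was already ingested), while adserver-F.log.tmp is kept.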

## Snowflake data warehouse features

The Snowflake Data Warehouse ingests data using its COPY INTO command. A couple of its features make this Airflow operator easy to code and maintain. Most important, Snowflake automatically keeps track of which files it has already ingested. So whenever we ingest a particular hour, we also ask Snowflake to ingest the previous hour. Snowflake will then look at the previous hour’s files, and ingest only those files that arrived too late to be ingested during the previous hour’s run. This Snowflake feature is the key to our confidence that we are ingesting every file, a fundamental element of our data integrity goals.
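Deriving the current and previous hour’s S3 prefixes from the batch timestamp is straightforward. A sketch, assuming a 'yyyy/mm/dd/hh/' prefix layout for illustration (our real S3 conventions differ):

```python
from datetime import datetime, timedelta

def hour_prefixes(ts):
    """Return (current, previous) hourly S3 prefixes for an Airflow
    batch timestamp like '2017-07-14T16:00:00'. The 'yyyy/mm/dd/hh/'
    layout is an assumption for illustration."""
    t = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S')
    fmt = '%Y/%m/%d/%H/'
    return t.strftime(fmt), (t - timedelta(hours=1)).strftime(fmt)

print(hour_prefixes('2017-07-14T16:00:00'))
# ('2017/07/14/16/', '2017/07/14/15/')
```

We issue COPY INTO against both prefixes; Snowflake’s load history guarantees that files already ingested in the previous hour’s run are not loaded twice.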

The Snowflake COPY INTO command also has a PATTERN parameter. We give it a regular expression that filters for our regular files only. This is how we are sure that our Step 1 above ingests only our regular files, those lacking a .tmp extension.

Finally, the Snowflake COPY INTO command has a FILES parameter. We give this parameter the list of non-duplicative .tmp files from the list generated in Step 3 above. This is how we are sure that our Step 4 ingests only those .tmp files that we need.
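Putting the two parameters together, the operator’s two COPY INTO statements might be built roughly like this. A sketch only: the table and stage names, the .log extension in the pattern, and the exact SQL shape are assumptions, not Sharethrough’s actual code:

```python
def copy_into_sql(table, stage, prefix, tmp_files=None):
    """Build a Snowflake COPY INTO statement. Without tmp_files, a
    PATTERN clause restricts the load to regular files (step 1,
    assuming they end in .log); with tmp_files, a FILES clause names
    exactly the orphaned .tmp files to load (step 4)."""
    base = f"COPY INTO {table} FROM @{stage}/{prefix}"
    if tmp_files is None:
        return base + r" PATTERN = '.*\.log'"
    files = ", ".join(f"'{f}'" for f in tmp_files)
    return f"{base} FILES = ({files})"

print(copy_into_sql('events', 'flume_stage', '2017/07/14/16/'))
print(copy_into_sql('events', 'flume_stage', '2017/07/14/16/',
                    ['adserver-F.log.tmp']))
```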

Below is an example Apache Airflow task definition that uses this SnowflakeFlumeS3Copy() operator. As you can see, the ETL author does not need to worry about the non-trivial logic encapsulated by the Airflow operator. Relevant parameters are explained below.

```python
copy_events = SnowflakeFlumeS3Copy(
    # task_id and the stage/table names here are illustrative
    task_id='copy_flume_s3',
    source_stage='flume_s3_stage',
    destination_table='events',
    dag=dag,
    source_region_keys=['us-east-1', 'eu-central-1'],
    source_hour='{{ ts }}',
    snowflake_conn_id='airflow_wh',
    s3_conn_id='airflow_s3',
    provide_context=True,
    depends_on_past=False,
)
```

• source_stage - Stage is a fundamental Snowflake concept. It is essentially a pointer to an S3 bucket and prefix, usually also including access credentials. It defines the data source.
• source_region_keys - Region keys are peculiar to the way Sharethrough organizes data in S3. These strings are part of our S3 subdirectory organization, and we must iterate through them. Your S3 conventions are likely different.
• source_hour='{{ ts }}' - We use several Apache Airflow variables to pass context to instantiated tasks. The ts variable is the hourly timestamp that defines the ETL batch. The contents of the double brackets get expanded by Airflow’s Jinja templating to strings like ‘2017-07-14T16:00:00’.
• destination_table - Obviously, the target Snowflake table
• snowflake_conn_id, s3_conn_id - These are data source connections available to Apache Airflow.

## SnowflakeFlumeS3Copy() source code

The source code for Sharethrough’s SnowflakeFlumeS3Copy() operator is available on GitHub. You will need to modify the code relating to S3 subdirectory naming conventions; we haven’t attempted to generalize support for any arbitrary convention. Also, this code uses Snowflake’s data transformation option to include etl_batch_tag for idempotent and parallel ETL, and this transformation is easy to remove if necessary. But the boto and Snowflake parts should be generally applicable to most situations.

## Conclusion

Combining Apache Airflow and the Snowflake Data Warehouse makes it possible for us to solve non-trivial data ingest problems. This example would be hard to solve without Airflow’s extensibility, and Snowflake’s features simplify many aspects of data ingestion. Together, these tools make it easy to provide a high level of data integrity to our ETL processes.