Wikipedia word counter using Airflow and Spark — Part 1 — Airflow DAG

Anurag Chatterjee
7 min read · Dec 31, 2020


In this series, you will learn to use Airflow and Spark to create a simple word-counter application for Wikipedia articles, a pattern that can also be applied to real-world ‘big data’ problems such as:

1. Ingesting data from multiple sources to a staging area, and

2. Performing scheduled ETL (Extract, Transform and Load) to create curated datasets for analysis.

This article will guide you through a step-by-step process of creating this application.

The use case

Setting up an Airflow DAG (Directed Acyclic Graph) to download the text content of a given topic from a Wikipedia API, get the top 10 meaningful words in that article by count, and finally send you an email with the words and their counts. If there are issues while getting the content from the API, an email is triggered as well. While this is a trivial use case of little practical significance, enterprises often have scenarios where a lot of data has to be received from an API or another source system, distributed processing has to be performed on this received data, and the processed data then has to be stored or sent somewhere. In these cases, an automated pipeline using Airflow and Spark can be trusted to get the job done, assuming these tools and setups are available at your disposal.

The final result — an email with the word counts

Airflow tasks

There are many good introductions and guides to Airflow which someone just starting with Airflow should read first. Here, I will get straight to some of the more interesting things I have done in the DAG. I have found that the easiest way to understand any Airflow DAG file is to first identify where the relations between the tasks are defined, then identify the operator for each task and, if it is a PythonOperator, study its Python callable. So my suggestion is that you too start at the place where the task relations are defined in my DAG code, which happens to be the last line of the code:

start >> get_details_from_wikipedia_task >> [email_task, spark_submit_task] >> send_processed_file_as_email_task >> end

So the tasks in the DAG, apart from start and end (which do nothing), get the content from the Wikipedia API, send an email when the content has downloaded (i.e. when the previous task succeeds), submit the Spark job for the word count, and finally send the processed word-counts file as another email.

Below is the complete code for the DAG:
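If the embedded gist does not load, the condensed sketch below captures the overall structure (Airflow 1.10-style imports; the DAG id, e-mail address, file paths and callable bodies are illustrative placeholders rather than the exact code):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "start_date": days_ago(2),  # two days in the past, so missing runs get back-filled
}

def get_details_from_wikipedia_api(**context):
    """Download the article text to a file and return its path (pushed to XCom)."""
    ...

def send_processed_file_as_email(**context):
    """Pull the file location from XCom and e-mail the processed word counts."""
    ...

with DAG(
    dag_id="wikipedia_word_counter",        # illustrative DAG id
    default_args=default_args,
    schedule_interval="0 0 * * *",          # every day at 12 AM UTC
) as dag:
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    get_details_from_wikipedia_task = PythonOperator(
        task_id="get_details_from_wikipedia_task",
        python_callable=get_details_from_wikipedia_api,
        provide_context=True,
    )
    email_task = EmailOperator(
        task_id="email_task",
        to="you@example.com",               # placeholder address
        subject="Wikipedia content downloaded",
        html_content="The article content was fetched successfully.",
    )
    spark_submit_task = SparkSubmitOperator(
        task_id="spark_submit_task",
        application="/path/to/wikipedia_word_count.py",  # placeholder; parameters discussed below
    )
    send_processed_file_as_email_task = PythonOperator(
        task_id="send_processed_file_as_email_task",
        python_callable=send_processed_file_as_email,
        provide_context=True,
    )

    # The last line of the file: the relations between the tasks
    start >> get_details_from_wikipedia_task >> [email_task, spark_submit_task] >> send_processed_file_as_email_task >> end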

The interesting bits are as below:

Configuring SMTP server for sending emails using your own Google account

Follow the method here to set up the SMTP server. The Airflow configuration file (airflow.cfg) is located in ~/airflow, which is where Airflow is installed by default. As an aside, if you have installed Airflow using Conda, the Conda environment with Airflow should also have the wikipedia package.
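For reference, the [smtp] section of airflow.cfg for a Gmail account ends up looking roughly like this; the address and password below are placeholders, and Gmail expects an app password (or equivalent) rather than your normal account password:

[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your.address@gmail.com
smtp_password = your-gmail-app-password
smtp_port = 587
smtp_mail_from = your.address@gmail.com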

Communication between Airflow tasks using XCom

XCom lets tasks exchange small amounts of data via push (send) and pull (receive) APIs. If a Python callable returns a value, that result is automatically pushed as an XCom. I used this in get_details_from_wikipedia_api to push the location where the Wikipedia content is downloaded. The value is pulled in spark_submit_task, since it is a parameter for the Spark job (more on this later), and it is also pulled in the send_processed_file_as_email method.
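A simplified sketch of both ends of that exchange; the topic and file path here are illustrative:

import wikipedia  # the Python 'wikipedia' package used to call the API

def get_details_from_wikipedia_api(**context):
    topic = "Singapore"  # in the actual DAG this comes from an Airflow Variable
    file_location = "/tmp/{}_content.txt".format(topic)
    with open(file_location, "w") as f:
        f.write(wikipedia.page(topic).content)
    # Returning the value pushes it to XCom under the key 'return_value'
    return file_location

def send_processed_file_as_email(**context):
    # Pull the value that the upstream task pushed
    file_location = context["ti"].xcom_pull(task_ids="get_details_from_wikipedia_task")
    ...

The same value is pulled in spark_submit_task through a Jinja template, which is shown in the Spark section below.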

Airflow configurations using Airflow UI

The Airflow Admin tab has useful configuration settings. Do note that in an enterprise setting, you might not be able to see this section, depending on your role. More details on roles in Airflow can be found here. In our case, we will look at Connections, Variables, and XComs.

For connections, we are interested in the spark_default connection. Here we use a local stand-alone Spark installation, so the connection will look as below, with the spark_home value depending on where you have downloaded the Spark binaries. Also check this answer from SO on how to configure the connection for cluster mode, which is what you would typically do in an enterprise.

Spark connection in Airflow Admin
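For reference, a local-mode spark_default connection might be filled in roughly as follows; the Spark path is a placeholder for wherever you extracted the binaries, and a stand-alone master URL (e.g. spark://localhost:7077) could be used in the Host field instead:

Conn Id: spark_default
Host: local[*]
Extra: {"spark-home": "/path/to/spark-2.4.7-bin-hadoop2.7"}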

In the Variables section, we can specify variables that are used by the DAG; here I specify the Wikipedia topic for which to download the content. Variables are also well suited for things like the number of times a task should be retried before being marked as failed, the URL of an API to send requests to, and so on.

Setting the variable in Airflow
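Inside the DAG, the variable is then read with Variable.get; the variable name and default below are illustrative:

from airflow.models import Variable

# Read the topic configured under Admin -> Variables
wikipedia_topic = Variable.get("wikipedia_topic", default_var="Singapore")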

Inside the XCom section you should be able to see all the XCom objects created within the DAGs. If an XCom object was created as the return value of a function, as is the case in our DAG, its key is return_value.

Executing the Spark job from Airflow

In order to count the words in the Wikipedia article, a Spark job is written. This job takes as an argument the location of the file containing the content of the topic. Looking closely at the spark_submit_task, you can see that the SparkSubmitOperator is used. This operator takes an application parameter, where we specify the entry point of our PySpark job, and an application_args parameter, a list of arguments passed to the Spark job, where we specify the file location obtained via a Jinja template. Further details on the parameters for this operator can be found here.
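Putting those pieces together, the task definition looks roughly like this; the script path is a placeholder for wherever the PySpark job lives:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_submit_task = SparkSubmitOperator(
    task_id="spark_submit_task",
    conn_id="spark_default",  # the connection configured under Admin -> Connections
    application="/path/to/wikipedia_word_count.py",  # entry point of the PySpark job
    # The downloaded file location, pulled from XCom via a Jinja template
    application_args=["{{ ti.xcom_pull(task_ids='get_details_from_wikipedia_task') }}"],
    verbose=True,
)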

I found this SO answer helpful to identify the various ways to trigger a Spark job from Airflow. As mentioned in the SO answer, I am able to see the logs from the Spark job in Airflow when I use the SparkSubmitOperator.

Logs from the Spark job captured in Airflow logs

For the interested reader, the code for the Spark job can be found here. I plan to write part 2 of this story to describe my learnings with the Spark job in detail.

The provide_context parameter

The PythonOperator has a parameter called provide_context. When it is set to True, the Python callable executed as part of the PythonOperator receives a dictionary of keyword arguments, which by convention we name context. Check the send_processed_file_as_email function to see how this context dictionary is used to get the task instance ti and then perform the XCom pull to get the value of the file location. As an aside, this function also shows how we can instantiate another operator (EmailOperator) inside the PythonOperator's callable and use its execute method to trigger it.
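A sketch of that callable, with a placeholder recipient and an illustrative way of locating the processed counts file:

from airflow.operators.email_operator import EmailOperator

def send_processed_file_as_email(**context):
    # With provide_context=True, Airflow passes run-time information as keyword arguments
    ti = context["ti"]  # the current task instance
    file_location = ti.xcom_pull(task_ids="get_details_from_wikipedia_task")
    processed_file = file_location + ".counts"  # illustrative: wherever the Spark job wrote its output
    # Instantiate an EmailOperator inside the callable and trigger it via its execute method
    email = EmailOperator(
        task_id="send_word_counts_email",
        to="you@example.com",  # placeholder address
        subject="Word counts for the Wikipedia article",
        html_content="Please find the word counts attached.",
        files=[processed_file],
    )
    email.execute(context=context)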

DAG execution and monitoring

Monitoring DAG execution is easy in Airflow using the UI. The DAG is scheduled to execute every day at 12 AM UTC, which is 8 AM SGT (the next day) in my current time zone, since SGT = UTC + 8. Do note that in default_args, start_date is set two days in the past, hence whenever I start the Airflow scheduler on my local machine it will back-fill DAG runs for the past couple of days if they do not already exist.
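That back-filling happens because Airflow catches up on missed schedule intervals by default; if you do not want past runs to be created automatically, the DAG can opt out explicitly (the DAG id and arguments below follow the sketch earlier):

from airflow import DAG
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="wikipedia_word_counter",
    default_args={"owner": "airflow", "start_date": days_ago(2)},
    schedule_interval="0 0 * * *",  # every day at 12 AM UTC
    catchup=False,  # set to False to skip the automatic back-fill of past runs
)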

All DAG runs

Conclusion

Airflow, when used correctly, is a powerful tool to orchestrate tasks with monitoring, dependency management, and error handling built in. Since Airflow can back-fill DAG runs and a DAG with the same parameters can be run multiple times, the best practice is to have tasks operate on the business date of the current DAG run rather than the latest date, and to design tasks so that they are idempotent, i.e. running a task multiple times with the same input always produces the same output. This is frequently achieved by dropping and re-creating partitions based on the business date / DAG execution date, rather than appending.
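A sketch of that pattern in PySpark, with illustrative table and column names; the business date would come from the DAG run (e.g. via the {{ ds }} template) rather than being hard-coded, and the target Hive table is assumed to already exist, partitioned by business_date:

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .appName("idempotent_partition_load")
         .enableHiveSupport()
         .getOrCreate())

# Overwrite only the partitions present in the incoming data, not the whole table
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

business_date = "2020-12-31"  # in practice, the DAG run's execution date

word_counts_df = (spark.createDataFrame([("data", 12), ("airflow", 9)], ["word", "count"])
                  .withColumn("business_date", F.lit(business_date)))

# Re-running this for the same business date replaces that partition instead of appending
word_counts_df.write.mode("overwrite").insertInto("analytics.word_counts")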

In this story, we saw how Airflow can be used in combination with Spark to perform a simple analysis task on our local machines. In an enterprise, the file with the Wikipedia content (the data received from the source) would be stored in an object store (like an S3 bucket in AWS or an on-premises S3-compatible store such as IBM Cloud Object Storage). Rather than sending an email with the counts, the Spark job would write the counts to a Hive table in a more efficient format, e.g. Parquet, which could then be consumed for analysis using SQL or by another Spark job, scheduled by another Airflow DAG after this DAG run completes successfully. Also, after the Spark job finishes, an additional task in Airflow might rename and move the file from the download location to a separate archive location in S3 to clean up the landing area.

Hope you find this content informative and useful in your data engineering journey!
