In the data-driven world of e-commerce, analyzing sales data is crucial for making informed business decisions. Apache Airflow, a powerful open-source platform, allows you to automate and orchestrate complex workflows. In this blog, we'll guide you through setting up an ETL (Extract, Transform, Load) workflow using Apache Airflow to analyze e-commerce sales data. By leveraging Airflow, we'll extract sales data from a CSV file, process and transform it using Python, and load the cleaned data into a PostgreSQL database for further analysis. This end-to-end solution will enable you to efficiently manage and analyze your sales data, driving actionable insights and business growth.
Set up Airflow and pgAdmin:
Airflow setup: https://vipinmp.hashnode.dev/quick-and-easy-apache-airflow-setup-tutorial
pgAdmin setup: https://vipinmp.hashnode.dev/accessing-postgresql-using-pgadmin-with-a-dockerized-apache-airflow-setup
Add Postgres connection details in Airflow.
Go to Admin → Connections → Add a new record. Set the Connection Id to postgres_connection, the Connection Type to Postgres, fill in the host, database, username, password, and port of your PostgreSQL instance, and save it.
Set up test data (E-Commerce Sales Data)
Example CSV file: sales_data.csv, which needs to be placed in the dags folder.
order_id,product_id,customer_id,quantity,price,date
1,101,1001,2,10.5,2024-01-15
2,102,1002,1,25.0,2024-02-25
3,101,1003,3,10.5,2024-03-10
Place the Airflow Python code ecommerse_etl.py in the dags folder.
Explanation
- Importing Libraries and Modules
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
airflow: provides DAG and PythonOperator for creating the DAG and defining its Python tasks.
datetime / timedelta: used for setting the start date and time-based settings such as the schedule interval.
pandas: used for data manipulation and analysis.
PostgresOperator and PostgresHook: used for interacting with the PostgreSQL database; both ship in the apache-airflow-providers-postgres package, which must be installed in your Airflow environment.
- Extract Sales Data
def extract_sales_data():
    df = pd.read_csv('dags/sales_data.csv')
    df.to_csv('dags/extracted_sales_data.csv', index=False)  # Save extracted data
pd.read_csv('dags/sales_data.csv'): Reads the sales data from a CSV file located in the dags directory.
df.to_csv('dags/extracted_sales_data.csv', index=False): Saves the extracted data to a new CSV file named extracted_sales_data.csv.
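Note that 'dags/sales_data.csv' is resolved against the worker's current working directory, which can differ between setups. If the task fails with FileNotFoundError, a small variation like the sketch below (the SALES_CSV and EXTRACTED_CSV constants are hypothetical helpers, built from the DAG file's own location under the assumption that the CSVs sit next to the DAG) keeps the paths stable:

import os
import pandas as pd

# Resolve paths relative to this DAG file instead of the worker's
# current working directory (assumption: the CSVs live next to the DAG file).
DAG_DIR = os.path.dirname(os.path.abspath(__file__))
SALES_CSV = os.path.join(DAG_DIR, 'sales_data.csv')
EXTRACTED_CSV = os.path.join(DAG_DIR, 'extracted_sales_data.csv')

def extract_sales_data():
    df = pd.read_csv(SALES_CSV)
    df.to_csv(EXTRACTED_CSV, index=False)  # Save extracted data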
- Transform Sales Data
def transform_sales_data():
    df = pd.read_csv('dags/extracted_sales_data.csv')
    df['total'] = df['quantity'] * df['price']
    transformed_df = df.groupby(['product_id', 'date']).agg({'total': 'sum'}).reset_index()
    transformed_df.to_csv('dags/transformed_sales_data.csv', index=False)
pd.read_csv('dags/extracted_sales_data.csv'): Reads the previously extracted data.
df['total'] = df['quantity'] * df['price']: Creates a new column total by multiplying quantity and price.
df.groupby(['product_id', 'date']).agg({'total': 'sum'}).reset_index(): Groups the data by product_id and date, summing the total for each group.
transformed_df.to_csv('dags/transformed_sales_data.csv', index=False): Saves the transformed data to a new CSV file named transformed_sales_data.csv.
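To sanity-check the transformation, you can run the same pandas logic on the sample CSV outside Airflow; with the three rows above, the grouped totals come out roughly as in the comments (a quick local sketch, not part of the DAG):

import pandas as pd

df = pd.read_csv('dags/sales_data.csv')
df['total'] = df['quantity'] * df['price']
result = df.groupby(['product_id', 'date']).agg({'total': 'sum'}).reset_index()
print(result)
#    product_id        date  total
# 0         101  2024-01-15   21.0
# 1         101  2024-03-10   31.5
# 2         102  2024-02-25   25.0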
- Load Sales Data
def load_sales_data():
    hook = PostgresHook(postgres_conn_id='postgres_connection')
    df = pd.read_csv('dags/transformed_sales_data.csv')
    for _, row in df.iterrows():
        # Cast pandas/numpy values to plain Python types so psycopg2 can adapt them.
        hook.run("""
            INSERT INTO sales_data (product_id, purchase_date, total)
            VALUES (%s, %s, %s);
        """, parameters=(int(row['product_id']), row['date'], float(row['total'])))
PostgresHook(postgres_conn_id='postgres_connection'): Establishes a connection to the PostgreSQL database using the connection ID postgres_connection.
pd.read_csv('dags/transformed_sales_data.csv'): Reads the transformed data.
for _, row in df.iterrows(): Iterates over each row in the DataFrame.
hook.run(...): Inserts each row into the sales_data table in the PostgreSQL database.
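Inserting row by row with hook.run() issues one statement per row, which is fine for a three-row file but slow for larger ones. As a sketch of an alternative (same table and connection ID as above), PostgresHook inherits insert_rows from Airflow's DbApiHook and can load the whole DataFrame in batches:

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_sales_data_bulk():
    hook = PostgresHook(postgres_conn_id='postgres_connection')
    df = pd.read_csv('dags/transformed_sales_data.csv')
    # insert_rows expects an iterable of plain tuples plus the target column names.
    rows = list(df[['product_id', 'date', 'total']].itertuples(index=False, name=None))
    hook.insert_rows(table='sales_data',
                     rows=rows,
                     target_fields=['product_id', 'purchase_date', 'total'],
                     commit_every=1000)  # commit in batches

Either approach works for this tutorial; the row-by-row version is easier to follow, while the bulk version scales better as the file grows.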
- Define the DAG
dag = DAG('etl_sales_analysis',
          description='ETL workflow for e-commerce sales analysis',
          schedule_interval='@daily',
          start_date=datetime(2024, 1, 1),
          catchup=False)
DAG('etl_sales_analysis', ...): Defines a new DAG with the ID etl_sales_analysis.
description: Provides a description of the DAG.
schedule_interval='@daily': Specifies that the DAG will run daily.
start_date=datetime(2024, 1, 1): Sets the start date for the DAG.
catchup=False: Disables backfilling (catching up on missed runs).
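If you want retries or other settings shared across all tasks, you can also pass a default_args dict to the DAG. A minimal sketch, reusing the datetime/timedelta imports already at the top of the file (the retry values here are placeholders, not part of the original post):

default_args = {
    'owner': 'airflow',
    'retries': 1,                         # retry each failed task once
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between attempts
}

dag = DAG('etl_sales_analysis',
          description='ETL workflow for e-commerce sales analysis',
          schedule_interval='@daily',
          start_date=datetime(2024, 1, 1),
          catchup=False,
          default_args=default_args)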
- Define Extract Task
extract_task = PythonOperator(task_id='extract_sales_data', python_callable=extract_sales_data, dag=dag)
- PythonOperator(task_id='extract_sales_data', ...): Defines a Python task that calls the extract_sales_data function.
- Define Transform Task
transform_task = PythonOperator(task_id='transform_sales_data', python_callable=transform_sales_data, dag=dag)
- PythonOperator(task_id='transform_sales_data', ...): Defines a Python task that calls the transform_sales_data function.
- Define Create Table Task
create_table_task = PostgresOperator(task_id='create_table',
                                     postgres_conn_id='postgres_connection',
                                     sql="""
                                         CREATE TABLE IF NOT EXISTS sales_data (
                                             product_id INT,
                                             purchase_date DATE,
                                             total NUMERIC(10, 2)  -- NUMERIC rather than INT: totals such as 31.5 are fractional
                                         );
                                     """,
                                     dag=dag)
- PostgresOperator(task_id='create_table', ...): Defines a task that creates the sales_data table in the PostgreSQL database if it doesn't already exist.
- Define Load Task
load_task = PythonOperator(task_id='load_sales_data', python_callable=load_sales_data, dag=dag)
- PythonOperator(task_id='load_sales_data', ...): Defines a Python task that calls the load_sales_data function.
- Set Task Dependencies
extract_task >> transform_task >> create_table_task >> load_task
- extract_task >> transform_task >> create_table_task >> load_task: Specifies the order in which the tasks should be executed. The transform_task runs after the extract_task, the create_table_task runs after the transform_task, and the load_task runs after the create_table_task.
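Strictly speaking, create_table_task only has to finish before load_task, so it does not need to wait for the transform. If you prefer to let table creation run in parallel with the extract/transform chain, the same tasks can be wired like this instead:

# create_table can run independently; load waits for both branches.
extract_task >> transform_task >> load_task
create_table_task >> load_task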
Running the DAG
- Enable and Trigger the DAG: Find your etl_sales_analysis DAG in the Airflow UI's DAG list, turn it on, and trigger it manually to run the workflow.
Explore the sales data
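Once the DAG has run, you can inspect the loaded table from pgAdmin, or with a quick ad-hoc check through the same Airflow connection; a small sketch (the query itself is just illustrative):

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id='postgres_connection')
# Pull the aggregated totals back into a DataFrame for a quick look.
df = hook.get_pandas_df('SELECT product_id, purchase_date, total FROM sales_data ORDER BY purchase_date;')
print(df)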
Github:
https://github.com/vipinputhanveetil/airflow-ecommerce-etl-pipeline
Summary
This ETL workflow reads sales data from a CSV file, transforms the data by calculating total sales, and loads the transformed data into a PostgreSQL database. The workflow is defined as a DAG in Apache Airflow, with tasks executed in a specific order. This setup helps automate the ETL process, making it easier to manage and monitor.
Conclusion
By following these steps, you can set up an automated ETL workflow using Apache Airflow to perform E-Commerce Sales Analysis. Airflow's powerful scheduling and monitoring capabilities make it an excellent choice for handling complex workflows efficiently.