Simplifying Data Transformation in Redshift: An Approach with DBT and Airflow
Let's transform and model data stored in Redshift with a simple and effective approach using DBT and Airflow. Why make it complicated when we can simplify it?
In this article, we'll work with the following scenario:
- Amazon Redshift stores the input data from an e-commerce website.
- We need to provide insights into customer behavior.
- The data warehouse (DW) is not yet structured for analytical applications; it contains a copy of the raw data.
- We'll transform the data and structure the DW to enable business analysis.
The data used in this article is open and relates to sales on the Amazon website; it's available on Kaggle.
The complete code for this project can also be found on my GitHub.
About the solution
To solve this problem, we'll use the following architecture, built from the technologies and tools described below:
For those unfamiliar with the tools to be used, I'll provide a brief explanation of them below.
If you're already familiar, feel free to skip to the next section: Data Modeling.
About dbt
Data Build Tool (dbt) is a modern tool for transforming data inside a data warehouse or lakehouse. It lets data engineers, data scientists, and data analysts write transformations as SQL SELECT statements, while dbt takes care of creating the resulting tables or views and running the models in dependency order.
dbt has two versions:
- dbt Core (dbt-core): the open-source version, maintained by dbt Labs together with the community, and free to use.
- dbt Cloud: the managed SaaS version, used in the cloud through a subscription plan.
About Airflow and Cosmos
Apache Airflow is one of the most widely used tools for orchestrating data workflows. It lets you define pipelines as Python code (DAGs) and then schedule and monitor their execution.
Since we're talking about Airflow, let's also discuss Cosmos.
Cosmos is a library for Airflow developed by Astronomer that aims to simplify the execution of DBT projects with Airflow.
With Cosmos, a DBT project runs as a group of Airflow tasks that Cosmos generates automatically by parsing the project: each DBT model becomes an Airflow task, or a task group that runs the model's transformation and then its tests.
Amazon Redshift
Amazon Redshift is a cloud-based data warehouse designed for querying and analyzing large volumes of data.
Today, Redshift is available both as provisioned clusters and as a serverless option, and it has evolved into a robust data platform.
Data Modeling
To better understand the solution we'll adopt in this article, the following diagram shows the data model for the DW.
As mentioned earlier, the raw, untransformed data is stored in a single table: the sales table.
The sales table also serves as a staging area for data processing and the creation of other tables.
In the data warehouse modeling, there are three dimension tables:
- dim_product: This table contains data about the store's products.
- dim_user: It contains data about users who are customers of the store.
- dim_rating: It contains all the product ratings given by customers of the store.
And two fact tables:
- fact_product_rating: This table contains data for extracting product rating metrics by users, e.g., the top-rated products.
- fact_sales_category: This table contains data for extracting sales metrics by product category, e.g., the categories that generate the most revenue for the store.
Note: It's important to remember that the example in this article is hypothetical and may not represent the best data modeling for the given data; it's for educational purposes only.
Show me the Code
Next, let's walk through building the dbt project, the Airflow DAG, and the results of the data transformations in Redshift.
1. dbt Project
In this section, the focus is on the structure, configuration, and SQL code for the data transformations that are part of the project.
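To start a DBT project, you need to install the Python packages and use the CLI.
To install dbt Core together with the Redshift adapter (so that dbt init can offer Redshift when setting up the profile):
$ pip install dbt-core==1.4.9 dbt-redshift==1.4.0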
To initialize a project:
$ dbt init <project_name>
By default, a DBT project already has some folders and configurations.
Here's the structure with all the folders and files created for our scenario:
-- dbt_project/
|______ dbt_project.yml
|______ analyses/
|______ macros/
|______ models/
|       |______ dimensions/
|       |       |______ dim_product.sql
|       |       |______ dim_rating.sql
|       |       |______ dim_user.sql
|       |______ facts/
|       |       |______ fact_product_rating.sql
|       |       |______ fact_sales_category.sql
|       |______ staging/
|               |______ stg_sales_eph.sql
|               |______ staging.yml
|______ seeds/
|______ snapshots/
|______ tests/
Most of the work is done within the models folder.
The next step is to analyze each SQL transformation code.
Staging Table
As mentioned earlier, the staging model reads the sales table itself, which already exists in the data warehouse. It simply selects the entire table, without any transformation.
{{
    config(
        materialized='ephemeral'
    )
}}

SELECT
    *
FROM {{ source('public', 'sales') }}
I'd like to highlight three points:
- DBT supports several materializations, and this project uses two of them. An ephemeral model is not persisted in the database: dbt inlines its SQL as a common table expression (CTE) into every model that references it. A table model is built and persisted as a physical table in the data warehouse. (When nothing is configured, dbt's default materialization is a view.)
- All processing happens in the target database, in this case Redshift. dbt compiles the SQL and sends it to Redshift; no data is loaded into the memory of the machine running dbt.
- Persisted models, such as tables, are created with the same name as their SQL file, and ephemeral models are referenced by that same name in ref().
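To make the ephemeral behavior concrete, the Python sketch below imitates what dbt does at compile time: it replaces the ref() to the ephemeral model with a CTE name and prepends the ephemeral model's SQL as a CTE. It is a simplified, hypothetical illustration rather than dbt's compiler; the __dbt__cte__ prefix follows the naming dbt typically uses, but treat the exact name and formatting as an assumption.

```python
import re


def inline_ephemeral(model_sql: str, ephemeral_name: str, ephemeral_sql: str) -> str:
    """Replace {{ ref('<ephemeral_name>') }} with a CTE and prepend the CTE.

    Simplified illustration of how a reference to an ephemeral model is
    compiled: nothing is materialized, the SQL is inlined into the query.
    """
    cte_name = f"__dbt__cte__{ephemeral_name}"  # assumed naming convention
    # Match {{ ref('name') }} or {{ ref("name") }} with optional whitespace
    pattern = r"\{\{\s*ref\(\s*['\"]" + re.escape(ephemeral_name) + r"['\"]\s*\)\s*\}\}"
    body = re.sub(pattern, cte_name, model_sql)
    return f"WITH {cte_name} AS (\n{ephemeral_sql.strip()}\n)\n{body.strip()}"


if __name__ == "__main__":
    # The staging SQL is shown with source() already resolved to public.sales
    staging_sql = "SELECT * FROM public.sales"
    dim_user_sql = "SELECT DISTINCT user_id, user_name FROM {{ ref('stg_sales_eph') }}"
    print(inline_ephemeral(dim_user_sql, "stg_sales_eph", staging_sql))
```

Because the staging logic is repeated inside every dependent query, ephemeral models suit lightweight steps like this one; heavier logic shared by many models is usually better materialized.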
Dimension Tables
Now let's analyze the SQL code for the three dimension tables, which are materialized as tables.
The first one is dim_product, containing data about the store's products based on the sales table.
{{
    config(
        materialized='table'
    )
}}

SELECT DISTINCT
    product_id,
    product_name,
    category,
    about_product,
    img_link,
    product_link
FROM {{ ref('stg_sales_eph') }}
The second is dim_rating, which includes product ratings by buyers.
{{
    config(
        materialized='table'
    )
}}

SELECT
    user_id,
    product_id,
    rating,
    rating_count
FROM {{ ref('stg_sales_eph') }}
The third and final one is dim_user, containing data about store customers.
{{
    config(
        materialized='table'
    )
}}

SELECT DISTINCT
    user_id,
    user_name
FROM {{ ref('stg_sales_eph') }}
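SELECT DISTINCT removes duplicate rows, not duplicate keys: if the same product_id appears with two slightly different about_product texts, for example, dim_product keeps both rows. The hypothetical Python sketch below mimics the projection plus DISTINCT that the dimension models perform and flags keys that are still repeated, a quick way to check the grain of a dimension on a few made-up staging rows.

```python
from typing import Iterable


def distinct_projection(rows: Iterable[dict], columns: list[str]) -> list[tuple]:
    """Mimic SELECT DISTINCT <columns> FROM staging, keeping first-seen order."""
    seen: set = set()
    result: list = []
    for row in rows:
        key = tuple(row[col] for col in columns)
        if key not in seen:
            seen.add(key)
            result.append(key)
    return result


def duplicate_keys(projected: list[tuple], key_index: int = 0) -> set:
    """Return key values that still appear in more than one projected row."""
    counts: dict = {}
    for row in projected:
        counts[row[key_index]] = counts.get(row[key_index], 0) + 1
    return {key for key, n in counts.items() if n > 1}


if __name__ == "__main__":
    # Hypothetical staging rows; column names follow the models above
    staging = [
        {"product_id": "P1", "product_name": "Cable", "about_product": "USB-C, 1 m"},
        {"product_id": "P1", "product_name": "Cable", "about_product": "USB-C, 1 m"},
        {"product_id": "P1", "product_name": "Cable", "about_product": "USB-C cable, 1 m"},
        {"product_id": "P2", "product_name": "Charger", "about_product": "20 W"},
    ]
    dim = distinct_projection(staging, ["product_id", "product_name", "about_product"])
    print(len(dim))             # exact duplicates collapse, differing text does not
    print(duplicate_keys(dim))  # product_id values that are not unique
```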
With the dimension tables code completed, it's time to look at the fact tables.
Fact Tables
The two fact tables are more complex as they include business metrics, aggregations, and relationships with dimensions.
Here's the code for fact_product_rating, which relates to the product and rating dimensions, calculating average rating scores grouped by products.
{{
    config(
        materialized='table'
    )
}}

SELECT
    p.product_id,
    p.product_name,
    -- Cast only values that look numeric; anything else becomes NULL and is ignored by AVG.
    -- An explicit scale is needed: in Redshift, NUMERIC without precision has scale 0.
    AVG(CASE
            WHEN r.rating ~ '^[0-9.]+$' THEN CAST(r.rating AS DECIMAL(4, 2))
            ELSE NULL
        END) AS avg_rating
FROM
    {{ ref('stg_sales_eph') }} s
    JOIN {{ ref('dim_product') }} p ON s.product_id = p.product_id
    JOIN {{ ref('dim_rating') }} r ON r.product_id = p.product_id
GROUP BY
    p.product_id,
    p.product_name
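The CASE expression keeps only rating values made of digits and dots, so malformed entries become NULL and are skipped by AVG. The plain-Python sketch below reproduces that filter-then-average logic on a few hypothetical rating strings; it uses Decimal to mirror SQL's exact numeric arithmetic and is an illustration, not code from the project.

```python
import re
from decimal import Decimal
from typing import Optional

# Same pattern as the SQL: one or more digits or dots, nothing else
NUMERIC_RATING = re.compile(r"^[0-9.]+$")


def parse_rating(raw: str) -> Optional[Decimal]:
    """Return the rating as a Decimal, or None (SQL NULL) when it is not numeric.

    Like the SQL CAST, a value such as "1.2.3" passes the pattern but
    fails the conversion (here with decimal.InvalidOperation).
    """
    return Decimal(raw) if NUMERIC_RATING.match(raw) else None


def avg_rating(raw_ratings: list) -> Optional[Decimal]:
    """Average the numeric ratings, skipping NULLs as SQL's AVG does."""
    values = [r for r in (parse_rating(x) for x in raw_ratings) if r is not None]
    if not values:
        return None  # AVG over only NULLs is NULL
    return sum(values) / len(values)


if __name__ == "__main__":
    # Hypothetical ratings for one product; "|" stands in for a malformed value
    print(avg_rating(["4.2", "3.9", "|"]))  # 4.05
```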
The second fact table is fact_sales_category, which groups sales by user and product category and calculates the total revenue from the actual_price column. It joins the product and user dimensions.
{{
    config(
        materialized='table'
    )
}}

SELECT
    u.user_id,
    p.category,
    SUM(CAST(REGEXP_REPLACE(s.actual_price, '[^0-9.]', '') AS DECIMAL(10, 2))) AS sales_amount
FROM
    {{ ref('stg_sales_eph') }} s
    JOIN {{ ref('dim_product') }} p ON s.product_id = p.product_id
    JOIN {{ ref('dim_user') }} u ON s.user_id = u.user_id
GROUP BY
    u.user_id,
    p.category
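The REGEXP_REPLACE call strips every character that is not a digit or a dot, so a price string such as ₹1,099 (the format assumed here for the actual_price column) becomes 1099 before the cast. The Python sketch below mirrors that cleanup and the GROUP BY user_id, category aggregation on hypothetical rows, which can help when sanity-checking the SQL against a small sample.

```python
import re
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP


def clean_price(raw: str) -> Decimal:
    """Mirror CAST(REGEXP_REPLACE(raw, '[^0-9.]', '') AS DECIMAL(10, 2))."""
    digits = re.sub(r"[^0-9.]", "", raw)
    # Keep two decimal places like DECIMAL(10, 2); the rounding mode is an assumption
    return Decimal(digits).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def sales_by_user_category(rows: list) -> dict:
    """Mirror SUM(...) GROUP BY user_id, category."""
    totals = defaultdict(Decimal)  # missing keys start at Decimal("0")
    for row in rows:
        totals[(row["user_id"], row["category"])] += clean_price(row["actual_price"])
    return dict(totals)


if __name__ == "__main__":
    # Hypothetical rows shaped like the staging table joined to the dimensions
    rows = [
        {"user_id": "U1", "category": "Electronics", "actual_price": "₹1,099"},
        {"user_id": "U1", "category": "Electronics", "actual_price": "₹349"},
        {"user_id": "U2", "category": "Home&Kitchen", "actual_price": "₹2,499.50"},
    ]
    for key, amount in sales_by_user_category(rows).items():
        print(key, amount)
```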
With the data modeling ready, it's time to complete the project and make it all run with Airflow.
2. Construction of the DAG in Airflow
As mentioned earlier, for this project, we'll use the Cosmos library, which offers a great integration for projects with DBT and Airflow.
To do this, we need to follow a few steps:
- Install the necessary dependencies in the Airflow environment to run the project.
- Configure an Airflow connection for Redshift. This is an interesting feature of Cosmos: we don't need to configure the database connection (a profiles.yml with credentials) in the DBT project. Instead, Cosmos builds the DBT profile from the Airflow connection.
- Finally, build the tasks with the "DbtTaskGroup" from Cosmos.
Following these steps, here's the "requirements.txt" file with the dependencies to install on the Airflow server:
dbt-core==1.4.9
dbt-redshift==1.4.0
astronomer-cosmos==1.0.5
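Since the dbt-core, dbt-redshift, and astronomer-cosmos versions need to stay in sync, a quick check in the Airflow environment can save debugging time. The sketch below is a hypothetical helper, not part of dbt or Cosmos: it parses exact == pins like the ones above and compares them with the installed versions using Python's standard importlib.metadata module.

```python
from importlib.metadata import PackageNotFoundError, version

REQUIREMENTS = """\
dbt-core==1.4.9
dbt-redshift==1.4.0
astronomer-cosmos==1.0.5
"""


def parse_pins(text: str) -> dict:
    """Parse 'name==version' lines, ignoring blank lines and comments."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if "==" in line:
            name, pinned = line.split("==", 1)
            pins[name.strip()] = pinned.strip()
    return pins


def check_pins(pins: dict) -> dict:
    """Return a message for every package that is missing or at another version."""
    problems = {}
    for name, pinned in pins.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            problems[name] = "not installed"
            continue
        if installed != pinned:
            problems[name] = f"installed {installed}, expected {pinned}"
    return problems


if __name__ == "__main__":
    issues = check_pins(parse_pins(REQUIREMENTS))
    print(issues or "All pinned packages match.")
```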
The configuration of the connection in the Airflow UI for Redshift looks like this:
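As an alternative to the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI; the URI path becomes the connection's schema field, which for Redshift connections holds the database name. The sketch below builds such a URI for redshift_default with the standard library only. The host, user, and password are placeholders, and using redshift as the URI scheme (the connection type) is an assumption to verify against the connection type your Cosmos profile mapping expects.

```python
from urllib.parse import quote


def redshift_conn_uri(user: str, password: str, host: str, port: int, database: str) -> str:
    """Build an Airflow connection URI for a Redshift connection.

    The path segment maps to the connection's "schema" field, which for
    Redshift holds the database name.
    """
    # Percent-encode credentials so characters like '@' or '/' don't break the URI
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"redshift://{user_enc}:{password_enc}@{host}:{port}/{quote(database, safe='')}"


if __name__ == "__main__":
    # Hypothetical values: replace with your cluster endpoint and credentials
    uri = redshift_conn_uri(
        user="dbt_user",
        password="p@ss/word",
        host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",
        port=5439,  # Redshift's default port
        database="amazon_sales",
    )
    print(f"AIRFLOW_CONN_REDSHIFT_DEFAULT={uri}")
```

The printed value would then be exported in the environment of the Airflow components that parse and run the DAG.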
Now, here's the complete DAG code:
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos.profiles import RedshiftUserPasswordProfileMapping
from pendulum import datetime

# Airflow connection with the Redshift host, port, credentials, and database
CONNECTION_ID = "redshift_default"
DB_NAME = "amazon_sales"  # database configured in the Airflow connection (not referenced below)
SCHEMA_NAME = "public"

# The dbt project must be available on the Airflow server (here, inside the DAGs folder)
ROOT_PATH = '/opt/airflow/dags/dbt'
DBT_PROJECT_PATH = f"{ROOT_PATH}/sales_dw"

# Cosmos generates the dbt profile from the Airflow connection,
# so the project doesn't need its own profiles.yml
profile_config = ProfileConfig(
    profile_name="sales_dw",
    target_name="dev",
    profile_mapping=RedshiftUserPasswordProfileMapping(
        conn_id=CONNECTION_ID,
        profile_args={"schema": SCHEMA_NAME},
    ),
)


@dag(
    start_date=datetime(2023, 10, 14),
    schedule=None,  # no schedule: the DAG runs only when triggered
    catchup=False,
)
def dag_dbt_sales_dw_cosmos():
    start_process = EmptyOperator(task_id='start_process')

    # Cosmos parses the dbt project and creates one task (or task group) per model
    transform_data = DbtTaskGroup(
        group_id="transform_data",
        project_config=ProjectConfig(DBT_PROJECT_PATH),
        profile_config=profile_config,
        default_args={"retries": 2},
    )

    start_process >> transform_data


dag_dbt_sales_dw_cosmos()
I'd like to highlight the following part of the code for further analysis:
transform_data = DbtTaskGroup(
    group_id="transform_data",
    project_config=ProjectConfig(DBT_PROJECT_PATH),
    profile_config=profile_config,
    default_args={"retries": 2},
)
The "DbtTaskGroup" reads the directory where the DBT project is located. In the previous code, the path is indicated by the "DBT_PROJECT_PATH" variable.
It then constructs Airflow tasks based on the models created in DBT, which, in our case, include staging, dimensions, and fact tables.
ROOT_PATH = '/opt/airflow/dags/dbt'
DBT_PROJECT_PATH = f"{ROOT_PATH}/sales_dw"
Note: In this setup, the DBT project must be stored on the same server as Airflow, at the path defined above, because Cosmos reads it from the local filesystem.
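Because DbtTaskGroup reads the project from the local filesystem, a missing or misplaced folder usually surfaces as a DAG import error. A hypothetical pre-flight check like the one below, using only pathlib, can confirm that the expected files exist at DBT_PROJECT_PATH before deploying; the list of expected paths follows the project layout shown earlier and is an assumption to adapt.

```python
from pathlib import Path

ROOT_PATH = "/opt/airflow/dags/dbt"
DBT_PROJECT_PATH = f"{ROOT_PATH}/sales_dw"

# Paths (relative to the project root) that this project is expected to contain
EXPECTED = [
    "dbt_project.yml",
    "models/staging/stg_sales_eph.sql",
    "models/dimensions/dim_product.sql",
    "models/facts/fact_sales_category.sql",
]


def missing_project_files(project_path: str, expected: list = EXPECTED) -> list:
    """Return the expected paths that do not exist under project_path."""
    root = Path(project_path)
    return [rel for rel in expected if not (root / rel).exists()]


if __name__ == "__main__":
    missing = missing_project_files(DBT_PROJECT_PATH)
    if missing:
        print(f"dbt project at {DBT_PROJECT_PATH} is missing: {missing}")
    else:
        print("dbt project layout looks complete.")
```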
The ProfileConfig is the object that configures the connection.
It combines the Airflow connection, through a profile mapping such as RedshiftUserPasswordProfileMapping, with extra profile parameters such as the target database schema.
CONNECTION_ID = "redshift_default"
DB_NAME = "amazon_sales"
SCHEMA_NAME = "public"

profile_config = ProfileConfig(
    profile_name="sales_dw",
    target_name="dev",
    profile_mapping=RedshiftUserPasswordProfileMapping(
        conn_id=CONNECTION_ID,
        profile_args={"schema": SCHEMA_NAME},
    )
)
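Conceptually, the profile mapping translates the Airflow connection into the dbt profile that would otherwise live in profiles.yml. The sketch below illustrates that translation with plain dictionaries; it is a simplified, hypothetical version, not Cosmos's implementation, which supports more fields and options. It assumes, as with Airflow's Postgres-style connections, that the connection's schema field holds the database name, while the dbt target schema comes from profile_args.

```python
import json


def redshift_profile(conn: dict, profile_name: str, target_name: str, profile_args: dict) -> dict:
    """Build a dbt profile dict from Airflow-style connection fields.

    Simplified illustration: the connection's "schema" field holds the
    Redshift database, while dbt's "schema" comes from profile_args.
    """
    target = {
        "type": "redshift",
        "host": conn["host"],
        "port": conn.get("port", 5439),  # Redshift's default port
        "user": conn["login"],
        "password": conn["password"],
        "dbname": conn["schema"],
        **profile_args,  # e.g. {"schema": "public"}
    }
    return {profile_name: {"target": target_name, "outputs": {target_name: target}}}


if __name__ == "__main__":
    # Hypothetical values standing in for the redshift_default connection
    airflow_conn = {
        "host": "my-cluster.abc123.us-east-1.redshift.amazonaws.com",
        "port": 5439,
        "login": "dbt_user",
        "password": "********",
        "schema": "amazon_sales",
    }
    profile = redshift_profile(airflow_conn, "sales_dw", "dev", {"schema": "public"})
    print(json.dumps(profile, indent=2))
```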
Constructing the DAG is that simple: straightforward and efficient, isn't it?
The following image shows how the tasks automatically generated by Cosmos look in the Airflow UI:
You can see the dependencies for building fact_product_rating in the image, as defined by the SQL in the models.
Also highlighted in the image are the dependencies for building fact_sales_category.
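These dependencies come straight from the ref() calls in the models: dbt builds a graph from them, and Cosmos mirrors that graph as Airflow task dependencies. The sketch below reproduces the idea with Python's standard graphlib module. It extracts ref() targets with a regular expression, a simplification of dbt's real Jinja parsing, and prints a valid build order; the model SQL strings are abbreviated versions of the ones above.

```python
import re
from graphlib import TopologicalSorter

REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"](\w+)['\"]\s*\)\s*\}\}")

# Abbreviated model SQL; only the ref() calls matter here
MODELS = {
    "stg_sales_eph": "SELECT * FROM {{ source('public', 'sales') }}",
    "dim_product": "SELECT DISTINCT product_id FROM {{ ref('stg_sales_eph') }}",
    "dim_rating": "SELECT user_id, rating FROM {{ ref('stg_sales_eph') }}",
    "dim_user": "SELECT DISTINCT user_id FROM {{ ref('stg_sales_eph') }}",
    "fact_product_rating": (
        "SELECT ... FROM {{ ref('stg_sales_eph') }} s "
        "JOIN {{ ref('dim_product') }} p ... JOIN {{ ref('dim_rating') }} r ..."
    ),
    "fact_sales_category": (
        "SELECT ... FROM {{ ref('stg_sales_eph') }} s "
        "JOIN {{ ref('dim_product') }} p ... JOIN {{ ref('dim_user') }} u ..."
    ),
}


def build_graph(models: dict) -> dict:
    """Map each model to the set of models it references via ref()."""
    return {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}


if __name__ == "__main__":
    graph = build_graph(MODELS)
    print(graph["fact_sales_category"])  # upstream models (set order may vary)
    # Upstream models always come before the models that depend on them
    print(list(TopologicalSorter(graph).static_order()))
```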
3. Finally... Results in Redshift
Upon running the DAG and building the models configured in DBT, we achieve the following result in Redshift:
You can see the sales table, which contains raw data used for staging, as well as the materialized dimension and fact tables.
Performing a query on the fact_sales_category table, we get the following result:
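A typical question for fact_sales_category is which categories bring in the most revenue. The sketch below runs such a query against an in-memory SQLite table with made-up rows so that it is self-contained; the rows are hypothetical and are not the results from Redshift. Against Redshift, you would run the same SELECT from your SQL client, with a literal value in place of the ? placeholder.

```python
import sqlite3

TOP_CATEGORIES_SQL = """
SELECT category, SUM(sales_amount) AS total_sales
FROM fact_sales_category
GROUP BY category
ORDER BY total_sales DESC
LIMIT ?
"""


def top_categories(conn: sqlite3.Connection, limit: int = 5) -> list:
    """Return (category, total_sales) pairs, highest revenue first."""
    return conn.execute(TOP_CATEGORIES_SQL, (limit,)).fetchall()


if __name__ == "__main__":
    # Hypothetical rows with the same columns as the fact table
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE fact_sales_category (user_id TEXT, category TEXT, sales_amount REAL)")
    conn.executemany(
        "INSERT INTO fact_sales_category VALUES (?, ?, ?)",
        [("U1", "Electronics", 1448.0), ("U2", "Electronics", 999.0), ("U3", "Home&Kitchen", 2499.5)],
    )
    for category, total in top_categories(conn, limit=2):
        print(category, total)
```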
Conclusion
DBT is an excellent tool for data processing and modeling, providing convenience and speed for data projects. One of its strengths is that it infers the dependencies between models from their ref() calls, so you only need to write your transformations in SQL and DBT works out the execution order.
The Cosmos library from Astronomer is also a great help in orchestrating DBT with Airflow, simplifying and speeding up the process. It renders each DBT model as Airflow tasks, giving you an overview of the models and their execution.
Airflow + DBT + Cosmos = The Perfect Combination ❤
If you're looking for a simple, efficient, and cost-effective data stack, the solution presented in this article may be effective and ideal for your scenario.
I'll be writing more articles about DBT in the future, so stay tuned!
Follow me here and on other social networks:
LinkedIn: https://www.linkedin.com/in/cicero-moura/
Github: https://github.com/cicerojmm