Data Engineering with DLT and REST

Published at
11/29/2024
Author
Sophia Parafina

Building a Data Pipeline with DLT

In this example, a company provides IoT data through a near real-time API for monitoring and a historical API feed for analysis. Each serves a specific purpose, but you can use a single ETL (extract, transform, load) solution to manage data from both. APIs are a convenient way to access data from vehicles and devices. However, an endpoint can be oversubscribed. One way to scale is to ingest the data into a database and serve queries from there instead of from the API. This article shows how to ingest data from a REST API into DuckDB, an in-process database built for online analytical processing (OLAP) and historical data analysis.

Requirements

To build this solution, you will need:

  • Python 3.10 or higher
  • DuckDB installed locally

Setup

It's good practice to create a dedicated environment for a Python project. Create a directory and an environment and activate the environment.

mkdir rest_iot 
cd rest_iot
python3 -m venv venv
source venv/bin/activate

This example uses dlt (data load tool) to consume and ingest near real-time data from the API. dlt is a lightweight, production-ready Python library for extract, transform, and load (ETL) work. To install dlt with DuckDB support:

pip install -U "dlt[duckdb]"

We can use dlt to create a new project.

dlt init iot duckdb

dlt creates several project files, including a code example (iot_pipeline.py), requirements.txt, and a .dlt directory with configuration files.

iot_pipeline.py
requirements.txt
.dlt/
 config.toml
 secrets.toml

The requirements.txt file contains a list of Python packages required for the project. Installing these packages is essential to ensure the project runs smoothly.

pip install -r requirements.txt

The final step is to configure the API key. Add your API key to the .dlt/secrets.toml file.

rest_token = "rest_api_aaaaaaaaaaaaabbbbbbbbbbbbbbbbccccccccc"

Near Real-Time Data

We won't use the example code generated by dlt. Open iot_pipeline.py, delete its contents, and paste in the following code.

import dlt
from dlt.sources.rest_api import rest_api_source
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

nrt_source = rest_api_source({
    "client": {
        "base_url": "https://api.rest-iot.com/fleet/vehicles/stats/",
        "auth": BearerTokenAuth(token=dlt.secrets["rest_token"]),
        "paginator": {
            "type": "json_response",
            "next_url_path": "paging.next",
        },
    },
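    "resources": [
        {
            # No explicit "path" is set, so the endpoint path defaults to
            # the resource name and the request goes to base_url + "feed".
            "name": "feed",
            "endpoint": {
                "params": {
                    "types": "gps",
                    "decorations": "obdOdometerMeters",
                },
            },
        },
    ],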
})

pipeline = dlt.pipeline(
    pipeline_name="feed_pipeline",
    destination="duckdb",
    dataset_name="iot",
    progress="log",
)

load_info = pipeline.run(nrt_source)

The code defines a source, which includes the REST IoT base URL and the API token. Within the source, the feed resource requests the gps stat type, and dlt loads the response into DuckDB. dlt handles pagination by following the paging.next URL in each response, extracting data until there are no more pages.
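To make the pagination step concrete, here is a minimal sketch of what following a "paging.next" link involves. It is not dlt's implementation. The FAKE_PAGES data, the example.invalid URLs, the "data" key, and the fetch_json, resolve_path, and iter_records helpers are all hypothetical stand-ins, so the snippet runs without network access.

```python
from typing import Any, Callable, Iterator, Optional

# Hypothetical stand-in for the API: maps a URL to the JSON body it returns.
FAKE_PAGES: dict[str, dict[str, Any]] = {
    "https://example.invalid/feed": {
        "data": [{"id": 1}, {"id": 2}],
        "paging": {"next": "https://example.invalid/feed?page=2"},
    },
    "https://example.invalid/feed?page=2": {
        "data": [{"id": 3}],
        "paging": {},  # no "next" key: this is the last page
    },
}


def fetch_json(url: str) -> dict[str, Any]:
    """Return the fake JSON body for a URL (a real client would issue an HTTP GET)."""
    return FAKE_PAGES[url]


def resolve_path(body: dict[str, Any], dotted_path: str) -> Optional[Any]:
    """Look up a dotted path such as 'paging.next'; return None if a key is missing."""
    current: Any = body
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def iter_records(
    first_url: str,
    next_url_path: str = "paging.next",
    fetch: Callable[[str], dict[str, Any]] = fetch_json,
) -> Iterator[dict[str, Any]]:
    """Yield records page by page, following the next-page URL until it is absent."""
    url: Optional[str] = first_url
    while url:
        body = fetch(url)
        yield from body.get("data", [])
        url = resolve_path(body, next_url_path)


records = list(iter_records("https://example.invalid/feed"))
print(records)  # [{'id': 1}, {'id': 2}, {'id': 3}]
```

The loop stops as soon as the response carries no next URL, which matches the "until there are no more pages" behavior described above.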

The second part of the code creates a pipeline for extracting and loading data. It defines duckdb as the destination and writes the data to the iot dataset.

To extract and load the data, run the following:

python3 iot_pipeline.py

The pipeline outputs the log to the console as the program runs. We can check the output by querying duckdb with the CLI.

duckdb

v1.1.3 19864453f7
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen a persistent database.

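Open the database file in the duckdb CLI. By default, dlt names the DuckDB file after the pipeline (feed_pipeline.duckdb) and places the tables in a schema named after the dataset (iot). You can use SQL to query the database.

.open feed_pipeline.duckdb
SHOW ALL TABLES;

[Screenshot: SHOW ALL TABLES output]

SELECT * FROM iot.feed;

[Screenshot: rows from the feed table]

SELECT * FROM iot.feed__gps;

[Screenshot: rows from the gps child table]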
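The separate gps table exists because dlt normalizes nested JSON: a list inside a record is moved to a child table, named by joining the parent table and field names with a double underscore, and linked back to its parent row. The sketch below is a simplified illustration of that idea, not dlt's actual normalizer. The split_nested helper and the sample vehicle record (its field names and values) are hypothetical.

```python
import uuid
from typing import Any


def split_nested(
    record: dict[str, Any], parent_table: str
) -> tuple[dict[str, Any], dict[str, list[dict[str, Any]]]]:
    """Split one JSON record into a parent row plus child rows for each list field."""
    row_id = uuid.uuid4().hex  # unique key for the parent row
    parent_row: dict[str, Any] = {"_dlt_id": row_id}
    child_rows: dict[str, list[dict[str, Any]]] = {}

    for key, value in record.items():
        if isinstance(value, list):
            # Lists of objects go to a child table named parent__field.
            child_table = f"{parent_table}__{key}"
            child_rows[child_table] = [
                {**item, "_dlt_parent_id": row_id, "_dlt_list_idx": idx}
                for idx, item in enumerate(value)
            ]
        else:
            # Scalar fields stay on the parent row.
            parent_row[key] = value

    return parent_row, child_rows


# Hypothetical record shaped like one vehicle with a list of GPS readings.
vehicle = {
    "id": "v-1",
    "name": "Truck 1",
    "gps": [
        {"time": "2024-11-29T10:00:00Z", "latitude": 40.0, "longitude": -75.0},
        {"time": "2024-11-29T10:00:05Z", "latitude": 40.1, "longitude": -75.1},
    ],
}

parent, children = split_nested(vehicle, "feed")
print(parent["name"], list(children))
```

Joining the child table back to the parent on the parent ID column recovers which vehicle each GPS reading belongs to.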

This example uses DuckDB, but dlt supports many destinations, including ClickHouse, Snowflake, Databricks, S3, and over 30 SQL databases. Developers can choose a data backend that meets their requirements.

Historical Data

Retrieving historical data follows the same pattern, i.e., define a source and create a pipeline.

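# Same imports as the near real-time example, repeated so this snippet runs on its own.
import dlt
from dlt.sources.rest_api import rest_api_source
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

# Query parameters: one stat type over a fixed one-day window (UTC).
params = {
    "types": "engineStates",
    "startTime": "2020-07-23T00:00:00Z",
    "endTime": "2020-07-24T00:00:00Z"
}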

historical_source = rest_api_source({
    "client": {
        "base_url": "https://api.rest-iot.com/fleet/vehicles/stats/",
        "auth": BearerTokenAuth(token=dlt.secrets["rest_token"]),
        "paginator": {
            "type": "json_response",
            "next_url_path": "paging.next",
        },
    },
    "resources": [
        {
            "name": "history",
            "endpoint": {
                "params": params,
            },
        },
    ],
})

pipeline = dlt.pipeline(
    pipeline_name="history_pipeline",
    destination="duckdb",
    dataset_name="iot",
    progress="log",
)

load_info = pipeline.run(historical_source)
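The start and end times in params are hard-coded above. If you want to load a different day, one option is to build the window from a date. The following is a small sketch using only the standard library. The day_window_params helper is hypothetical, and it assumes the API expects UTC timestamps in the same format as the example.

```python
from datetime import date, datetime, time, timedelta, timezone


def day_window_params(day: date, stat_types: str = "engineStates") -> dict[str, str]:
    """Build query parameters covering one full UTC day, from midnight to midnight."""
    start = datetime.combine(day, time.min, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    fmt = "%Y-%m-%dT%H:%M:%SZ"  # same timestamp layout as the hard-coded example
    return {
        "types": stat_types,
        "startTime": start.strftime(fmt),
        "endTime": end.strftime(fmt),
    }


params = day_window_params(date(2020, 7, 23))
print(params)
# {'types': 'engineStates', 'startTime': '2020-07-23T00:00:00Z', 'endTime': '2020-07-24T00:00:00Z'}
```

The resulting dictionary can be passed as the "params" value of the history endpoint in place of the literal one.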

You can programmatically work with the data in DuckDB, but analysts often use spreadsheets. Exporting the data to a CSV file is simple. Open the history pipeline's database file in a duckdb session (history_pipeline.duckdb under dlt's default naming), then use the COPY command to write the table to a file.

COPY iot.history TO 'history.csv' (HEADER, DELIMITER ',');

What will you build?

This article demonstrates how to work with near real-time and historical data using the dlt package. Whether you need to scale data access across the enterprise or provide historical data for post-event analysis, you can use the same framework to provide customer data. In a future article, I'll demonstrate how to use dlt with a workflow orchestrator such as Apache Airflow or Dagster.
