
Normalizing data with dlt

Published at: 2/17/2024
Categories: dlt, duckdb, dataengineering
Author: cmcrawford2

This is the second in a series of posts about dlt. It was given as part of the Data Talks Club's data engineering zoomcamp. The instructor is Adrian Brudaru, and you can find the dlt repository at github.com/dlt-hub/dlt.

Normalizing Data

There are two parts to normalizing data. The first is normalizing the structure without changing the data itself. The second is filtering, where you remove outliers or records that carry no meaning; this does change the data slightly.

The first method includes adding types, renaming columns, flattening nested dictionaries, and turning lists or arrays into child tables. Lists or arrays can't be flattened into columns, because they can contain a different number of items in each record.
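
As a rough sketch of the flattening idea (this is not dlt's actual implementation), nested dictionary keys can be joined into double-underscore column names, which is the naming you'll see in the output later in this post:

def flatten(record, prefix=""):
    """Flatten nested dicts into double-underscore column names,
    e.g. {"time": {"pickup": ...}} -> {"time__pickup": ...}.
    dlt's real normalizer also infers types, renames columns to fit
    the destination, and unpacks lists into child tables."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}__{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat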

JSON doesn't describe its own data, and types are not enforced. If you get one message from the Discord API, you get a string, but if there is more than one, you get a list. JSON is meant to be a transfer mechanism, not a database, so it's not suitable for direct analytical use.
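
For example (these payloads are hypothetical, just to illustrate the problem), code consuming such JSON has to defend against a field that is sometimes a string and sometimes a list:

# hypothetical payloads: the same field arrives as a string for one
# message and as a list for several -- JSON enforces no type
payloads = [
    {"channel": "general", "messages": "hello"},
    {"channel": "general", "messages": ["hello", "world"]},
]

def as_list(value):
    """Coerce a sometimes-scalar, sometimes-list field to a list."""
    return value if isinstance(value, list) else [value]

for p in payloads:
    print(as_list(p["messages"]))  # ['hello'] then ['hello', 'world']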

For this example, the instructor modified a version of the taxi database. He added nested dictionaries, nested lists, and a hash id for each record.

data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        # nested dictionaries could be flattened
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "booked"
        },
        "Passenger_Count": 2,
        # nested lists need to be expressed as separate tables
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
    # ... more data
]

dlt automates normalization, which removes a huge burden from the data engineer: the data is normalized automatically, following best practices. We'll see how it does this next.

The instructor created a dlt data pipeline and ran it, creating tables in DuckDB.

import dlt

# define the connection to load to.
# We use duckdb for now, but you can switch to BigQuery later.
pipeline = dlt.pipeline(destination='duckdb',
                        dataset_name='taxi_rides')

# run with merge write disposition.
# This creates the scaffolding for the next example,
# where we look at merging data.
pipeline.run(data,
             table_name="rides",
             write_disposition="merge",
             primary_key="record_hash")
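As a quick illustration of the merge disposition (a sketch using the same data object): re-running the pipeline with a modified record updates the existing row instead of appending a duplicate, because record_hash identifies it as the same ride.

# change one value but keep the same record_hash
data[0]["Trip_Distance"] = 18.0
pipeline.run(data,
             table_name="rides",
             write_disposition="merge",
             primary_key="record_hash")
# the rides table still has one row for this hash, now with the new distance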

dlt can use schemas as data contracts: you can specify the expected format and reject data that doesn't match it. dlt will type the data, flatten nested structures, rename columns to fit database standards, and process a stream of events/rows without filling memory. It can load to a variety of databases and file formats.
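
Here is a minimal sketch of the contract idea, assuming a dlt version that supports the schema_contract argument (check the dlt docs for the exact options available in your version):

pipeline.run(
    data,
    table_name="rides",
    write_disposition="merge",
    primary_key="record_hash",
    # freeze columns and types: rows that would add new columns or
    # change a data type are rejected instead of evolving the schema
    schema_contract={"tables": "evolve",
                     "columns": "freeze",
                     "data_type": "freeze"},
)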

Let's look at what we just created:

import duckdb
# display() is available in a Jupyter notebook; use print() elsewhere

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))


print("\n\n\n Rides table below: Note the times are properly typed")
rides = conn.sql("SELECT * FROM rides").df()
display(rides)

print("\n\n\n Passengers table")
passengers = conn.sql("SELECT * FROM rides__passengers").df()
display(passengers)

print("\n\n\n Stops table")
stops = conn.sql("SELECT * FROM rides__stops").df()
display(stops)

The output is:

Loaded tables: 

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚        name         β”‚
β”‚       varchar       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ _dlt_loads          β”‚
β”‚ _dlt_pipeline_state β”‚
β”‚ _dlt_version        β”‚
β”‚ rides               β”‚
β”‚ rides__passengers   β”‚
β”‚ rides__stops        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜




 Rides table below: Note the times are properly typed
 (one row, transposed here for readability)

    record_hash                b00361a396177a9cb410ff61f20015ad
    vendor_name                VTS
    time__pickup               2009-06-14 19:23:00-04:00
    time__dropoff              2009-06-14 19:48:00-04:00
    trip_distance              17.52
    coordinates__start__lon    -73.787442
    coordinates__start__lat    40.641525
    coordinates__end__lon      -73.980072
    coordinates__end__lat      40.742963
    payment__type              Credit
    payment__amt               20.5
    payment__surcharge         0
    payment__tip               9
    payment__tolls             4.15
    payment__status            booked
    passenger_count            2
    _dlt_load_id               1708172075.423542
    _dlt_id                    OuSbrdhe7+UTfA




 Passengers table

    name    rating  _dlt_root_id    _dlt_parent_id  _dlt_list_idx   _dlt_id
0   John    4.9     OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  0   e8Cw1jDEzQ4Mww
1   Jack    3.9     OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  1   /h4QT5P61zivrQ




 Stops table

    lon     lat     _dlt_root_id    _dlt_parent_id  _dlt_list_idx   _dlt_id
0   -73.6   40.6    OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  0   PYqbqAEcmMiESw
1   -73.5   40.5    OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  1   aSmvX/UKtYsclQ

You can see that the nested pickup and dropoff times have been flattened into columns named time__pickup and time__dropoff, and likewise for the other nested fields. Two child tables were created from the lists, and each keeps a reference to the top-level row it came from: dlt stores the parent's id in each child row, so the child tables can always be connected back to the parent. We could join the child tables back into the main table if we wanted a single flat table.
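
For example, since each child row carries its parent's _dlt_id in the _dlt_parent_id column (both visible in the output above), joining passengers back onto their ride is straightforward:

# Join each passenger back onto its parent ride:
# _dlt_parent_id in the child table matches _dlt_id in rides.
joined = conn.sql("""
    SELECT r.vendor_name, r.time__pickup, p.name, p.rating
    FROM rides AS r
    JOIN rides__passengers AS p
      ON p._dlt_parent_id = r._dlt_id
""").df()
display(joined)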
