How moving from Pandas to Polars made me write better code without writing better code

Published at
3/5/2024
Categories
polars
dataengineering
rust
airflow
Author
duvenagep

In a scale-up like Check Technologies, data not only grows, but grows faster over time too. It was merely a matter of time before our data processes would run into resource limitations. Reason enough to find a more performant solution. Interestingly, the actual result, while impressive, was not the most interesting part of the solution.

In this article, we will discuss what I like to refer to as the Polarification of the Check data stack. More specifically: how we used Polars to solve a specific problem and then ended up completely replacing Pandas with Polars on Airflow.

We also highlight some challenges and learnings anyone can use should they consider the move over to Polars.

So what is Polars anyway?

Before going down this rabbit hole, it is important to know the basics. If you have been using Python for a while, chances are you have come across Pandas, an open-source dataframe library widely used in the Python ecosystem, most notably in the data engineering, data science and analytics world.

Below is a snippet from Pandas on how to read data from a CSV file into a dataframe.



import pandas as pd

df = pd.read_csv('data.csv')
df.head()



While Pandas is a fantastic library that revolutionised the data analytics world, it has several drawbacks. Wes McKinney, the original author of Pandas, famously gave a talk back in 2013 titled 10 Things I Hate About Pandas*, in which he highlighted the main design changes he would make if he were to rebuild Pandas today.

These things include:

  1. Internals too far from “the metal”
  2. No support for memory-mapped datasets
  3. Poor performance in database and file ingest / export
  4. Warty missing data support
  5. Lack of transparency into memory use, RAM management
  6. Weak support for categorical data
  7. Complex groupby operations awkward and slow
  8. Appending data to a DataFrame is tedious and very costly
  9. Limited, non-extensible type metadata
  10. Eager evaluation model, no query planning
  11. “Slow”, limited multicore algorithms for large datasets

*11 things but who is counting...

The new kid on the block

In comes Polars: a brand new dataframe library or, as its author Ritchie Vink describes it, a query engine with a dataframe frontend. Polars is built on top of the Arrow memory format and is written in Rust, a modern, performant and memory-safe systems programming language similar to C/C++.

Below is the Polars equivalent of reading data from a CSV file:



import polars as pl

df = pl.read_csv('data.csv')
df.head()



Polars addresses many of the issues with Pandas that its original author raised, and in doing so delivers blazingly fast performance and low memory consumption.

While Polars boasts many improvements, such as its intuitive design, ease of use and powerful expressions, my favourite, and perhaps its core strength, is its Lazy API.



import polars as pl

lf = pl.scan_csv('data.csv')
df = lf.collect()
df.head()



In Pandas, the steps in your code are executed eagerly, meaning they are executed as written, in sequential order. Lazy execution, on the other hand, means your code is handed to the Polars query planner to optimise, and the results are only materialised when you call the collect() method.

The Lazy API lets the user write code and leave the optimisation to Polars. These optimisations result in much faster execution times with less resource usage.

It is this Lazy API coupled with the power of Airflow where the magic happens.

Data Engineering at Check

At Check Technologies, we believe strongly in data-informed decision-making. A core part of this is our Check Data Platform. It allows us to perform various analyses, from marketing campaign performance and shift demand forecasting to fleet-health monitoring, fraud detection, zonal & spatial analytics and much more. The results of these analyses give us the incentive to improve existing features and create new ones.

A key component of this platform is Airflow, an open-source workflow management platform. Airflow is the industry-standard workflow management tool in the world of data engineering and was chosen because it is widely used, actively developed and maintained, and has a very large community. It also has very good support for established cloud providers, in the form of external provider packages*.

*An extensive list of Airflow provider packages can be found in the Airflow documentation.

Airflow is essential for our data pipelines and forms the backbone of our data infrastructure.

Pandas is very well integrated with Airflow. This is clear when looking at the Airflow Postgres integration, where returning a Pandas dataframe from a SQL query is predefined, see below:



from airflow.providers.postgres.hooks.postgres import PostgresHook

postgres = PostgresHook(postgres_conn_id="postgres")
df = postgres.get_pandas_df(sql=statement, parameters=parameters)
df.head()




Most of the Directed Acyclic Graphs (DAGs) at Check were built with Pandas. It was used to read data from various sources (databases, s3, etc.), clean the data, and then write the data back to various destinations, most notably our Datalake and Data Warehouse (DWH).

The Problem

As Check grew, so did the amount of data we were generating. One of our DAGs that processes AppsFlyer data (user attribution & app usage data) grew so large that on busy days we started getting the dreaded SIGKILL (Airflow is out of resources) error. Scaling up our Kubernetes (k8s) cluster to give Airflow more resources worked for a while but we knew this was not a long-term solution. A new approach was needed.

The AppsFlyer process was made up of 2 separate DAGs, one to parse the raw data and write it to the DWH and a second to take the parsed data and "sessionize" it, meaning grouping all user app interactions within a 15-minute window into unique sessions per user. This enables us to measure various user behaviour metrics and ultimately improve our product through learnings gained from these insights.

Due to the amount of the data, we had to write the parsed data to the DWH and then re-load it in smaller chunks to sessionize it, hence the 2-step process above. Not only did this create a duplication of data, but it also introduced a new problem: inaccurate sessionizing of data.

A better solution was needed.

Pandas works great, why switch to Polars?

I had been experimenting with Polars for a bit more than a year (since Polars 0.14.22 back in Oct 2022) when I faced this problem. Being written in Rust, it is known for being fast and having a much smaller memory footprint as compared to Pandas.

Additionally, the Polars Lazy API, which defers the query execution to the last minute, allowing for optimisations that can have significant performance advantages, could be the right approach to our problem.

Thus when faced with the out-of-memory error I thought it a perfect situation to try and use Polars in a production environment.

Before jumping headfirst into installing new dependencies on our production Airflow cluster, I wanted to do some tests: firstly, to see if I could even do the necessary data parsing and pre-processing in Polars, and secondly, to determine what benefits we could gain.

The Experiment

The experiment was simple: take a single hour of data and compare the original eager Pandas solution with the new Lazy API-powered Polars solution, measuring dataframe size and time taken. Yes, this is crude, and no, it is not scientifically sound, but it was merely meant to confirm the rumours about Polars.

The data, in the form of partitioned parquet files, consists of 103 columns, with one of these columns, event_value, containing JSON data in string format. It is this event_value column that contains most of the important data we need.

Unfortunately, the JSON in this column is not uniform and can also be null. Below is a snippet from this column.

[Figure: Source Data]

We will not do a detailed code comparison between Pandas and Polars in this blog. However, I do want to highlight the differences between the eager and lazy approaches of the two libraries, using small snippets from the original solutions.

A detailed comparison between the Pandas and Polars solutions, along with the additional benefits that Polars offers, will follow in the 2nd blog in this series.

The Pandas way

In Pandas we can parse this JSON to a dataframe using the json_normalize() function and then concatenate it to the original dataframe and continue the data transformation.

We first need to parse the string data to valid JSON using the json.loads() function which, unfortunately, does not take a Series as input.



TypeError: the JSON object must be str, bytes or bytearray, not Series



Therefore, we have to use a lambda function to apply the string-to-JSON conversion to each row in the series, and only then can we convert the JSON to a dataframe.



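import json

import pandas as pd

# Eagerly read every parquet file and stack them into one dataframe.
df = pd.concat((pd.read_parquet(p) for p in paths), ignore_index=True)

# json.loads() only accepts a single string, so it is applied row by row.
df["event_value"] = df["event_value"].apply(
    lambda row: json.loads(row) if row != "" else ""
)

# Flatten the parsed JSON into columns and join it back to event_name.
df_normalized = pd.json_normalize(df["event_value"])
df_events = pd.concat([df[["event_name"]], df_normalized], axis=1)
df_events = df_events.reset_index(drop=True)

# Only now, after everything has been loaded and parsed, are columns dropped.
keep_columns = list(COLUMNS_INAPPS.keys())
df_final = df_events[keep_columns]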




The Polars way

How does this solution compare to the Polars Lazy approach?

Well, the first thing to know is that Polars != Pandas. Solving the same problem is therefore not as simple as changing the imports from Pandas to Polars. It requires a new way of thinking, one which I would argue is much simpler and more intuitive.

The Polars solution below is only a small part of the original solution. The sections of this solution that are not important to this comparison have been replaced with ...:



lazy_df = pl.scan_parquet(paths)

df = (
    lazy_df.with_columns(
            [
            ...
            ...
            pl.col("event_value")
                .str.json_path_match(r"$.last_location_timestamp")
                .str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S%Z")
                .cast(pl.Datetime)
                .alias("location_last_updated_at"),
            pl.col("event_value")
                .str.json_path_match(r"$.latitude")
                .cast(pl.Float64, strict=False)
                .alias("latitude"),
            pl.col("event_value")
                .str.json_path_match(r"$.longitude")
                .cast(pl.Float64, strict=False)
                .alias("longitude"),
            ...
            ...
            ...
            ]
        )
        .select(list(COLUMNS_INAPPS.keys()))
        ...
        ...
        .collect()
)



Glossing over a ton of detail, the main things to note are the use of the Polars Lazy API and the difference in syntax.

In step one, we lazily load the dataset using the scan_parquet() function. Something important to highlight is that the scan_parquet() input path can contain wildcards (*), meaning you can lazily load multiple files at once.

We then define all the transformations we want to apply to this LazyFrame using Polars expressions, and finally we call the collect() method.

The polars expressions are also very intuitive and result in clear and readable code. Here we use a string function json_path_match() to match the JSON we want, then we parse it and cast it to a datetime and finally assign the value to a new column with a name using the alias() method.

Once we call the collect() method, all our transformations will be passed to the Polars query planner which will optimise it and materialise the results to a dataframe.

The Experiment Results

Both solutions were repeatedly tested against various batches of the same data to ensure accurate results. The Jupyter Notebook %%time magic command was used to measure execution time. For Pandas, memory_usage() was used to measure size; for Polars, the estimated_size() function was used.

Below are the results from that test:


[Figure: Pandas Time & Memory usage measurements]

[Figure: Polars Time & Memory usage measurements]

[Figure: Pandas vs Polars: Execution Time]

[Figure: Pandas vs Polars: Memory Usage]

A 3.3x speed and approximately 3.1x memory improvement. Quite a big change, more than I was expecting. It confirmed the rumours about Polars but why the big change?

Well, it turns out that not only is Polars fast and low on resource usage, but it also helps you as a developer write better code. How does it do that? Simple, by not requiring you (the developer) to write good code.

What do I mean by this ridiculous statement?

In Pandas, to write the most optimised code you need to think of every optimisation yourself, from column selection, order of operation, and materialisations to vectorisation. This can be complex and it is easy to get it wrong. Polars on the other hand lets you focus on solving the business problem while it handles the optimisations. That is the power of the Lazy API.

Calling .show_graph() on the LazyFrame shows a plot of the query plan.


[Figure: Polars Query Plan]

In it, we see the following:

[Figure: Column Selection Filter at Scan Level]

This means Polars automatically applied the column selection at scan level and only loaded the 19 columns we needed. Not reading in the remaining 84 columns greatly improves performance and reduces memory overhead.

While this optimisation (known as Projection Pushdown) is also possible in Pandas, it is often overlooked. It is possible to write fast code in Pandas, but in Polars with the Lazy API, fast is the default.

From this test, it is clear that Polars offers substantial performance and memory efficiency gains. Armed with these results I decided to migrate the AppsFlyer DAG from Pandas to Polars in production.

In Comes Airflow

Migrating 1 DAG
Having developed the Polars Lazy API solution for the test above, migrating the AppsFlyer DAG was mostly done. Only minor refactors to logging & notifications were required.

The new and improved DAG ran in production for a week while I closely monitored its performance. Not only did it work flawlessly but we also saw a massive speed improvement. This was fairly in line with the experiment results.


[Figure: AppsFlyer DAG post Polars migration]

With this positive outcome, I decided to migrate all remaining DAGs.

Migrating 100+ DAGs
At Check, the Airflow DAGs can be grouped into 3 categories:

  • Extract and Load (EL, T is done in the DWH)
  • Complex Data Ingestion, Transformations or Parsing
  • Other (Spatial Computation, Archival, Reporting, etc)

The majority of the DAGs fall into the EL group and share similar logic. They all extract data from a database and write it to our s3 datalake using the parquet file format. From there, the data is either loaded into our DWH or used by another downstream process. The remaining DAGs all have unique data sources but still write to the same s3 datalake.

Our initial Airflow setup abstracted away this shared logic into helper functions that can be re-used by all DAGs.

Below is the Pandas implementation of the read helper function, get_df_for_psql():




def get_df_for_psql(stmt: str, params: dict = {}) -> pd.DataFrame:
    log.info("Setting up Postgres hook")
    postgres = PostgresHook(postgres_conn_id="postgres")
    dataframe = postgres.get_pandas_df(sql=stmt, parameters=params)
    return dataframe




And below is the Pandas implementation of the write helper function, wrangle_to_s3():



def wrangle_to_s3(dataframe: pd.DataFrame, dtype: dict, key: str) -> str:
    s3_location = f"s3://{LAKE_BUCKET}/datalake/{key}"
    wr.s3.to_parquet(
        df=dataframe,
        path=s3_location,
        dtype=dtype,
        sanitize_columns=True,
        pyarrow_additional_kwargs={
            "coerce_timestamps": "ms",
            "allow_truncated_timestamps": True,
            "use_deprecated_int96_timestamps": False,
        },
    )
    return s3_location



The first step in the migration was to refactor these helper functions to the Polars equivalents. This was very straightforward, see below:



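def get_df_for_psql(stmt: str, params: dict = {}) -> pl.DataFrame:
    # Note: params is kept in the signature but is not passed to the query here.
    log.info("Setting up Postgres hook")
    postgres = PostgresHook(postgres_conn_id="postgres")
    # Reuse the Airflow connection's URI so Polars can query the database directly.
    uri = postgres.get_uri()
    dataframe = pl.read_database_uri(query=stmt, uri=uri)
    return dataframe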





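def wrangle_to_s3(dataframe: pl.DataFrame, dtype: dict, key: str) -> str:
    # Note: dtype is kept in the signature but is not used in this version.
    s3_location = f"s3://{LAKE_BUCKET}/datalake/{key}"
    # Open the target object through s3fs and write the parquet file into it.
    fs = s3fs.S3FileSystem()
    with fs.open(s3_location, mode="wb") as f:
        dataframe.write_parquet(f)
    return s3_location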



To my surprise, for some DAGs it really was this simple: change the top-level imports, refactor 2 small functions, and you have migrated from Pandas to Polars.

Other DAGs required a bit more work. Most of the migration refactoring was centred around transformation steps using the Polars Expressions. While this took some time to get used to, the resulting code was much more readable and maintainable.

The migration however did have a couple of challenges. Testing these changes locally was essential to ensure no downtime in services.

Using an M1 MacBook means developing locally on Airflow requires aarch64 support. While Polars is very well supported on all platforms, connector-x, a dependency for reading data from a database, still does not have pre-built wheels for aarch64 at the time of writing this blog.

This was originally a blocker; however, we managed to set up a multi-stage Docker build to build it from source. There is a GitHub issue where we, along with community members, managed to solve it.

Being able to test Polars locally on Airflow gave us the confidence to proceed to use it in production. For us, the benefits of Polars were worth the additional effort of building a beta dependency from source, which was needed only for local testing.

Polars does support multiple database connection drivers (ADBC, ODBC, SQLAlchemy, etc.); however, connector-x is noticeably faster. It is worth pointing out that there is an open PR to add aarch64 support to connector-x, which I expect to fix this issue any day now. In addition, having spoken to the author and core maintainers of Polars, I know big changes are coming, especially once the ADBC project reaches maturity.

Secondly, while developing the original Lazy API solution, I encountered an error that I could not resolve. When I sought help in the Polars Discord channel, the author suggested a fix, which worked, and requested that I log a GitHub issue. Having logged the issue, I was amazed to see it resolved in a matter of hours and included in the next release.

Polars is a new library and is in very active development with frequent release cycles (often 1-2 per week). Additionally, the maintainers are super responsive and helpful, thus any issues you might have are quickly resolved.

Results

Post migration we observed that almost all DAGs gained roughly a 2x speed improvement.


[Figure: DAG execution time reduced after Polars release]

The original goal, fixing the out-of-memory error, was not only realised, but we were also able to combine 2 separate processes into one, thereby simplifying the process, avoiding data duplication and improving the sessionizing accuracy.


[Figure: 2 Processes combined using Polars LazyAPI]

We also observed a healthy drop in overall resource usage.


[Figure: Reduction in overall resource utilisation after Polars migration]

This allowed us to scale down our services (rather than constantly scaling up) and resulted in a much more stable platform. In addition, this scaling down of resources resulted in an impressive 25% cost saving on our cloud provider bill for our data stack.

Conclusions

Finally, the Polarification of Check's data stack is complete. With over 100 DAGs currently running in production, the migration took less than 2 weeks (1 sprint) and caused no disruption to normal operations.

Having migrated all the DAGs from Pandas to Polars and observing the benefits, it is clear that it was the right decision. Not only did we see a reduction in resource usage but many DAGs gained a speed improvement. This decreases the time from raw data to insights and makes the Check Data Platform more agile.

Polars does the heavy lifting for you. You can focus on solving the problem at hand and Polars will take care of all the optimisations.

With these optimisations, and being written in Rust, Polars uses fewer resources and your cloud infrastructure and wallet will love you.

While migrating from Pandas to Polars wasn't without its challenges, it was surprisingly easy. As Polars matures, I would not be surprised to see it integrated natively into provider packages, similar to Pandas. This will most certainly be a benefit to the whole data engineering industry.

The responsiveness of the maintainers and the supportive community added to our decision to migrate.

For us, the results speak for themselves. Polars not only solved our initial problem but opened the door to new possibilities. We are excited to use Polars on future data engineering projects.

Anyone not yet considering Polars can hopefully learn from our experience and take the leap.
