Quick tip: Using SingleStore with PyIceberg
Abstract
In a previous article, we implemented an Iceberg catalog using SingleStore and JDBC. Another way that we can create the catalog is using PyIceberg. In this article, we'll see how.
The notebook file used in this article is available on GitHub.
Introduction
PyIceberg is a Python library that provides a native interface for working with Apache Iceberg. It allows users to efficiently handle table metadata, schema evolution, and data partitioning. PyIceberg provides methods to interact with Iceberg tables, independent of the processing engine. By offering a flexible API, PyIceberg helps users manage data more effectively.
Create a SingleStoreDB Cloud account
A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:
- Workspace Group Name: PyIceberg Demo Group
- Cloud Provider: AWS
- Region: US East 1 (N. Virginia)
- Workspace Name: pyiceberg-demo
- Size: S-00
We'll make a note of the password and store it in the secrets vault using the name password.
Import the notebook
We'll download the notebook from GitHub.
From the left navigation pane in the SingleStore cloud portal, we'll select DEVELOP > Data Studio.
In the top right of the web page, we'll select New Notebook > Import From File. We'll use the wizard to locate and import the notebook we downloaded from GitHub.
Run the notebook
After checking that we are connected to our SingleStore workspace, we'll run the cells one by one.
We'll use PyIceberg to create a tiny Iceberg Lakehouse in the SingleStore portal for testing purposes.
For production environments, please use a robust file system for your Lakehouse.
In the Iceberg Lakehouse, we'll store the Iris flower data set. We'll first download the Iris CSV file into a pandas DataFrame.
We'll need to create a SingleStore database to use with Iceberg:
DROP DATABASE IF EXISTS iris_db;
CREATE DATABASE IF NOT EXISTS iris_db;
And a table to store some of our Iris data:
DROP TABLE IF EXISTS iris;
CREATE TABLE IF NOT EXISTS iris (
    sepal_length FLOAT,
    sepal_width FLOAT,
    petal_length FLOAT,
    petal_width FLOAT,
    species VARCHAR(20)
);
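The DROP ... IF EXISTS followed by CREATE ... IF NOT EXISTS pattern makes the cell safe to re-run. As a rough sketch, the same DDL can be tried locally against an in-memory SQLite database from Python's standard library. SQLite is only a stand-in here and is not SingleStore, so type handling will differ; the point is just that running the statements twice raises no error.

```python
import sqlite3

# Same DDL as in the article, held in one script.
DDL = """
DROP TABLE IF EXISTS iris;
CREATE TABLE IF NOT EXISTS iris (
    sepal_length FLOAT,
    sepal_width FLOAT,
    petal_length FLOAT,
    petal_width FLOAT,
    species VARCHAR(20)
);
"""

# In-memory SQLite is an assumption for local testing, not SingleStore.
conn = sqlite3.connect(":memory:")

# Run the script twice to show that it is safe to re-run.
for _ in range(2):
    conn.executescript(DDL)

# List the column names of the freshly created table.
columns = [row[1] for row in conn.execute("PRAGMA table_info(iris)")]
print(columns)
```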
A quick and easy way to find the connection details for the database is to use the following:
from sqlalchemy import create_engine
# connection_url is assumed to be provided by the SingleStore notebook environment
db_connection = create_engine(connection_url)
url = db_connection.url
The url will contain the host, the port, and the database name. We'll use these details for storing the catalog in SingleStore.
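To illustrate what those fields look like, the sketch below parses a made-up connection URL with Python's standard library and assembles a catalog URI of the same shape used later in this article. The host, database name and passwords are invented placeholders. Percent-encoding the password with quote_plus is an added precaution for passwords containing characters such as @ or /, and is not something the notebook is known to do.

```python
from urllib.parse import quote_plus, urlsplit

# Hypothetical connection URL; a real one comes from the notebook environment.
connection_url = "singlestoredb://admin:secret@svc-example.invalid:3306/iris_db"

# Split the URL into the parts the catalog configuration needs.
parts = urlsplit(connection_url)
host = parts.hostname
port = parts.port
database = parts.path.lstrip("/")


def catalog_uri(password: str) -> str:
    """Build a SQLAlchemy-style URI, percent-encoding the password."""
    return f"singlestoredb://admin:{quote_plus(password)}@{host}:{port}/{database}"


print(host, port, database)
print(catalog_uri("p@ss/word"))
```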
Next, from the pandas DataFrame, we'll save all Iris-virginica records in SingleStore:
pandas_df[pandas_df["species"] == "Iris-virginica"].to_sql(
    "iris",
    con = db_connection,
    if_exists = "append",
    index = False
)
We'll then create the Iceberg catalog and namespace in SingleStore:
from pyiceberg.catalog.sql import SqlCatalog
# password is assumed to hold the value stored in the secrets vault earlier
config = {
    "uri": f"singlestoredb://admin:{password}@{url.host}:{url.port}/{url.database}",
    "warehouse": "warehouse",
}
catalog = SqlCatalog(
    name = "s2_catalog",
    **config
)
catalog.create_namespace("default")
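Running the cell above a second time will typically fail, because PyIceberg raises an error when the namespace already exists. One way to make the step re-runnable is a small guard such as the one sketched below. The helper name ensure_namespace is hypothetical, and the sketch assumes that list_namespaces() returns top-level namespaces as tuples such as ("default",), which is how PyIceberg represents identifiers.

```python
def ensure_namespace(catalog, name: str) -> bool:
    """Create `name` in `catalog` unless it already exists.

    Returns True if the namespace was created, False if it was already there.
    `catalog` is any object offering list_namespaces() and create_namespace(),
    for example a PyIceberg SqlCatalog.
    """
    # Normalise to tuples so that lists and tuples compare the same way.
    existing = {tuple(ns) for ns in catalog.list_namespaces()}
    if (name,) in existing:
        return False
    catalog.create_namespace(name)
    return True
```

In the notebook, this could replace the direct call with ensure_namespace(catalog, "default").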
Next, we'll create an Iceberg table from the pandas DataFrame:
import pyarrow as pa
df = pa.Table.from_pandas(pandas_df)
# table_identifier is defined in the notebook and names the table as namespace.table
table = catalog.create_table(
    table_identifier,
    schema = df.schema
)
and store it in our Lakehouse:
table.append(df)
len(table.scan().to_arrow())
Now, we'll retrieve all records, except Iris-virginica:
df = table.scan(row_filter = "species != 'Iris-virginica'").to_arrow()
and overwrite the existing table:
table.overwrite(df)
len(table.scan().to_arrow())
So, this has deleted all Iris-virginica records in the Lakehouse. We can check this as follows:
arrow_table = table.scan().to_arrow()
species_counts = arrow_table["species"].value_counts()
print(species_counts.to_pandas())
Example output:
0 {'values': 'Iris-setosa', 'counts': 50}
1 {'values': 'Iris-versicolor', 'counts': 50}
dtype: object
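Each row of that output is a struct with a values field and a counts field, which is how PyArrow's value_counts() reports its results. When a plain mapping is easier to work with, the entries can be flattened as in the sketch below. The helper name counts_to_dict is hypothetical, and the sample list simply mirrors the output above; in the notebook, the input would be species_counts.to_pylist().

```python
def counts_to_dict(entries):
    """Turn [{'values': v, 'counts': n}, ...] into {v: n, ...}."""
    return {entry["values"]: entry["counts"] for entry in entries}


# Sample data copied from the example output in this article.
sample = [
    {"values": "Iris-setosa", "counts": 50},
    {"values": "Iris-versicolor", "counts": 50},
]

species_totals = counts_to_dict(sample)
print(species_totals)
print(sum(species_totals.values()))
```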
From SingleStore, we'll retrieve the previously saved Iris-virginica records:
new_df = pd.read_sql(
    "SELECT * FROM iris WHERE species = 'Iris-virginica'",
    con = db_connection
)
and append these to the Iceberg Lakehouse:
table.append(pa.Table.from_pandas(new_df))
len(table.scan().to_arrow())
So, this restores the Iceberg Lakehouse to the full Iris data set. We can check this as follows:
arrow_table = table.scan().to_arrow()
species_counts = arrow_table["species"].value_counts()
print(species_counts.to_pandas())
Example output:
0 {'values': 'Iris-virginica', 'counts': 50}
1 {'values': 'Iris-setosa', 'counts': 50}
2 {'values': 'Iris-versicolor', 'counts': 50}
dtype: object
Summary
In this short article, we've seen how to store an Iceberg catalog in SingleStore using PyIceberg. We've also stored some data in SingleStore and used that to restore an Iceberg Lakehouse. In a future article, we'll look at snapshots and time travel.