Quick tip: Using SingleStore Spark Connector's Query Pushdown with SingleStore Notebooks

Published: 3/31/2024
Categories: singlestoredb, apachespark, querypushdown
Author: veryfatboy

Abstract

The SingleStore Spark Connector supports rewriting Spark query execution plans into SingleStore queries, for both SQL and DataFrame operations. Computation is pushed down into the SingleStore system automatically. In this short article, we'll look at an example of query pushdown.

The notebook file used in this article is available on GitHub.

Create a SingleStore Cloud account

A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:

  • Workspace Group Name: Spark Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: spark-demo
  • Size: S-00

We'll make a note of the password and store it in the secrets vault using the name password.

Create a new notebook

From the left navigation pane in the cloud portal, we'll select DEVELOP > Data Studio.

In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.

Figure 1. New Notebook.

We'll call the notebook spark_pushdown_demo, select a Blank notebook template from the available options, and save it in the Personal location.

Fill out the notebook

First, let's install Java:

!conda install -y --quiet -c conda-forge openjdk=8

Next, we'll create a directory to store some jar files:

import os

# Create a local directory to hold the jar files
os.makedirs("jars", exist_ok = True)

We'll now download some jar files, as follows:

import requests

def download_jar(url, destination):
    # Download a jar file and write it to the local destination path
    response = requests.get(url)
    with open(destination, "wb") as f:
        f.write(response.content)

jar_urls = [
    ("https://repo1.maven.org/maven2/com/singlestore/singlestore-jdbc-client/1.2.4/singlestore-jdbc-client-1.2.4.jar", "jars/singlestore-jdbc-client-1.2.4.jar"),
    ("https://repo1.maven.org/maven2/com/singlestore/singlestore-spark-connector_2.12/4.1.8-spark-3.5.0/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar", "jars/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar"),
    ("https://repo1.maven.org/maven2/org/apache/commons/commons-dbcp2/2.12.0/commons-dbcp2-2.12.0.jar", "jars/commons-dbcp2-2.12.0.jar"),
    ("https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.0/commons-pool2-2.12.0.jar", "jars/commons-pool2-2.12.0.jar"),
    ("https://repo1.maven.org/maven2/io/spray/spray-json_3/1.3.6/spray-json_3-1.3.6.jar", "jars/spray-json_3-1.3.6.jar")
]

for url, destination in jar_urls:
    download_jar(url, destination)

print("JAR files downloaded successfully")

These jar files include the SingleStore JDBC Client and the SingleStore Spark Connector, as well as several other jar files needed for connectivity and data management.

Now we are ready to create a SparkSession:

from pyspark.sql import SparkSession

# Create a Spark session, adding the downloaded jars to the classpath
spark = (SparkSession
             .builder
             .config("spark.jars", ",".join([destination for _, destination in jar_urls]))
             .appName("Spark Pushdown Test")
             .getOrCreate()
        )

spark.sparkContext.setLogLevel("ERROR")

Next, we'll obtain a weather dataset and load it using Pandas. The data are from the CORGIS Dataset Project on GitHub. The weather.csv file contains weather data for cities across the United States for 2016.

import pandas as pd

url = "https://raw.githubusercontent.com/corgis-edu/corgis/master/website/datasets/csv/weather/weather.csv"

pandas_df = pd.read_csv(url)

We can check the first few rows:

pandas_df.head()

Next, we'll define a Spark DataFrame schema:

from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType

schema = StructType([
    StructField("Precipitation", FloatType(), True),
    StructField("Date", StringType(), True),
    StructField("Month", IntegerType(), True),
    StructField("Week", IntegerType(), True),
    StructField("Year", IntegerType(), True),
    StructField("City", StringType(), True),
    StructField("Code", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Avg", IntegerType(), True),
    StructField("Max", IntegerType(), True),
    StructField("Min", IntegerType(), True),
    StructField("Wind_Direction", IntegerType(), True),
    StructField("Wind_Speed", FloatType(), True)
])

This schema shortens some of the original column names in the CSV file.
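Since the shortened field names don't match the pandas column names, it can be worth printing the two side by side to check the intended mapping. Here's a minimal sketch using the pandas_df and schema objects defined above:

for csv_col, field in zip(pandas_df.columns, schema.fields):
    # Show which original CSV column each shortened schema field corresponds to
    print(f"{csv_col} -> {field.name} ({field.dataType.simpleString()})")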

We'll now create a Spark DataFrame using the Pandas data and the schema defined above:

spark_df = spark.createDataFrame(pandas_df, schema)

We'll now check the number of rows:

spark_df.count()

The output should be:

16743

Now we'll use Plotly Express to create a line chart:

import plotly.express as px

def plot_data(df, x_col, y_cols, title):
    df = df.orderBy(x_col)
    fig = px.line(
        df.toPandas(),
        x = x_col,
        y = y_cols, 
        title = title
    )
    fig.show()

and create a plot using the Spark DataFrame:

plot_data(
    spark_df.filter(spark_df["City"] == "San Francisco"),
    "Date",
    ["Max", "Min"],
    "Max and Min Temperatures in San Francisco (Fahrenheit)"
)

The output should be as shown in Figure 2.

Figure 2. Max and Min in Fahrenheit for San Francisco.

A database is required, so we'll create one:

DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;

We'll now prepare the connection to SingleStore:

from sqlalchemy import create_engine

# connection_url is provided by the SingleStore notebook environment
db_connection = create_engine(connection_url)
url = db_connection.url

Now we'll gather the connection details that Spark needs for SingleStore:

# get_secret is provided by the SingleStore notebook environment
password = get_secret("password")
host = url.host
port = url.port
cluster = host + ":" + str(port)

We also need to set some configuration parameters:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

We'll now write the DataFrame:

(spark_df.write
    .format("singlestore")
    .option("loadDataCompression", "LZ4")
    .mode("overwrite")
    .save("spark_demo.weather")
)

In this case, the weather table will be created for us.

Next, we'll read the data back into a new DataFrame:

new_df = (spark.read
    .format("singlestore")
    .load("spark_demo.weather")
)
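As a quick sanity check (a small sketch using the cells above), we can confirm that the round trip through SingleStore preserved all the rows:

new_df.count()

The output should again be 16743.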

Next, we'll create a temporary view over the DataFrame, as follows:

new_df.createOrReplaceTempView("temperatures")

We'll now create and register a Python UDF to convert temperatures from Fahrenheit to Celsius:

def convert_to_c(f):
    c = (f - 32) * (5 / 9)
    return int(round(c))

spark.udf.register("convert_to_c", convert_to_c, IntegerType())
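As a quick local check of the conversion (run as plain Python, outside Spark), the freezing and boiling points of water should map to 0 and 100:

print(convert_to_c(32), convert_to_c(212))  # expect: 0 100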

Now we'll formulate a query that uses the Python UDF:

temp_df = spark.sql(
    "SELECT Date, convert_to_c(Max) as Max_C, convert_to_c(Min) as Min_C FROM temperatures WHERE City = 'San Francisco'"
)

Calling .explain() on the DataFrame shows us the physical query plan:

temp_df.explain()

The output should look similar to the following:

== Physical Plan ==
*(2) Project [Date#91, pythonUDF0#125 AS Max_C#118, pythonUDF1#126 AS Min_C#119]
+- BatchEvalPython [convert_to_c(Max#100)#120, convert_to_c(Min#101)#121], [pythonUDF0#125, pythonUDF1#126]
   +- *(1) Scan 
---------------
SingleStore Query
Variables: (San Francisco)
SQL:
SELECT `Date#1` , `Max#4` , `Min#5` 
FROM (

  SELECT `Date#1` , `Max#4` , `Min#5` 
  FROM (

    SELECT * 
    FROM (
      SELECT ( `Precipitation` ) AS `Precipitat#8` , ( `Date` ) AS `Date#1` , ( `Month` ) AS `Month#9` , ( `Week` ) AS `Week#10` , ( `Year` ) AS `Year#11` , ( `City` ) AS `City#12` , ( `Code` ) AS `Code#13` , ( `Location` ) AS `Location#14` , ( `State` ) AS `State#15` , ( `Avg` ) AS `Avg#16` , ( `Max` ) AS `Max#4` , ( `Min` ) AS `Min#5` , ( `Wind_Direction` ) AS `Wind_Direc#17` , ( `Wind_Speed` ) AS `Wind_Speed#18` 
      FROM (
        SELECT * FROM `spark_demo`.`weather`
      ) AS `a2`
    ) AS `a3` 
    WHERE ( ( `City#12` = ? ) AND ( `City#12` ) IS NOT NULL )
  ) AS `a4`
) AS `a5`

EXPLAIN:
Gather partitions:all alias:remote_0 parallelism_level:segment
Project [a5.Date AS `Date#1`, a5.Max AS `Max#4`, a5.Min AS `Min#5`]
ColumnStoreFilter [a5.City = 'San Francisco' AND a5.City IS NOT NULL]
ColumnStoreScan spark_demo.weather AS a5, SORT KEY Precipitation (Precipitation) table_type:sharded_columnstore
---------------
       [Date#91,Max#100,Min#101] PushedFilters: [], ReadSchema: struct<Date:string,Max:int,Min:int>

The plan shows that Spark evaluates the Python UDF itself (the BatchEvalPython step), while the filter on City and the selection of just the Date, Max and Min columns are pushed down to SingleStore, as we can see from the generated SingleStore query inside the Scan node.
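To see what pushdown is buying us, one comparison worth trying is to re-read the table with pushdown disabled and inspect the plan again. This is a sketch, assuming the per-read option mirrors the spark.datasource.singlestore.disablePushdown setting used earlier; the SingleStore query inside the Scan node should then be much simpler, with more of the work left to Spark:

# Re-read the table with pushdown disabled, for comparison
no_pushdown_df = (spark.read
    .format("singlestore")
    .option("disablePushdown", "true")
    .load("spark_demo.weather")
)
no_pushdown_df.createOrReplaceTempView("temperatures_no_pushdown")

spark.sql(
    "SELECT Date, convert_to_c(Max) AS Max_C, convert_to_c(Min) AS Min_C FROM temperatures_no_pushdown WHERE City = 'San Francisco'"
).explain()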

We can now plot the converted data:

plot_data(
    temp_df,
    "Date",
    ["Max_C", "Min_C"],
    "Max and Min Temperatures in San Francisco (Celsius)"
)

The output should be as shown in Figure 3.

Figure 3. Max and Min in Celsius for San Francisco.

Finally, we'll stop the SparkSession:

# Stop the Spark session
spark.stop()

Summary

In this example, we've seen the ease with which we can use the SingleStore Spark Connector for query pushdown.

The key benefits of the SingleStore Spark Connector can be summarised as follows:

  • Implemented as a native Spark SQL plugin
  • Accelerates ingest from Spark via compression
  • Supports data loading and extraction from database tables and Spark DataFrames
  • Integrates with the Catalyst query optimiser and supports robust SQL pushdown
  • Accelerates ML workloads