Real-Time Data Processing with MySQL, Redpanda, MinIO, and Apache Spark Using Delta Lake
In this article, you will learn how to set up a real-time data processing and analytics environment using Docker, MySQL, Redpanda, MinIO, and Apache Spark. We will build a system that generates fake data simulating sensors on a bridge that capture the license plate of every passing car. The data will be stored in a MySQL database and streamed in real time with Redpanda and Kafka Connect. We will then use MinIO as distributed object storage and Apache Spark to further process and analyze the data. Finally, we will integrate the Twilio API for real-time notifications.
Table of Contents
Introduction
Setting up the environment
  - Docker Compose configuration
  - Data generation and storage in MySQL
  - Creating an API for data ingestion
  - Setting up connectors for data streaming and storage
Real-time data processing with Apache Spark
  - Reading data from MinIO
  - Data transformation and storage in the data warehouse
  - Integrating Twilio for real-time notifications
Conclusion
1. Introduction
In this article, we will walk through the process of setting up a real-time data processing and analytics environment for vehicle plate recognition. We will use Docker to manage our services, MySQL for data storage, Redpanda as a streaming platform, MinIO as an object storage server, and Apache Spark for data processing and analysis. We will also integrate the Twilio API to send SMS notifications in real-time based on the processed data.
2. Setting up the environment
Docker Compose configuration
To begin, we will create a Docker Compose file that defines all the necessary services, networks, and volumes for our environment. The services include Redpanda, MinIO, MySQL, Kafka Connect, Adminer, Spark Master, Spark Workers, Jupyter Notebook, a data generator, and an API.
```yaml
version: "3.7"
services:
  redpanda:
    image: vectorized/redpanda
    container_name: redpanda
    ports:
      - "9092:9092"
      - "29092:29092"
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp
      - "1"
      - --memory
      - "1G"
      - --reserve-memory
      - "0M"
      - --node-id
      - "0"
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
      - --check=false
    networks:
      - spark_network
  redpanda-console:
    image: vectorized/console
    container_name: redpanda_console
    depends_on:
      - redpanda
    ports:
      - "5000:8080"
    env_file:
      - .env
    networks:
      - spark_network
  minio:
    hostname: minio
    image: "minio/minio"
    container_name: minio
    ports:
      - "9001:9001"
      - "9000:9000"
    command: [ "server", "/data", "--console-address", ":9001" ]
    volumes:
      - ./minio/data:/data
    env_file:
      - .env
    networks:
      - spark_network
  mc:
    image: minio/mc
    container_name: mc
    hostname: mc
    environment:
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1
    # Wait for MinIO, then create a public "warehouse" bucket
    entrypoint: >
      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "
    depends_on:
      - minio
    networks:
      - spark_network
  mysql:
    image: debezium/example-mysql:1.6
    container_name: mysql
    volumes:
      - ./mysql/data:/var/lib/mysql
    ports:
      - "3306:3306"
    env_file:
      - .env
    networks:
      - spark_network
  kafka-connect:
    build:
      context: ./kafka
      dockerfile: ./Dockerfile
    container_name: kafka_connect
    depends_on:
      - redpanda
    ports:
      - "8083:8083"
    env_file:
      - .env
    networks:
      - spark_network
  adminer:
    image: adminer:latest
    ports:
      - 8085:8080/tcp
    deploy:
      restart_policy:
        condition: on-failure
    networks:
      - spark_network
  spark-master:
    build:
      context: ./spark
      dockerfile: ./Dockerfile
    container_name: "spark-master"
    environment:
      - SPARK_MODE=master
      - SPARK_LOCAL_IP=spark-master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "7077:7077"
      - "8080:8080"
    volumes:
      - ./spark/spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf
    networks:
      - spark_network
  spark-worker-1:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-1"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network
  spark-worker-2:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-2"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network
  spark-notebook:
    build:
      context: ./notebooks
      dockerfile: ./Dockerfile
    container_name: "spark-notebook"
    user: root
    environment:
      # In list syntax, quotes would become part of the value
      - JUPYTER_ENABLE_LAB=yes
      - GRANT_SUDO=yes
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./notebooks/spark-defaults.conf:/usr/local/spark/conf/spark-defaults.conf
    ports:
      - "8888:8888"
      - "4040:4040"
    networks:
      - spark_network
  generate_data:
    build: ./generate_data
    container_name: generate_data
    command: python generate_data.py
    depends_on:
      - mysql
    networks:
      - spark_network
  api:
    build: ./api
    ports:
      - "8000:8000"
    depends_on:
      - mysql
    # Join the same network as mysql so the service can reach it by name
    networks:
      - spark_network

networks:
  spark_network:
    driver: bridge
    name: spark_network
```
Then build the images and start all services in the background:
```bash
docker-compose up --build -d
```
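One caveat with this setup: in Docker Compose, `depends_on` only controls start order; it does not wait until MySQL accepts connections, so the `generate_data` container may try to connect too early. Below is a minimal standard-library sketch of a readiness check the generator could run first. The helper name `wait_for_port` and its timeout values are our own assumptions, not part of the original project, and an open TCP port is only a rough signal that MySQL has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Hypothetical helper: poll host:port until a TCP connection succeeds.

    Returns True as soon as something accepts the connection, or False
    once `timeout` seconds have passed without success.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while the port is closed or the host is unknown
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

In `generate_data.py`, calling `wait_for_port("mysql", 3306)` before `mysql.connector.connect(**db_config)`, and exiting when it returns False, would cover the race.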
Data generation and storage in MySQL
Once our environment is set up, we will generate fake data simulating sensors on a bridge that capture the license plate of every passing car. The data will include vehicle and owner information, subscription status, and other relevant fields. It will be stored in a MySQL database and serve as the source of our real-time data processing pipeline.
```python
import random
import uuid
from datetime import timedelta

import mysql.connector
import pandas as pd
from faker import Faker

# Initialize Faker
fake = Faker()

# Number of records to generate
num_records = 1000

# Plate suffixes "AAA", "BBB", ..., "ZZZ"
plate_suffixes = tuple(letter * 3 for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ")
car_makes = ("Toyota", "Honda", "Ford", "Chevrolet", "Nissan", "Volkswagen", "BMW", "Mercedes-Benz")

# Generate synthetic data
data = []
for _ in range(num_records):
    unique_id = str(uuid.uuid4())
    plate_number = f"{random.randint(1000, 9999)}-{fake.random_element(elements=plate_suffixes)}"
    car_info = {
        "make": fake.random_element(elements=car_makes),
        "year": random.randint(2000, 2023),
    }
    owner_info = {
        "name": fake.name(),
        "address": fake.address(),
        # Faker writes some extensions as "x1234"; spell them out instead
        "phone_number": fake.phone_number().replace("x", " ext. "),
    }
    subscription_status = fake.random_element(elements=("active", "expired", "none"))
    if subscription_status != "none":
        subscription_start = fake.date_between(start_date="-3y", end_date="today")
        subscription_end = subscription_start + timedelta(days=365)
    else:
        subscription_start = None
        subscription_end = None
    balance = round(random.uniform(0, 500), 2)
    # Passage time somewhere in the last 30 days
    timestamp = fake.date_time_between(start_date="-30d", end_date="now").strftime("%Y-%m-%d %H:%M:%S")
    data.append({
        "id": unique_id,
        "plate_number": plate_number,
        "car_make": car_info["make"],
        "car_year": car_info["year"],
        "owner_name": owner_info["name"],
        "owner_address": owner_info["address"],
        "owner_phone_number": owner_info["phone_number"],
        "subscription_status": subscription_status,
        "subscription_start": subscription_start,
        "subscription_end": subscription_end,
        "balance": balance,
        "timestamp": timestamp,
    })

# Convert data to a pandas DataFrame
df = pd.DataFrame(data)

# Connect to the MySQL database (service name "mysql" on the Docker network)
db_config = {
    "host": "mysql",
    "user": "root",
    "password": "debezium",
    "database": "inventory",
}
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()

# Create the 'customers' table if it doesn't exist
create_table_query = """
CREATE TABLE IF NOT EXISTS customers (
    id VARCHAR(255) NOT NULL,
    plate_number VARCHAR(255) NOT NULL,
    car_make VARCHAR(255) NOT NULL,
    car_year INT NOT NULL,
    owner_name VARCHAR(255) NOT NULL,
    owner_address TEXT NOT NULL,
    owner_phone_number VARCHAR(255) NOT NULL,
    subscription_status ENUM('active', 'expired', 'none') NOT NULL,
    subscription_start DATE,
    subscription_end DATE,
    balance DECIMAL(10, 2) NOT NULL,
    timestamp TIMESTAMP NOT NULL
)
"""
cursor.execute(create_table_query)

# Store the synthetic data in the 'customers' table
insert_query = """
INSERT INTO customers (id, plate_number, car_make, car_year, owner_name, owner_address, owner_phone_number, subscription_status, subscription_start, subscription_end, balance, timestamp)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
for _, row in df.iterrows():
    cursor.execute(insert_query, (
        row["id"],
        row["plate_number"],
        row["car_make"],
        row["car_year"],
        row["owner_name"],
        row["owner_address"],
        row["owner_phone_number"],
        row["subscription_status"],
        row["subscription_start"],
        row["subscription_end"],
        row["balance"],
        row["timestamp"],
    ))

# Commit the changes, then close the cursor and the connection
conn.commit()
cursor.close()
conn.close()
print("Synthetic data stored in the 'customers' table in the MySQL database")
```
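The row-by-row `iterrows()` loop works, but the list of dicts in `data` can also be sent in a single batch. Here is a minimal sketch, assuming a `mysql.connector` connection like the one above. The helper names `build_insert_query`, `records_to_params`, and `insert_records` are hypothetical; `cursor.executemany()` is the standard DB-API batch call.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple

# Column order of the customers table created above
COLUMNS: Tuple[str, ...] = (
    "id", "plate_number", "car_make", "car_year", "owner_name", "owner_address",
    "owner_phone_number", "subscription_status", "subscription_start",
    "subscription_end", "balance", "timestamp",
)


def build_insert_query(table: str, columns: Sequence[str]) -> str:
    """Parameterized INSERT with one %s placeholder per column."""
    placeholders = ", ".join(["%s"] * len(columns))
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"


def records_to_params(records: Iterable[Dict[str, Any]],
                      columns: Sequence[str] = COLUMNS) -> List[Tuple[Any, ...]]:
    """Turn the generator's dicts into tuples ordered like `columns`."""
    return [tuple(record[c] for c in columns) for record in records]


def insert_records(conn, records: Iterable[Dict[str, Any]], table: str = "customers") -> int:
    """Insert all records with one executemany() call and commit; returns the row count sent."""
    params = records_to_params(records)
    cursor = conn.cursor()
    try:
        cursor.executemany(build_insert_query(table, COLUMNS), params)
        conn.commit()
    finally:
        cursor.close()
    return len(params)
```

With this helper, `insert_records(conn, data)` would replace the `iterrows()` loop, and the pandas DataFrame would no longer be needed for the insert.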
Creating an API for data ingestion
To facilitate data ingestion, we will create an API that allows us to send data as JSON objects. This API will be used to insert new data into the MySQL database, simulating the real-time data flow from the sensors on the bridge.
```python
from flask import Flask, jsonify, render_template, request
import mysql.connector

app = Flask(__name__, template_folder="template")

# 10.0.0.25 is the Docker host's IP in the author's setup. If the api container
# shares spark_network with MySQL, the service name "mysql" also works as host.
db_config = {
    "host": "10.0.0.25",
    "user": "root",
    "password": "debezium",
    "database": "inventory",
}


@app.route("/send_data", methods=["POST"])
def send_data():
    data = request.get_json()
    conn = mysql.connector.connect(**db_config)
    cursor = conn.cursor()
    insert_query = """
    INSERT INTO customers (id, plate_number, car_make, car_year, owner_name, owner_address, owner_phone_number, subscription_status, subscription_start, subscription_end, balance, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    cursor.execute(insert_query, (
        data["id"],
        data["plate_number"],
        data["car_make"],
        data["car_year"],
        data["owner_name"],
        data["owner_address"],
        data["owner_phone_number"],
        data["subscription_status"],
        data["subscription_start"],
        data["subscription_end"],
        data["balance"],
        data["timestamp"],
    ))
    conn.commit()
    cursor.close()
    conn.close()
    return jsonify({"status": "success"}), 200


@app.route("/customers", methods=["GET"])
def customers():
    plate_number = request.args.get("plate_number", "")
    # Pages start at 1; lower values would produce a negative OFFSET
    page = max(int(request.args.get("page", 1)), 1)
    items_per_page = 10
    conn = mysql.connector.connect(**db_config)
    cursor = conn.cursor()
    # Fetch customers filtered by plate_number, one page at a time
    select_query = """
    SELECT * FROM customers
    WHERE plate_number LIKE %s
    LIMIT %s OFFSET %s
    """
    cursor.execute(select_query, (f"%{plate_number}%", items_per_page, (page - 1) * items_per_page))
    rows = cursor.fetchall()
    # Total number of matching customers, used for pagination
    cursor.execute("SELECT COUNT(*) FROM customers WHERE plate_number LIKE %s", (f"%{plate_number}%",))
    total_customers = cursor.fetchone()[0]
    cursor.close()
    conn.close()
    # Ceiling division, with at least one page even when nothing matches
    total_pages = max(1, (total_customers + items_per_page - 1) // items_per_page)
    return render_template("customers.html", customers=rows, plate_number=plate_number,
                           page=page, total_pages=total_pages)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
```
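As written, `/send_data` trusts its input: a missing key raises a `KeyError` (an HTTP 500), and an invalid `subscription_status` is only rejected by MySQL. Here is a hedged sketch of a payload check that could run before the insert. `validate_payload` and the error messages are hypothetical; the allowed statuses come from the `ENUM` in the `customers` table.

```python
from datetime import datetime
from typing import Any, List

# Fields used by the INSERT statement in /send_data
REQUIRED_FIELDS = (
    "id", "plate_number", "car_make", "car_year", "owner_name", "owner_address",
    "owner_phone_number", "subscription_status", "subscription_start",
    "subscription_end", "balance", "timestamp",
)
# Values allowed by the ENUM column of the customers table
ALLOWED_STATUSES = {"active", "expired", "none"}


def validate_payload(data: Any) -> List[str]:
    """Return a list of problems with a /send_data payload; an empty list means valid."""
    if not isinstance(data, dict):
        return ["body must be a JSON object"]
    errors = [f"missing field: {field}" for field in REQUIRED_FIELDS if field not in data]
    if errors:
        return errors
    if data["subscription_status"] not in ALLOWED_STATUSES:
        errors.append("subscription_status must be one of active, expired, none")
    try:
        float(data["balance"])
    except (TypeError, ValueError):
        errors.append("balance must be a number")
    try:
        datetime.fromisoformat(str(data["timestamp"]))
    except ValueError:
        errors.append("timestamp must be an ISO-8601 date-time")
    return errors
```

Inside `send_data()`, one could read the body with `request.get_json(silent=True)` and return `jsonify({"status": "error", "errors": errors}), 400` whenever `validate_payload` returns a non-empty list.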
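Testing the API
To test the endpoint, save the following script as request.py. Put your own phone number in owner_phone_number if you want to receive the Twilio SMS later on.
```python
import requests

data = {
    "id": "5a5c562e-4386-44ad-bf6f-bab91081781e",
    "plate_number": "7695-OOO",
    "car_make": "Ford",
    "car_year": 2012,
    "owner_name": "Stefen",
    "owner_address": "92834 Kim Unions\nPort Harryport, MD 61729",
    # Replace with your own phone number in E.164 format (e.g. +15551234567)
    "owner_phone_number": "your phone number",
    "subscription_status": "active",
    "subscription_start": None,
    "subscription_end": None,
    "balance": 100.0,
    "timestamp": "2023-03-03T14:37:49",
}

response = requests.post("http://localhost:8000/send_data", json=data)
print(response.status_code)
print(response.json())
```
Run it:
```bash
python request.py
```
The test customer starts with a balance of $100. If the insert succeeds, the script prints 200 followed by {'status': 'success'}.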
Setting up connectors for data streaming and storage
With our data stored in MySQL, we will set up Kafka Connect connectors to stream the data from MySQL to Redpanda and then store it in MinIO, which will serve as our distributed object storage. This storage will act as the "bronze" table in our data warehouse.
```bash
# Create the Debezium source connector for MySQL
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "src-mysql",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.include.list": "inventory",
      "decimal.handling.mode": "double",
      "topic.prefix": "dbserver1",
      "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
      "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
  }'

# Create the S3 sink connector (Kafka topics -> MinIO)
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "sink_aws-s3",
    "config": {
      "topics.regex": "dbserver1.inventory.*",
      "topics.dir": "inventory",
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "flush.size": "1",
      "store.url": "http://minio:9000",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.region": "us-east-1",
      "s3.bucket.name": "warehouse",
      "aws.access.key.id": "minio",
      "aws.secret.access.key": "minio123"
    }
  }'
```
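The same two requests can also be sent from Python, for example from the Jupyter notebook. Here is a minimal standard-library sketch targeting the Kafka Connect REST endpoints `POST /connectors` and `GET /connectors/<name>/status`. The helper names are hypothetical, and `CONNECT_URL` assumes the port mapping from the Compose file.

```python
import json
import urllib.request
from typing import Any, Dict

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST port published in docker-compose


def build_create_request(base_url: str, name: str, config: Dict[str, Any]) -> urllib.request.Request:
    """Build the POST /connectors request that the curl commands above send."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(base_url: str, name: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Send the create request and return Kafka Connect's JSON response."""
    with urllib.request.urlopen(build_create_request(base_url, name, config)) as resp:
        return json.loads(resp.read().decode("utf-8"))


def connector_status(base_url: str, name: str) -> Dict[str, Any]:
    """GET /connectors/<name>/status and return the decoded JSON."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.loads(resp.read().decode("utf-8"))
```

After `create_connector(CONNECT_URL, "src-mysql", {...})`, a healthy connector should report a `RUNNING` state for the connector and each of its tasks in `connector_status(CONNECT_URL, "src-mysql")`. `urlopen` raises `urllib.error.HTTPError` for error responses, such as a 409 Conflict when a connector with that name already exists.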
3. Real-time data processing with Apache Spark
Reading data from MinIO
Using Apache Spark, we will read the data stored in MinIO and process it further. This processing will involve selecting relevant fields and transforming the data into a more suitable format for analysis.
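The article does not show the code that reads the bronze files, so here is a hedged sketch of what that step involves. With the S3 sink configured above (`topics.dir` set to `inventory`, JSON format) and its default partitioner, the customer events should land under keys like `inventory/dbserver1.inventory.customers/partition=0/...`, one JSON change event per line. We assume each event is a Debezium envelope with `before`, `after`, and `op` fields. With Debezium's default settings, MySQL `TIMESTAMP` columns should arrive as UTC strings ending in `Z` (which is why the Twilio script later strips the trailing character before calling `fromisoformat`), and `DATE` columns as the number of days since 1970-01-01. The plain-Python helpers below (hypothetical names) show the flattening on individual lines.

```python
import json
from datetime import date, datetime, timedelta, timezone
from typing import Any, Dict, Iterable, List, Optional

EPOCH = date(1970, 1, 1)


def row_from_event(event: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Row image after the change, or None for deletes and empty events."""
    if not event or event.get("op") == "d":
        return None
    return event.get("after")


def decode_date(days: Optional[int]) -> Optional[date]:
    """Assumed Debezium encoding for DATE columns: days since 1970-01-01."""
    return None if days is None else EPOCH + timedelta(days=days)


def decode_zoned_timestamp(value: Optional[str]) -> Optional[datetime]:
    """Parse a UTC string such as '2023-03-03T14:37:49Z'."""
    if value is None:
        return None
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11 on
    if value.endswith("Z"):
        value = value[:-1]
    return datetime.fromisoformat(value).replace(tzinfo=timezone.utc)


def flatten_lines(lines: Iterable[str]) -> List[Dict[str, Any]]:
    """Parse newline-delimited change events, keeping inserted/updated rows."""
    rows = []
    for line in lines:
        if not line.strip():
            continue
        after = row_from_event(json.loads(line))
        if after is None:
            continue
        row = dict(after)
        row["subscription_start"] = decode_date(row.get("subscription_start"))
        row["subscription_end"] = decode_date(row.get("subscription_end"))
        row["timestamp"] = decode_zoned_timestamp(row.get("timestamp"))
        rows.append(row)
    return rows
```

In Spark, the equivalent would be along the lines of `spark.read.json("s3a://warehouse/inventory/dbserver1.inventory.customers/").select("after.*")`, followed by the same type conversions, before writing the result as Parquet to `s3a://warehouse/inventory/silver_data`, the path the later scripts read.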
Data transformation and storage in the data warehouse
Once we have processed the data, we will store it in a "silver" table in our data warehouse. This table will be used for further analysis and processing.
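The Twilio script in the next section writes new balances back to MySQL with `UPDATE`, and Debezium captures those changes too, so the bronze files can contain several events for the same customer `id`. One possible silver step (our assumption, not shown in the article) is to collapse the events to the latest state per `id`. Here is a plain-Python sketch, assuming the events are processed in the order they were written to the topic.

```python
from typing import Any, Dict, Iterable, List, Optional


def latest_rows(events: Iterable[Dict[str, Any]], key: str = "id") -> List[Dict[str, Any]]:
    """Collapse change events (in log order) into the current row per key.

    Inserts, updates and snapshot reads overwrite the previous image;
    deletes remove the key.
    """
    current: Dict[Any, Dict[str, Any]] = {}
    for event in events:
        if event.get("op") == "d":
            before: Optional[Dict[str, Any]] = event.get("before")
            if before is not None:
                current.pop(before[key], None)
        elif event.get("after") is not None:
            after = event["after"]
            current[after[key]] = after
    return list(current.values())
```

In Spark, the same idea is commonly expressed with a window function: `row_number()` over a window partitioned by `id` and ordered by the event's `ts_ms` in descending order, keeping only row 1.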
Integrating Twilio for real-time notifications
To enhance our real-time data processing pipeline, we will integrate the Twilio API, which lets us send SMS notifications based on specific conditions or events. For example, we could send an SMS to the vehicle owner when their subscription is about to expire, or each time a passage is charged to their balance.
```python
from datetime import datetime as dt, timedelta, timezone
from typing import Optional

import mysql.connector
from mysql.connector import Error
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
from twilio.rest import Client

# Twilio credentials: fill in the values from your Twilio account
TWILIO_ACCOUNT_SID = ''
TWILIO_AUTH_TOKEN = ''
TWILIO_PHONE_NUMBER = ''

client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)

# Reuse (or create) the SparkSession; S3A access to MinIO is assumed
# to be configured in spark-defaults.conf
spark = SparkSession.builder.getOrCreate()
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")

# Fallback for missing or unparsable dates
EPOCH = dt(1970, 1, 1, tzinfo=timezone.utc)


def get_rate_for_customer(timestamp, subscription_status):
    """Rate for one passage, based on the hour of the passage and the subscription status."""
    if subscription_status == 'active':
        if 0 <= timestamp.hour < 6 or 11 <= timestamp.hour < 16:
            return 2.99
        elif 6 <= timestamp.hour < 11 or 16 <= timestamp.hour < 23:
            return 3.99
    else:
        return 9.99
    # Default rate to avoid returning None: active passages
    # between 23:00 and 23:59 end up here
    return 0.0


def is_subscription_active(subscription_start: dt, subscription_end: dt, current_time: dt) -> bool:
    return subscription_start <= current_time <= subscription_end


def get_subscription_status(subscription_end: dt, current_time: dt) -> bool:
    """True while the current time is at most 7 days past the subscription end."""
    grace_period = timedelta(days=7)
    return current_time <= subscription_end + grace_period


def send_sms(phone_number, message):
    try:
        client.messages.create(
            body=message,
            from_=TWILIO_PHONE_NUMBER,
            to=phone_number
        )
        print(f"SMS sent to {phone_number}: {message}")
    except Exception as e:
        print(f"Error sending SMS: {e}")


def is_valid_balance(value):
    """True if the balance converts to a float (None is rejected as well)."""
    try:
        float(value)
        return True
    except (TypeError, ValueError):
        return False


valid_balance_udf = udf(is_valid_balance, BooleanType())
silver_data = silver_data.filter(valid_balance_udf(col("balance")))

# Database configuration
db_config = {
    "host": "mysql",
    "user": "root",
    "password": "debezium",
    "database": "inventory"
}


def update_customer_balance(customer_id, new_balance):
    connection = None
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor()
        update_query = "UPDATE customers SET balance = %s WHERE id = %s"
        cursor.execute(update_query, (new_balance, customer_id))
        connection.commit()
        cursor.close()
        print(f"Updated balance for customer {customer_id}: {new_balance}")
    except Error as e:
        print(f"Error updating balance: {e}")
    finally:
        # connection is still None if connect() itself failed
        if connection is not None and connection.is_connected():
            connection.close()


def safe_date_conversion(date_string: Optional[str]) -> dt:
    """Parse an ISO-8601 string as UTC, falling back to the Unix epoch."""
    if date_string is None or not isinstance(date_string, str):
        return EPOCH
    # Timestamps from Debezium end in 'Z', which fromisoformat() rejects before Python 3.11
    if date_string.endswith("Z"):
        date_string = date_string[:-1]
    try:
        return dt.fromisoformat(date_string).replace(tzinfo=timezone.utc)
    except ValueError:
        return EPOCH


def process_plate(row: Row) -> None:
    print(f"Processing plate: {row.plate_number}")
    current_time = dt.now(timezone.utc)
    # Missing or unparsable passage timestamps fall back to the epoch
    plate_timestamp = safe_date_conversion(row.timestamp)
    subscription_start = safe_date_conversion(row.subscription_start)
    subscription_end = safe_date_conversion(row.subscription_end)
    is_active = is_subscription_active(subscription_start, subscription_end, current_time)
    rate = get_rate_for_customer(plate_timestamp, row.subscription_status)
    balance = float(row.balance)
    new_balance = round(balance - rate, 2)
    if row.subscription_status == 'none':
        message = f"Dear {row.owner_name}, your car with plate number {row.plate_number} is not registered. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif is_active:  # based on the subscription dates, not the status column
        message = f"Dear {row.owner_name}, your subscription is active. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif not get_subscription_status(subscription_end, current_time):
        message = f"Dear {row.owner_name}, your subscription has expired. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    update_customer_balance(row.id, new_balance)


# foreach runs process_plate on the Spark executors: twilio and
# mysql-connector-python must be installed on the workers too, and
# print() output ends up in the executor logs, not in the notebook
silver_data.foreach(process_plate)
```
This script processes a dataset of car passages and their owners, including subscription status, balance, plate numbers, and owner details. It reads data from the "silver" table in the data warehouse, processes it in real time, sends SMS notifications to car owners via the Twilio API, and updates each customer's balance in the MySQL database.
Here's a breakdown of the script:
1. Import the necessary libraries and modules.
2. Define the Twilio credentials (account SID, auth token, and phone number) used to send SMS notifications.
3. Create a SparkSession and read the "silver" table.
4. Define utility functions:
   - get_rate_for_customer: calculates the rate based on the passage timestamp and the subscription status.
   - is_subscription_active: checks whether a subscription is active.
   - get_subscription_status: checks whether a subscription is still within its grace period.
   - send_sms: sends an SMS using the Twilio API.
   - is_valid_balance: checks whether a balance value is valid (convertible to a float).
   - update_customer_balance: updates the customer's balance in the MySQL database.
   - safe_date_conversion: converts a date string to a datetime object, handling errors and missing values.
   - process_plate: processes each plate record: calculates the rate, sends the SMS notification, and updates the customer balance.
5. Register a user-defined function (UDF), valid_balance_udf, that flags records with valid balance values.
6. Filter the dataset with valid_balance_udf to keep only records with valid balances.
7. Define the database configuration for connecting to MySQL.
8. Use the foreach action to run process_plate on each plate record. This checks the subscription status, calculates the rate, sends the SMS notification, and updates the customer balance.
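A worked example makes the tariff concrete. The functions below copy get_rate_for_customer from the script above and add a hypothetical charge() helper that rounds the new balance to cents. Take the test record sent through the API earlier (balance $100, status active, passage at 14:37), assuming MySQL and Debezium both report that time in UTC. 14:37 falls in the 11:00 to 16:00 band, so the rate is 2.99 and the new balance is 97.01. Customers whose status is not active pay 9.99 at any hour. As written, an active subscriber passing between 23:00 and 23:59 matches neither band and is charged 0.0.

```python
from datetime import datetime


def get_rate_for_customer(timestamp: datetime, subscription_status: str) -> float:
    """Copy of the tariff used in the Twilio script."""
    if subscription_status == "active":
        if 0 <= timestamp.hour < 6 or 11 <= timestamp.hour < 16:
            return 2.99  # lower-rate bands
        elif 6 <= timestamp.hour < 11 or 16 <= timestamp.hour < 23:
            return 3.99  # higher-rate bands
    else:
        return 9.99  # any status other than active
    # Only reached for active subscriptions between 23:00 and 23:59
    return 0.0


def charge(balance: float, timestamp: datetime, subscription_status: str) -> float:
    """Hypothetical helper: balance after one passage, rounded to cents."""
    return round(balance - get_rate_for_customer(timestamp, subscription_status), 2)


passage = datetime(2023, 3, 3, 14, 37, 49)
print(get_rate_for_customer(passage, "active"), charge(100.0, passage, "active"))  # prints: 2.99 97.01
```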
```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

# gold_data is not defined in the snippets shown in this article;
# this line assumes a DataFrame with that name already exists
gold_data.write.parquet("s3a://warehouse/inventory/gold_data", mode="overwrite")

# format('delta') needs the delta-spark package plus
# spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension and
# spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog,
# assumed to be set in spark-defaults.conf


class MetricsAdapter:
    """Aggregate passages and revenue from the silver table into Delta 'gold' tables."""

    # Gold table name -> grouping columns
    GRANULARITIES = {
        "daily_metrics": ["date"],
        "weekly_metrics": ["year", "week_of_year"],
        "monthly_metrics": ["year", "month"],
        "quarterly_metrics": ["year", "quarter"],
        "yearly_metrics": ["year"],
    }

    def __init__(self, silver_table: DataFrame, warehouse_path: str):
        self.silver_table = silver_table
        self.warehouse_path = warehouse_path
        self.spark = SparkSession.builder.getOrCreate()

    def _path(self, name: str) -> str:
        return f"{self.warehouse_path}/gold/{name}"

    def show_metrics(self):
        for name in self.GRANULARITIES:
            print(f"{name.split('_')[0].capitalize()} Metrics:")
            self.spark.read.format("delta").load(self._path(name)).show(5)

    def transform(self):
        # Derive the date parts from the timestamp; every row is one passage
        time_based_metrics = (
            self.silver_table
            .withColumn("date", F.to_date("timestamp"))
            .withColumn("year", F.year("timestamp"))
            .withColumn("quarter", F.quarter("timestamp"))
            .withColumn("month", F.month("timestamp"))
            .withColumn("week_of_year", F.weekofyear("timestamp"))
            .withColumn("total_passages", F.lit(1))
            # Flat rate: 2.99 before noon, 3.99 afterwards (the hour is read from
            # characters 12-13 of the timestamp string)
            .withColumn(
                "total_revenue",
                F.when(F.col("timestamp").substr(12, 2).cast("int") < 12, 2.99).otherwise(3.99),
            )
        )
        # One Delta table per granularity, overwritten on every run
        for name, keys in self.GRANULARITIES.items():
            metrics = time_based_metrics.groupBy(*keys).agg(
                F.sum("total_passages").alias("total_passages"),
                F.sum("total_revenue").alias("total_revenue"),
            )
            (metrics.write.format("delta").mode("overwrite")
                .option("mergeSchema", "true").save(self._path(name)))


# Example usage
spark = SparkSession.builder.getOrCreate()
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")
warehouse_path = "s3a://warehouse/inventory/gold_data"
metrics_adapter = MetricsAdapter(silver_data, warehouse_path)
metrics_adapter.transform()
metrics_adapter.show_metrics()
```
The code calculates daily, weekly, monthly, quarterly, and yearly metrics, such as total passages and total revenue. It also defines a MetricsAdapter class that encapsulates the data transformation and metrics display logic.
The first line of code, gold_data.write.parquet("s3a://warehouse/inventory/gold_data", mode="overwrite"), writes the gold_data DataFrame (not defined in the snippets shown here) to the specified S3 bucket in Parquet format. The overwrite mode replaces any existing data at the destination.
The MetricsAdapter class has two primary methods: transform() and show_metrics().
transform() method:
- Derives the date, year, quarter, month, and week of the year from the timestamp.
- Aggregates the data at different time granularities (daily, weekly, monthly, quarterly, and yearly) using groupBy and agg.
- Writes each aggregated table to the S3 bucket in Delta Lake format, which stores the data as Parquet files plus a transaction log and provides ACID transactions, versioning, and schema evolution for large-scale data lakes.
show_metrics() method:
- Reads each metrics table back from the S3 bucket as a Delta Lake table.
- Displays the first 5 rows of the daily, weekly, monthly, quarterly, and yearly metrics using show().
Finally, the example usage part of the code initializes a SparkSession, reads the silver_data from the S3 bucket, creates a MetricsAdapter instance with silver_data and the warehouse path, calls the transform() method to aggregate the data, and then calls the show_metrics() method to display the results.
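To sanity-check what the gold tables should contain, here is a plain-Python sketch of the daily aggregation and of the weekly grouping key (hypothetical helper names, naive timestamps assumed). It mirrors the revenue rule in transform(): 2.99 before noon, 3.99 otherwise. This rule is simpler than get_rate_for_customer, so gold revenue will not necessarily match the amounts charged to customers. The week key pairs the calendar year with the ISO week number, the same combination that F.year and F.weekofyear produce. As a result, a passage on 1 January 2021 lands in group (2021, 53).

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, Tuple


def flat_rate(ts: datetime) -> float:
    """Revenue rule from MetricsAdapter.transform(): 2.99 before noon, 3.99 from noon on."""
    return 2.99 if ts.hour < 12 else 3.99


def daily_metrics(timestamps: Iterable[datetime]) -> Dict[str, Tuple[int, float]]:
    """ISO date -> (total_passages, total_revenue), like the daily gold table."""
    passages: Dict[str, int] = defaultdict(int)
    revenue: Dict[str, float] = defaultdict(float)
    for ts in timestamps:
        day = ts.date().isoformat()
        passages[day] += 1
        revenue[day] += flat_rate(ts)
    return {day: (passages[day], round(revenue[day], 2)) for day in sorted(passages)}


def weekly_key(ts: datetime) -> Tuple[int, int]:
    """(calendar year, ISO week number), the pair used to group weekly metrics."""
    return ts.year, ts.isocalendar()[1]
```

For example, passages at 09:00 and 15:30 on 2023-03-03 give two passages and 6.98 in revenue for that day.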
Conclusion
In this article, we have demonstrated how to set up a real-time data processing and analytics environment using Docker, MySQL, Redpanda, MinIO, and Apache Spark. We created a system that generates fake data simulating bridge sensors, stores it in a MySQL database, and streams it in real time using Redpanda and Kafka Connect. We then used MinIO as distributed object storage and Apache Spark to further process and analyze the data. Additionally, we integrated the Twilio API for real-time notifications.
This project showcases the potential of using modern data processing tools to handle real-time scenarios, such as monitoring car passages on a bridge and notifying car owners about their subscription status and balance. The combination of these technologies enables scalable and efficient data processing, as well as the ability to respond quickly to changes in the data.
The knowledge gained from this project can be applied to various other real-time data processing and analytics use cases. By understanding and implementing these technologies, you can build powerful and efficient systems that are able to handle large amounts of data and provide valuable insights in real-time.
https://github.com/Stefen-Taime/stream-ingestion-redpanda-minio.git