
Batch Processing using PySpark on AWS EMR

Published at
11/11/2023
Categories
datapipeline
pyspark
dataengineering
amazonwebservices
Author
aiwithqasim

Are you a data engineer looking for hands-on practice with AWS services? This blog walks through batch data processing on AWS: S3 for storage, EMR as the processing cluster, and Athena for querying the processed results.

About Batch Data Pipeline:

The Wikipedia Activity data will be put into a folder in the S3 bucket. We will have PySpark code that will run on the EMR cluster. This code will fetch the data from the S3 bucket, perform filtering and aggregation on this data, and push the processed data back into S3 in another folder. We will then use Athena to query this processed data present in S3. We will create a table on top of the processed data by providing the relevant schema and then use ANSI SQL to query the data.

Architecture Diagram:

Architecture Diagram

Languages - Python
Package - PySpark
Services - AWS EMR, AWS S3, AWS Athena.

Dataset:

We'll be using the Wikipedia activity logs JSON dataset, whose records carry a large payload of 15+ fields.

NOTE: The script applies two conditions: it keeps only those records where isRobot is False and the user's country (countryName) is United States.

dataset
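
To make the two conditions concrete before moving to Spark, here is a minimal plain-Python sketch of the same filter applied to a few JSON lines. The sample records and the `page` field are made up for illustration; only `isRobot` and `countryName` come from the script in this post.

```python
import json

# Made-up sample records; only isRobot and countryName are taken from the post.
SAMPLE_LINES = [
    '{"isRobot": false, "countryName": "United States", "page": "Example A"}',
    '{"isRobot": true, "countryName": "United States", "page": "Example B"}',
    '{"isRobot": false, "countryName": "Canada", "page": "Example C"}',
    '{"isRobot": false, "countryName": null, "page": "Example D"}',
]


def keep_record(record):
    """Return True for non-robot edits whose country is United States."""
    return record.get("isRobot") is False and record.get("countryName") == "United States"


def filter_lines(lines):
    """Parse JSON lines and keep only the records that satisfy both conditions."""
    return [rec for rec in map(json.loads, lines) if keep_record(rec)]


for rec in filter_lines(SAMPLE_LINES):
    print(rec["page"])  # prints "Example A"
```

Running the logic on a handful of records like this is a cheap way to sanity-check the conditions before paying for a cluster.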

Steps of Working:

1- Create an S3 bucket with a suitable name, e.g., emr-batchprocessing-raw-useast1-<Account ID>-dev, and inside the bucket create the following folders:

  • input-source (upload your dataset here; this is the source folder),

  • output-destination (the data processed by AWS EMR is written here for further processing), and

  • logs (AWS EMR logs are saved here; we'll specify this directory when creating the EMR cluster)

S3 bucket Creation
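
If you would rather script this step than click through the console, the layout could be sketched with boto3 roughly as follows. This is an illustration, not part of the original walkthrough: the helper names are hypothetical, the account ID is a placeholder, and `create_layout` assumes boto3 is installed and AWS credentials are configured.

```python
FOLDERS = ("input-source", "output-destination", "logs")


def bucket_name(account_id, region_tag="useast1", env="dev"):
    """Build a bucket name following the naming pattern used in this post."""
    return f"emr-batchprocessing-raw-{region_tag}-{account_id}-{env}"


def folder_uris(bucket):
    """Map each folder name to its s3:// URI."""
    return {folder: f"s3://{bucket}/{folder}/" for folder in FOLDERS}


def create_layout(bucket):
    """Create the bucket and empty folder markers (needs boto3 and credentials)."""
    import boto3  # imported lazily so the helpers above work without it

    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)  # us-east-1 needs no LocationConstraint
    for folder in FOLDERS:
        # A zero-byte object whose key ends in "/" shows up as a folder in the console.
        s3.put_object(Bucket=bucket, Key=f"{folder}/")
```

For example, `folder_uris(bucket_name("123456789012"))["input-source"]` gives the URI you would later use as the script's input path (with your own account ID in place of the placeholder).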

2- Go to the Key Pairs page of the EC2 console and create a key pair to use during EMR cluster creation

EC2 key pair

3- Now we have to create an AWS EMR cluster. To do that, go to AWS EMR from the AWS Console and choose EMR on EC2: Clusters

EMR cluster creation

4- During cluster creation, provide a suitable name (in my case, "emr-batch-processing") and select Spark as the processing engine.


5- EMR stands for Elastic MapReduce and works on the principle of distributed processing, so we need a master node and worker/core nodes for processing. Note: I removed the Task nodes during creation


6- For cluster scaling and provisioning, let's go with 2 worker/core nodes, since our workload is minimal yet realistic

EMR Scaling

NOTE: Keep the default settings for Networking and Cluster termination

7- Select the EC2 key pair that we created in Step 2 so that we can SSH into the cluster from the terminal

8- For cluster logs, choose the logs folder that we created during the bucket creation step.

EMR logs directory

9- Lastly, we need to create the Amazon EMR service role for Identity and Access Management (IAM) and, similarly, the EC2 instance profile for Amazon EMR. After that, review the steps and click on Create EMR Cluster.

Service Role & Instance Profile
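
The console choices from steps 3 to 9 map fairly directly onto the parameters of boto3's `run_job_flow` call. The sketch below is one possible translation, not the author's setup: the release label, instance type, and the default role names `EMR_DefaultRole` and `EMR_EC2_DefaultRole` are assumptions that you would replace with whatever exists in your account.

```python
def build_cluster_config(bucket, key_pair_name,
                         release_label="emr-6.15.0",  # assumed release
                         instance_type="m5.xlarge"):   # assumed instance type
    """Return run_job_flow keyword arguments mirroring the console steps."""
    return {
        "Name": "emr-batch-processing",
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],           # step 4: Spark engine
        "LogUri": f"s3://{bucket}/logs/",              # step 8: logs folder
        "Instances": {
            "InstanceGroups": [                         # steps 5-6: no task nodes
                {"Name": "Primary", "InstanceRole": "MASTER",
                 "InstanceType": instance_type, "InstanceCount": 1},
                {"Name": "Core", "InstanceRole": "CORE",
                 "InstanceType": instance_type, "InstanceCount": 2},
            ],
            "Ec2KeyName": key_pair_name,                # step 7: key pair from step 2
            "KeepJobFlowAliveWhenNoSteps": True,        # keep the cluster up for SSH
        },
        "ServiceRole": "EMR_DefaultRole",               # step 9: assumed role names
        "JobFlowRole": "EMR_EC2_DefaultRole",
    }


def create_cluster(config):
    """Start the cluster and return its ID (needs boto3 and credentials)."""
    import boto3  # imported lazily so the config can be built without it

    emr = boto3.client("emr", region_name="us-east-1")
    return emr.run_job_flow(**config)["JobFlowId"]
```

Keeping the configuration in a plain dictionary makes it easy to review before anything is created, which matters because a running cluster is billed until it is terminated.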

10- Let's create the script that we want to run on our EMR cluster. NOTE: The code given below is only the filtering script; for the full scripts, please refer to GitHub.

Github link: https://github.com/aiwithqasim/emr-batch-processing



from pyspark.sql import SparkSession

# Replace the placeholders with the S3 URIs of your own bucket folders.
S3_DATA_INPUT_PATH = "<<bucket link to source dataset>>"
S3_DATA_OUTPUT_PATH_FILTERED = "<<bucket link to output folder>>/filtered"


def main():
    spark = SparkSession.builder.appName('EMRBatchProcessing').getOrCreate()

    # Read the raw Wikipedia activity JSON from the input folder.
    df = spark.read.json(S3_DATA_INPUT_PATH)
    print(f'The total number of records in the source data set is {df.count()}')

    # Keep only non-robot edits made from the United States.
    filtered_df = df.filter((df.isRobot == False) & (df.countryName == 'United States'))
    print(f'The total number of records in the filtered data set is {filtered_df.count()}')
    filtered_df.show(10)
    filtered_df.printSchema()

    # Write the filtered records back to S3 as Parquet, replacing any earlier output.
    filtered_df.write.mode('overwrite').parquet(S3_DATA_OUTPUT_PATH_FILTERED)
    print('The filtered output is uploaded successfully')


if __name__ == '__main__':
    main()
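
As an optional variation, the hard-coded S3 paths in the script above could be passed on the command line instead, so the same file works for different buckets. This is only a sketch of that idea: the `--input` and `--output` flags and the helper names are hypothetical and do not appear in the GitHub repository.

```python
import argparse


def parse_args(argv=None):
    """Read the source and destination S3 URIs from the command line."""
    parser = argparse.ArgumentParser(description="Filter Wikipedia activity data")
    parser.add_argument("--input", required=True, help="S3 URI of the source dataset")
    parser.add_argument("--output", required=True, help="S3 URI of the output folder")
    return parser.parse_args(argv)


def filtered_path(output_uri):
    """Append the 'filtered' sub-folder used by the script in this post."""
    return output_uri.rstrip("/") + "/filtered"


def main(argv=None):
    args = parse_args(argv)
    # Imported here because pyspark is only needed (and available) on the cluster.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("EMRBatchProcessing").getOrCreate()
    df = spark.read.json(args.input)
    # "== False" is intentional: it builds a Spark Column expression.
    filtered_df = df.filter((df.isRobot == False) & (df.countryName == "United States"))
    filtered_df.write.mode("overwrite").parquet(filtered_path(args.output))
```

Saved as main.py with a final `main()` call under the usual `if __name__ == '__main__':` guard, it could then be submitted as spark-submit main.py --input <source URI> --output <output URI>.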



11- Make sure the EMR cluster you created has SSH port 22 open so that you can connect to it from a local terminal or PuTTY
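
Opening port 22 means adding an inbound rule to the security group of the cluster's primary node. A minimal boto3 sketch is shown here, assuming you already know the security group ID and want to allow a single address; the helper names are hypothetical and the CIDR used in the usage note is a documentation placeholder.

```python
def ssh_ingress_rule(cidr):
    """Build an inbound rule that allows TCP port 22 from one CIDR range."""
    return {
        "IpProtocol": "tcp",
        "FromPort": 22,
        "ToPort": 22,
        "IpRanges": [{"CidrIp": cidr, "Description": "SSH from my workstation"}],
    }


def open_ssh(security_group_id, cidr):
    """Attach the rule to the security group (needs boto3 and credentials)."""
    import boto3  # imported lazily so the rule can be built without it

    ec2 = boto3.client("ec2", region_name="us-east-1")
    ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpPermissions=[ssh_ingress_rule(cidr)],
    )
```

A call such as `open_ssh("<security group id>", "203.0.113.10/32")` restricts access to one address, which is generally safer than opening the port to 0.0.0.0/0.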

12- Connect to your AWS EMR EC2 instance using the connection command as shown below:

Connecting to AWS EMR

13- Create the main script (containing the script that we created above) using a Linux command and submit it using spark-submit main.py

creating & submitting spark script
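
Since the commands for steps 12 and 13 appear only in the screenshots, here is a small sketch that assembles them as text. The key file name and host name are placeholders; `hadoop` is the login user on EMR cluster nodes.

```python
import shlex


def ssh_command(key_file, primary_dns, user="hadoop"):
    """Command for connecting to the cluster's primary node over SSH."""
    return ["ssh", "-i", key_file, f"{user}@{primary_dns}"]


def spark_submit_command(script="main.py"):
    """Command for submitting the PySpark script once logged in."""
    return ["spark-submit", script]


# Placeholder key file and host name, for illustration only.
print(shlex.join(ssh_command("my-key.pem", "ec2-203-0-113-10.compute-1.amazonaws.com")))
print(shlex.join(spark_submit_command()))
```

The first printed line is what you would run locally; the second is what you would run on the primary node after creating main.py there.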

14- After completion, validate that the code ran successfully and that the terminal has printed the schema, as shown below

Code completion validation

and also that the S3 bucket has the processed data.

Processed data in S3
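
The S3 check can also be done programmatically by listing the output prefix and looking for Parquet part files. The sketch below additionally looks for a `_SUCCESS` marker, which Spark commonly writes after a successful job, though that depends on the committer configuration, so treat it as a hint rather than a guarantee. The function names and the default prefix are assumptions based on the folder layout in this post.

```python
def summarize_output(keys):
    """Split object keys into Parquet part files and a _SUCCESS-marker flag."""
    part_files = [k for k in keys if k.endswith(".parquet")]
    has_marker = any(k.rsplit("/", 1)[-1] == "_SUCCESS" for k in keys)
    return part_files, has_marker


def list_output_keys(bucket, prefix="output-destination/filtered/"):
    """List every object key under the output prefix (needs boto3 and credentials)."""
    import boto3  # imported lazily so summarize_output works without it

    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

For instance, `summarize_output(list_output_keys("<your bucket>"))` returning an empty list of part files would suggest the job did not write any output.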

15- Go to the AWS Athena query editor

  • Create a table using the data in the S3 output folder (processed data)

  • Make sure the table and database are created properly

  • Create a new query to select data from the table created

  • Make sure the query returns the result properly.

Querying using Athena
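
The Athena statements themselves are not shown in this post, so here is a hedged sketch of what the table definition and a first query might look like. The database and table names are hypothetical, and only the two columns used by the filter are listed; the real table would declare every column found in the Parquet schema printed by the script.

```python
def create_table_ddl(database, table, location, columns):
    """Build a CREATE EXTERNAL TABLE statement over Parquet files in S3."""
    cols = ",\n  ".join(f"`{name}` {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS `{database}`.`{table}` (\n  {cols}\n)\n"
        f"STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


def select_query(database, table, limit=10):
    """Build a simple query for checking that the table returns rows."""
    return f'SELECT * FROM "{database}"."{table}" LIMIT {limit}'


def run_query(sql, database, results_uri):
    """Submit a statement to Athena (needs boto3, credentials, an existing database)."""
    import boto3  # imported lazily so the SQL builders work without it

    athena = boto3.client("athena", region_name="us-east-1")
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": results_uri},
    )
    return response["QueryExecutionId"]


# Hypothetical names; the location is a placeholder for your output folder.
print(create_table_ddl(
    "wiki_db", "filtered_activity",
    "s3://<your bucket>/output-destination/filtered/",
    [("isRobot", "boolean"), ("countryName", "string")],
))
```

The generated text can be pasted into the Athena query editor as-is, or submitted through `run_query` with an S3 location for query results.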

Conclusion

In a professional data engineering career, you have various scenarios where data gets collected every day. The data can be processed once a day, i.e., batch processed, and the processed results are stored in a location to derive insights and take appropriate action based on the insights. In this blog, we have implemented a batch-processing pipeline using AWS services. We have taken a day’s worth of data related to Wikipedia, and performed batch processing on it.
