dev-resources.site
for different kinds of informations.
Batch Processing using PySpark on AWS EMR
Are you a Data Engineer and want to do hands-on on AWS services? This blog is about batch data processing using AWS services, you will learn to do batch processing using AWS services: S3 as storage, EMR as processing cluster, and Athena for querying the processed results.
About Batch Data Pipeline:
The Wikipedia Activity data will be put into a folder in the S3 bucket. We will have PySpark code that will run on the EMR cluster. This code will fetch the data from the S3 bucket, perform filtering and aggregation on this data, and push the processed data back into S3 in another folder. We will then use Athena to query this processed data present in S3. We will create a table on top of the processed data by providing the relevant schema and then use ANSI SQL to query the data.
Architecture Diagram:
Languages - Python
Package - PySpark
Services - AWS EMR, AWS S3, AWS Athena.
Dataset:
We'll be using the Wikipedia activity logs JSON dataset that has a huge payload comprising 15+ fields
NOTE: In our Script created we'll take two conditions into consideration that we want only those payloads where isRobot is False & user country is from United Estate
Steps of Working:
1- Create an S3 bucket with a suitable name i:e., emr-batchprocessing-raw-useast1-Account ID-dev & inside the bucket create folders i:e.,
input-source (upload your dataset here it will be your source folder),
output-destination (According to scenarios processed from AWS EMR data will be dumped here for further processing) &
logs (AWS EMR logs will be saved here. We'll specify this directory during creation of EMR)
2- Goto EC2-keypair & create Key pair for using that into EMR cluster creation
3- Now, We have to create an AWS EMR cluster for that go to AWS EMR from AWS Console & choose EMR on EC2: Clusters
4- During the Cluster creation provide some suitable name in my case I have provided "emr-batch-processing" & selected spark as the processing engine.
5- As EMR stands for Elastic Map Reduce & it works under the rule of distributed processing we need master node and woker/core nodes for processing. Note: I removed the Task nodes here during creation
6- For cluster scaling and provisioning let's go with 2 woker/core nodes since our working in minimal and realistic
NOTE: Keep default setting for _Networking _& Cluster termination
7- Select EC2 key pair that we created in Step 2 so that we can do SSH using the terminal
8- For cluster logs Let's choose the Log folder that we created during the Bucket creation step.
9- Lastly we Need to create the Amazon EMR service role for Identity and Access Management (IAM) & Similarly Instance Profile for EC2 instance profile for Amazon EMR. After that review steps & Click on Create Create EMR Cluster.
10- Let's create a script that we want to run into our EMR cluster. NOTE: The code given below is only one filtering script for full scripts please refer to GitHub.
Github link: https://github.com/aiwithqasim/emr-batch-processing
from pyspark.sql import SparkSession
S3_DATA_INPUT_PATH="<<bucket link to source dataset>>"
S3_DATA_OUTPUT_PATH_FILTERED="<<bucket link to output folder>>/filtered"
def main():
spark = SparkSession.builder.appName('EMRBathcProcessing').getOrCreate()
df = spark.read.json(S3_DATA_INPUT_PATH)
print(f'The total number of records in the source data set is {df.count()}')
filtered_df = df.filter((df.isRobot == False) & (df.countryName == 'United States'))
print(f'The total number of records in the filtered data set is {filtered_df.count()}')
filtered_df.show(10)
filtered_df.printSchema()
filtered_df.write.mode('overwrite').parquet(S3_DATA_OUTPUT_PATH_FILTERED)
print('The filtered output is uploaded successfully')
if __name__ == '__main__':
main()
11- Make sure the EMR cluster you created has SSH port 22 open for cluster connection from local _Terminal _ or _Putty _
12- Connect to your AWS EMR EC2 instance using the connection command as shown below:
13- Create the main script (include the script that we created above) using the Linux command and submit the code using spark-submit main.py
14- After completion validate that the code ran successfully & & the terminal has print schema as shown below
and also & the S3 bucket has processed data.
15- Goto AWS Athena query editor
Create a table using data in the S3 output folder (processed data)
Make sure the table& databaseare created properly
Created a new queryto select data from the table created
Make sure the query is returning the result properly.
Conclusion
In a professional data engineering career, you have various scenarios where data gets collected every day. The data can be processed once a day, i.e., batch processed, and the processed results are stored in a location to derive insights and take appropriate action based on the insights. In this blog, we have implemented a batch-processing pipeline using AWS services. We have taken a day’s worth of data related to Wikipedia, and performed batch processing on it.
For more such content please follow:
LinkedIn: https://www.linkedin.com/in/qasimhassan/
GitHub: https://github.com/aiwithqasim
Join our AWS Data Engineering WhastApp Group
Featured ones: