Setting up a Queue Ingestion system for S3 to Redshift Transfer
I've always been curious about data pipelines and how data engineering ends up sitting in the middle of everything at my company.
Literally one object moves in 500 different formats through 500 different systems for perhaps 500 different purposes.
The base object is a transaction that has all the raw L0 data stored in Object Storage. However, we can't keep reading objects out of storage all day for analytics, so this data needs to be moved into a separate database. Let me correct myself - it gets moved into multiple databases in different forms, for different reasons - Confirmations, Regulatory Reporting, etc.
I was trying to see if I could develop a PoC of sorts that simulates this pipeline with the help of our friends in AWS.
Here's what we have -
- Amazon S3: Object Storage
- Amazon SQS: Message Queue
- Amazon Redshift: SQL-ish database for preserving transformed data for further analysis.
- AWS Lambda: Compute for performing business logic to enrich the base data, push this to SQS and then pull it from SQS again to send it out to Redshift.
--
Let me bring up the architecture diagram from the cover image again.
We're storing the initial piece of information in S3. This is where our first Lambda function gets triggered.
We take the baseline object and perform some analysis and transformation on it. Consider this as your business logic that you'd perform at L1 before the data is sent further downstream.
The transformed object is then sent out via the SQS Queue. This message gets fetched by the 2nd Lambda, which is effectively polling the queue (Lambda does the polling for us once the queue is set as the function's trigger). Finally, this message is turned into an SQL query and the data is inserted into a database in Redshift Serverless.
Now that we have a better picture, let's hop into action.
Setting up the Queue
We can set up a simple FIFO Queue in Amazon SQS to ensure ordered delivery of messages. I'll walk you through the code that avoids message duplication separately.
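If you'd rather create the queue from code than from the console, here's a minimal sketch with boto3. The queue name `transactions.fifo` and the helper name `create_fifo_queue` are my own placeholders, and I'm assuming you want to supply your own deduplication ID rather than let SQS hash the message body.

```python
def create_fifo_queue(sqs_client, name="transactions.fifo"):
    """Create a FIFO queue and return its URL.

    `sqs_client` is a boto3 SQS client, e.g. boto3.client("sqs").
    The default queue name is a placeholder, not from the original setup.
    """
    # SQS requires FIFO queue names to carry the ".fifo" suffix.
    if not name.endswith(".fifo"):
        raise ValueError("FIFO queue names must end with '.fifo'")
    response = sqs_client.create_queue(
        QueueName=name,
        Attributes={
            "FifoQueue": "true",
            # Assumption: the producer sends an explicit MessageDeduplicationId,
            # so content-based deduplication stays off.
            "ContentBasedDeduplication": "false",
        },
    )
    return response["QueueUrl"]
```

You'd call it as `create_fifo_queue(boto3.client("sqs"))` and keep the returned URL for the first Lambda.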
The 1st Lambda Function
We're writing a function that waits for an object to land in S3 so that it can be processed.
I couldn't find a way to set the SQS queue as the function's destination in the console. I don't think it's a permission issue, because in that case I wouldn't have been able to reach the queue via boto3 either, and I could. So here, S3 is our trigger and SQS is our programmatic destination: the function sends to the queue itself.
The code is available on GitHub.
Here's where the business logic for our transactions goes in. Given that this is serverless and decoupled fully, this can be as complex as it can get. I've attached a simplified version below -
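A minimal sketch of what such a handler could look like. Everything specific here is an assumption on my part: the `QUEUE_URL` environment variable, the `amount` field, the `size_bucket` enrichment, and the single `transactions` message group are all placeholders for your own logic. The optional `s3` and `sqs` parameters only exist so the function can be exercised without AWS.

```python
import hashlib
import json
import os
import urllib.parse


def enrich(transaction):
    """Hypothetical L1 business logic: tag the record with a size bucket."""
    enriched = dict(transaction)
    is_large = float(transaction["amount"]) >= 10_000
    enriched["size_bucket"] = "large" if is_large else "small"
    return enriched


def build_message(transaction, queue_url):
    """Build the keyword arguments for sqs.send_message on a FIFO queue."""
    body = json.dumps(enrich(transaction), sort_keys=True)
    return {
        "QueueUrl": queue_url,
        "MessageBody": body,
        # A single group keeps all messages in order (assumed grouping).
        "MessageGroupId": "transactions",
        # Same payload gives the same ID, so SQS can drop repeats
        # that arrive inside its deduplication window.
        "MessageDeduplicationId": hashlib.sha256(body.encode()).hexdigest(),
    }


def lambda_handler(event, context, s3=None, sqs=None):
    if s3 is None or sqs is None:
        import boto3  # bundled with the Lambda Python runtime
        s3 = s3 or boto3.client("s3")
        sqs = sqs or boto3.client("sqs")
    queue_url = os.environ["QUEUE_URL"]  # assumed env var name
    sent = 0
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications.
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        raw = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        sqs.send_message(**build_message(json.loads(raw), queue_url))
        sent += 1
    return {"sent": sent}
```

Hashing the enriched body is just one way to get a stable deduplication ID; a natural transaction ID would work too if your data has one.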
Object Storage via S3
It can be a standard bucket in the same region. Just make sure you add an event notification under the bucket's Properties to invoke the Lambda we just created.
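The same notification can be set from code. This is a rough sketch, assuming you only care about newly created `.json` objects; the helper names and the suffix filter are my own choices.

```python
def lambda_notification(function_arn, suffix=".json"):
    """Notification config that invokes a Lambda on every new matching object."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": function_arn,
                "Events": ["s3:ObjectCreated:*"],
                # Assumed filter: only react to JSON uploads.
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}
                },
            }
        ]
    }


def attach_trigger(s3_client, bucket, function_arn):
    """Apply the config; `s3_client` is a boto3 S3 client."""
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=lambda_notification(function_arn),
    )
```

Two things to keep in mind: this call replaces whatever notification configuration the bucket already has, and S3 must already be allowed to invoke the function or the call is rejected.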
The 2nd Lambda
We're now on the analytics side of things, where it's all about ETL and further processing in Redshift. For starters, we need a function that is triggered by an SQS message and subsequently passes it on to Amazon Redshift.
Here's what this function looks like. It waits for a message in the SQS queue, performs a validation and pushes it to Redshift. Since we are using a FIFO Queue, each SQS message carries a deduplication ID (the MessageDeduplicationId attribute), which we can use to filter out duplicate messages.
We can also perform type-validations easily because "thanks Python!"
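Here's a minimal sketch of such a consumer, using the Redshift Data API through boto3. The schema (`transaction_id`, `amount`, `currency`), the `WORKGROUP` and `DATABASE` environment variables, and the `default-workgroup` fallback are assumptions for illustration. The duplicate check is in-memory, so it only covers one invocation's batch.

```python
import json
import os

# Hypothetical schema: field name -> accepted Python type(s).
REQUIRED_FIELDS = {"transaction_id": str, "amount": (int, float), "currency": str}

INSERT_SQL = (
    "INSERT INTO transactions (transaction_id, amount, currency) "
    "VALUES (:transaction_id, CAST(:amount AS DECIMAL(18,2)), :currency)"
)


def validate(payload):
    """Check presence and type of every required field."""
    for field, expected in REQUIRED_FIELDS.items():
        if field not in payload:
            raise ValueError(f"missing field: {field}")
        value = payload[field]
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            raise TypeError(f"bad type for {field}: {type(value).__name__}")
    return payload


def lambda_handler(event, context, redshift=None):
    if redshift is None:
        import boto3  # bundled with the Lambda Python runtime
        redshift = boto3.client("redshift-data")
    seen = set()  # deduplication IDs seen in this batch only
    inserted = 0
    for record in event["Records"]:
        dedup_id = record["attributes"]["MessageDeduplicationId"]
        if dedup_id in seen:
            continue  # duplicate within the batch, skip it
        seen.add(dedup_id)
        tx = validate(json.loads(record["body"]))
        redshift.execute_statement(
            WorkgroupName=os.environ.get("WORKGROUP", "default-workgroup"),
            Database=os.environ.get("DATABASE", "dev"),
            Sql=INSERT_SQL,
            # The Data API takes named parameters as strings.
            Parameters=[{"name": k, "value": str(tx[k])} for k in REQUIRED_FIELDS],
        )
        inserted += 1
    return {"inserted": inserted}
```

Catching duplicates across invocations would need something durable, for example a lookup against the table itself before inserting.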
Redshift - the last checkpoint
You can use the default workgroup in Redshift Serverless and the base "dev" database, and create a new table called "transactions" there to hold all of this data.
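The table can be created from the query editor, or with a small script like the sketch below. The column list is purely hypothetical (pick whatever your transformed object contains), and `default-workgroup` is my assumption for the default workgroup's name.

```python
# Hypothetical columns; adjust to match your transformed transaction.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS transactions (
    transaction_id VARCHAR(64) NOT NULL,
    amount         DECIMAL(18, 2),
    currency       CHAR(3)
)
"""


def create_table(redshift_client, workgroup="default-workgroup", database="dev"):
    """Submit the DDL through the Redshift Data API and return the statement ID.

    `redshift_client` is boto3.client("redshift-data").
    """
    response = redshift_client.execute_statement(
        WorkgroupName=workgroup,
        Database=database,
        Sql=CREATE_TABLE_SQL,
    )
    return response["Id"]
```

The Data API is asynchronous, so the returned ID only means the statement was accepted; `describe_statement(Id=...)` on the same client reports whether it finished or failed.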
After that, all you need to do is upload a JSON file to the S3 Bucket that looks like this and see the magic:)
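For a quick end-to-end test from your machine, something like this sketch would do. The sample transaction is made up for illustration (it is not real data), and the `incoming/` key prefix and helper name are placeholders.

```python
import json

# Made-up sample record, only for trying out the pipeline.
SAMPLE_TRANSACTION = {
    "transaction_id": "tx-0001",
    "amount": 12500.0,
    "currency": "USD",
}


def upload_transaction(s3_client, bucket, transaction):
    """Upload one transaction as a JSON object and return its key.

    `s3_client` is boto3.client("s3"); the key layout is an assumption.
    """
    key = f"incoming/{transaction['transaction_id']}.json"
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(transaction).encode("utf-8"),
        ContentType="application/json",
    )
    return key
```

Once the object lands, the first Lambda should fire, the message should pass through the queue, and a row should show up in the "transactions" table shortly after.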
I hope you found this thread informative. Cheers, Happy Holidays!