Building Scalable Event Processing with Fan-out Pattern using the Serverless Framework
Want to process the same event across multiple services simultaneously? The fan-out pattern using AWS Serverless services might be exactly what you need. In this tutorial, we'll build a robust asynchronous fan-out system using AWS Lambda, SNS, and SQS, all orchestrated with the Serverless Framework.
What is the Fan-out Pattern?
The fan-out pattern is a messaging pattern in which a single message triggers multiple parallel processing flows. A single topic pushes each message it receives to multiple queues, so sending one message from your code delivers it to every subscribed queue, and the function attached to each queue is invoked asynchronously. Think of it as a broadcaster sending the same message to multiple receivers simultaneously. This pattern is particularly useful when you need to:
- Process the same data in different ways
- Trigger multiple workflows from a single event
- Distribute notifications to multiple subscribers
- Scale your event processing horizontally
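To make the idea concrete before bringing in AWS, here is a minimal in-memory sketch of fan-out. The `Topic` class and the array-based queues are hypothetical stand-ins for SNS and SQS, not part of any AWS SDK; the sketch only shows that one publish call results in one independent copy per subscriber.

```javascript
// In-memory stand-in for an SNS topic: every subscribed queue gets its own copy.
class Topic {
  constructor() {
    this.queues = [];
  }

  // Register a queue (a plain array here) as a subscriber.
  subscribe(queue) {
    this.queues.push(queue);
  }

  // Deliver an independent copy of the message to each subscribed queue.
  publish(message) {
    for (const queue of this.queues) {
      queue.push(JSON.parse(JSON.stringify(message)));
    }
  }
}

const queueOne = [];
const queueTwo = [];
const topic = new Topic();
topic.subscribe(queueOne);
topic.subscribe(queueTwo);

// One publish call, two deliveries.
topic.publish({ id: 1, data: 'test data 1' });

console.log(queueOne.length, queueTwo.length);
```

Because each queue holds its own copy, a slow or failing consumer on one queue does not affect the other, which is the property the SNS and SQS combination provides in the real system.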
Prerequisites
- Node.js installed on your machine
- AWS account with appropriate permissions
- Basic understanding of JavaScript/Node.js
- AWS CLI installed and configured
- Serverless Framework CLI installed
Architecture Overview
The Serverless Framework lets you define the resources of an asynchronous microservice, its triggers (for example, a message arriving in a queue or published to a topic), and the middleware components that make the asynchronous communication possible.
Our solution uses these AWS services:
- AWS Lambda: For processing events
- Amazon SNS (Simple Notification Service): For message broadcasting
- Amazon SQS (Simple Queue Service): For reliable message delivery
- Serverless Framework: For infrastructure as code
At a high level, the implementation works as follows:
- A trigger Lambda receives an event
- The event is published to an SNS topic
- SNS broadcasts the message to multiple SQS queues
- Separate Lambda functions process messages from each queue
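It helps to know what a message looks like by the time it reaches a processor. With raw message delivery disabled (the SNS default), the SQS record body is a JSON envelope and the published payload sits in its `Message` field as a string. The sketch below assumes that default; the `unwrapSnsMessage` helper, the sample IDs, and the ARN are illustrative placeholders, not values from a real deployment.

```javascript
// Extract the original payload from an SQS record delivered via an SNS subscription.
// Assumes raw message delivery is disabled, so record.body is a JSON envelope
// whose "Message" field holds the published string.
function unwrapSnsMessage(record) {
  const envelope = JSON.parse(record.body);
  return {
    messageId: envelope.MessageId,
    payload: JSON.parse(envelope.Message),
  };
}

// Hand-written sample record; the ID and ARN are placeholders.
const sampleRecord = {
  body: JSON.stringify({
    Type: 'Notification',
    MessageId: '00000000-0000-0000-0000-000000000000',
    TopicArn: 'arn:aws:sns:eu-central-1:123456789012:example-topic',
    Message: JSON.stringify({ id: 1, data: 'test data 1' }),
  }),
};

const unwrapped = unwrapSnsMessage(sampleRecord);
console.log(unwrapped.messageId, unwrapped.payload);
```

The SNS `MessageId` is the same in every queue that receives a given publish, which makes it a convenient value for correlating log lines across processors.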
Common Use Cases
- Event notification systems
- Data processing pipelines
- Microservices communication
- Log processing and analytics
- Real-time data distribution
Implementation
Step 1: Project Setup
First, let's create our project directory and install the necessary dependencies:
mkdir serverless-async-fanout
cd serverless-async-fanout
npm init -y
npm install aws-sdk serverless
Once the dependencies are installed, create the files so that the project structure looks like this:
.
├── serverless.yml
├── src
│   ├── trigger.js
│   ├── processorOne.js
│   └── processorTwo.js
└── package.json
Project Structure:
- serverless.yml: Configuration for functions, resources, and permissions. It is the backbone of the application, defining our infrastructure as code.
- src/trigger.js: Handles incoming requests and publishes to SNS
- src/processorOne.js: Processes individual messages from SQS One
- src/processorTwo.js: Processes individual messages from SQS Two
- package.json: Project dependencies
Step 2: Configure serverless.yml
The serverless.yml file defines the runtime, the AWS region, the functions and their triggers, and the SNS and SQS resources. Here is what my file looks like:
service: serverless-async-fanout
provider:
  name: aws
  runtime: nodejs18.x
  region: eu-central-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - sns:Publish
            - sqs:SendMessage
            - sqs:ReceiveMessage
            - sqs:DeleteMessage
          Resource:
            - !Ref NotificationTopic
            - !GetAtt ProcessingQueueOne.Arn
            - !GetAtt ProcessingQueueTwo.Arn
functions:
  trigger:
    handler: src/trigger.handler
    events:
      - http:
          path: /trigger
          method: post
    environment:
      SNS_TOPIC_ARN: !Ref NotificationTopic
  processorOne:
    handler: src/processorOne.handler
    events:
      - sqs:
          arn: !GetAtt ProcessingQueueOne.Arn
          batchSize: 1
  processorTwo:
    handler: src/processorTwo.handler
    events:
      - sqs:
          arn: !GetAtt ProcessingQueueTwo.Arn
          batchSize: 1
resources:
  Resources:
    NotificationTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: ${self:service}-notification-topic
    ProcessingQueueOne:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: ${self:service}-processing-queue-one
        VisibilityTimeout: 60
    ProcessingQueueTwo:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: ${self:service}-processing-queue-two
        VisibilityTimeout: 60
    QueueOnePolicy:
      Type: AWS::SQS::QueuePolicy
      Properties:
        Queues:
          - !Ref ProcessingQueueOne
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Principal:
                Service: sns.amazonaws.com
              Action: sqs:SendMessage
              Resource: !GetAtt ProcessingQueueOne.Arn
              Condition:
                ArnEquals:
                  aws:SourceArn: !Ref NotificationTopic
    QueueTwoPolicy:
      Type: AWS::SQS::QueuePolicy
      Properties:
        Queues:
          - !Ref ProcessingQueueTwo
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Principal:
                Service: sns.amazonaws.com
              Action: sqs:SendMessage
              Resource: !GetAtt ProcessingQueueTwo.Arn
              Condition:
                ArnEquals:
                  aws:SourceArn: !Ref NotificationTopic
    NotificationTopicSubscriptionOne:
      Type: AWS::SNS::Subscription
      Properties:
        TopicArn: !Ref NotificationTopic
        Protocol: sqs
        Endpoint: !GetAtt ProcessingQueueOne.Arn
    NotificationTopicSubscriptionTwo:
      Type: AWS::SNS::Subscription
      Properties:
        TopicArn: !Ref NotificationTopic
        Protocol: sqs
        Endpoint: !GetAtt ProcessingQueueTwo.Arn
Step 3: Implement the Lambda Functions
Create your trigger function (src/trigger.js):
const AWS = require('aws-sdk');
const sns = new AWS.SNS();
module.exports.handler = async (event) => {
  try {
    // Treat a missing request body as an empty payload
    const body = JSON.parse(event.body || '{}');
    const items = body.items || [];
    // Publish each message to SNS - it will automatically fan out to all subscribed queues
    const publishPromises = items.map(item =>
      sns.publish({
        TopicArn: process.env.SNS_TOPIC_ARN,
        Message: JSON.stringify(item)
      }).promise()
    );
    await Promise.all(publishPromises);
    return {
      statusCode: 200,
      body: JSON.stringify({
        message: `Successfully initiated processing for ${items.length} items`
      })
    };
  } catch (error) {
    console.error('Error:', error);
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: 'Error processing request',
        error: error.message
      })
    };
  }
};
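The mapping from request body to SNS publish parameters can be pulled out into a pure function, which makes it easy to unit test without calling AWS. The following is a minimal sketch under that assumption; `buildPublishRequests` is a hypothetical helper name and the ARN is a placeholder.

```javascript
// Turn a raw API Gateway body into one SNS publish request per item.
// Throws if the body is not valid JSON or "items" is present but not an array.
function buildPublishRequests(rawBody, topicArn) {
  const body = JSON.parse(rawBody || '{}');
  const items = body.items === undefined ? [] : body.items;
  if (!Array.isArray(items)) {
    throw new TypeError('"items" must be an array');
  }
  return items.map((item) => ({
    TopicArn: topicArn,
    Message: JSON.stringify(item),
  }));
}

// Placeholder ARN for illustration only.
const exampleTopicArn = 'arn:aws:sns:eu-central-1:123456789012:example-topic';
const requests = buildPublishRequests('{"items":[{"id":1},{"id":2}]}', exampleTopicArn);
console.log(requests.length);
```

Each returned object has the same shape as the argument passed to `sns.publish` in the handler, so the handler could map over the result and call `.promise()` on each publish.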
Create the first processor function (src/processorOne.js):
module.exports.handler = async (event) => {
  try {
    for (const record of event.Records) {
      // SNS wraps the published payload in a JSON envelope (raw message
      // delivery is off by default), so the item is in the Message field
      const envelope = JSON.parse(record.body);
      const item = JSON.parse(envelope.Message);
      console.log('Processing item in Queue One:', envelope.MessageId, item);
      // Queue One specific processing
      await processItemInQueueOne(item);
    }
    return {
      statusCode: 200,
      body: JSON.stringify({
        message: 'Successfully processed messages in Queue One'
      })
    };
  } catch (error) {
    console.error('Error in Queue One processor:', error);
    // Rethrow so the message returns to the queue and is retried
    throw error;
  }
};
async function processItemInQueueOne(item) {
  // Add your Queue One specific processing logic
  console.log('Queue One processing:', item);
  await new Promise(resolve => setTimeout(resolve, 1000));
}
Create the second processor function (src/processorTwo.js):
module.exports.handler = async (event) => {
  try {
    for (const record of event.Records) {
      // SNS wraps the published payload in a JSON envelope (raw message
      // delivery is off by default), so the item is in the Message field
      const envelope = JSON.parse(record.body);
      const item = JSON.parse(envelope.Message);
      console.log('Processing item in Queue Two:', envelope.MessageId, item);
      // Queue Two specific processing
      await processItemInQueueTwo(item);
    }
    return {
      statusCode: 200,
      body: JSON.stringify({
        message: 'Successfully processed messages in Queue Two'
      })
    };
  } catch (error) {
    console.error('Error in Queue Two processor:', error);
    // Rethrow so the message returns to the queue and is retried
    throw error;
  }
};
async function processItemInQueueTwo(item) {
  // Add your Queue Two specific processing logic
  console.log('Queue Two processing:', item);
  await new Promise(resolve => setTimeout(resolve, 1000));
}
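Before deploying, it can be handy to exercise a processor locally with a hand-built event. The sketch below assumes the SNS envelope format with raw message delivery disabled; `makeSqsEvent` and `sampleHandler` are hypothetical names, and `sampleHandler` is only a stand-in for a real processor.

```javascript
// Build a fake SQS event in which each record body is an SNS-style envelope.
// The IDs are made up for local use and do not come from AWS.
function makeSqsEvent(items) {
  return {
    Records: items.map((item, index) => ({
      messageId: `local-sqs-${index}`,
      body: JSON.stringify({
        Type: 'Notification',
        MessageId: `local-sns-${index}`,
        Message: JSON.stringify(item),
      }),
    })),
  };
}

// Stand-in processor that records the payloads it receives.
const processed = [];
async function sampleHandler(event) {
  for (const record of event.Records) {
    const envelope = JSON.parse(record.body);
    processed.push(JSON.parse(envelope.Message));
  }
}

// Invoke the handler the way Lambda would, with the event as its argument.
const done = sampleHandler(makeSqsEvent([{ id: 1 }, { id: 2 }]));
done.then(() => console.log('Processed locally:', processed));
```

To try this against a real processor, `sampleHandler` could be swapped for the handler exported from src/processorOne.js or src/processorTwo.js.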
Step 4: Deploy and Test
Deploy the service using the commands below:
# Configure AWS credentials
aws configure
# Deploy to AWS
serverless deploy
Note: On Windows PowerShell, you may encounter the following error during deployment:
serverless.ps1 cannot be loaded. The file ..\npm\serverless.ps1 is not digitally signed.
To resolve this error, run the command below in the same PowerShell session and retry the deployment:
Set-ExecutionPolicy -Scope Process -ExecutionPolicy Bypass
Once the deployment succeeds, the output lists the generated API endpoint and the deployed functions (the topic and queues are created as well). Function names follow the pattern service-stage-function:
endpoint: POST - https://xxxxx.execute-api.eu-central-1.amazonaws.com/dev/trigger
functions:
  trigger: serverless-async-fanout-dev-trigger (23 MB)
  processorOne: serverless-async-fanout-dev-processorOne (23 MB)
  processorTwo: serverless-async-fanout-dev-processorTwo (23 MB)
Test the fan-out pattern using curl or Postman:
# Create an event/message
curl -X POST -H "Content-Type: application/json" \
  -d '{"items":[{"id":1,"data":"test data 1"},
       {"id":2,"data":"test data 2"}
  ]}' \
  https://your-api-endpoint/dev/trigger
The response should look like this:
{
  "message": "Successfully initiated processing for 2 items"
}
When you publish a message to SNS, it will automatically be delivered to both SQS queues, and each processor will handle the message according to its own logic.
Verifying the Implementation
To verify that your fan-out pattern is working:
- Check CloudWatch Logs in the AWS Console for both processor functions. Go to AWS Console → CloudWatch → Log groups and look for two log groups:
  /aws/lambda/serverless-async-fanout-dev-processorOne
  /aws/lambda/serverless-async-fanout-dev-processorTwo
  You should see matching messageIds in both log groups.
- Monitor SQS queue metrics in CloudWatch
- View SNS topic delivery metrics
Cleanup
To remove all deployed resources:
serverless remove
This command will do the following (you can confirm each item in the AWS Console):
- Delete the Lambda functions
- Remove the API Gateway endpoints
- Delete the SNS Topic and SQS Queues
- Remove the IAM roles and policies
- Clean up any CloudWatch log groups
- Remove any other AWS resources that were created as part of your service
Conclusion
The fan-out pattern using AWS Serverless services provides a powerful way to build scalable, event-driven architectures. By combining SNS and SQS, we get the benefits of both broadcast-style messaging and reliable message delivery.