Logo

dev-resources.site

for different kinds of informations.

Building Scalable Event Processing with Fan-out Pattern using the Serverless Framework

Published at
1/3/2025
Categories
serverless
serverlessframework
sns
sqs
Author
Deepak Sharma
Building Scalable Event Processing with Fan-out Pattern using the Serverless Framework

Want to process the same event across multiple services simultaneously? The fan-out pattern using AWS Serverless services might be exactly what you need. In this tutorial, we’ll build a robust asynchronous fan-out system using AWS Lambda, SNS, and SQS, all orchestrated with the Serverless Framework.

What is the Fan-out Pattern?
The fan-out pattern is a messaging pattern where a single message triggers multiple parallel processing flows. The fan out pattern implements a single topic that will push each received message to multiple queues. That way, by only sending a single message from your code, it will be sent into multiple queues, so multiple functions will be invoked asynchronously. Think of it like a broadcaster sending the same message to multiple receivers simultaneously. This pattern is particularly useful when you need to:

  • Process the same data in different ways
  • Trigger multiple workflows from a single event
  • Distribute notifications to multiple subscribers
  • Scale your event processing horizontally

Prerequisites

  • Node.js installed on your machine
  • AWS account with appropriate permissions
  • Basic understanding of JavaScript/Node.js
  • AWS CLI installed and configured
  • Serverless Framework CLI installed

Architecture Overview
The Serverless Framework allows you to easily define the resources of an asynchronous microservice, define the triggers (i.e. a trigger could be a message in a queue or a message sent to a topic), and define the middleware components that are essential for realizing the asynchronous communication.

Our solution uses these AWS services:

  • AWS Lambda: For processing events
  • Amazon SNS (Simple Notification Service): For message broadcasting
  • Amazon SQS (Simple Queue Service): For reliable message delivery
  • Serverless Framework: For infrastructure as code

Below is the high level architectural diagram of the implementation:

  1. A trigger Lambda receives an event
  2. The event is published to an SNS topic
  3. SNS broadcasts the message to multiple SQS queues
  4. Separate Lambda functions process messages from each queue

Image description

Common Use Cases

  • Event notification systems
  • Data processing pipelines
  • Microservices communication
  • Log processing and analytics
  • Real-time data distribution

Implementation
Step 1: Project Set Up
First, let’s create our project structure and install necessary dependencies:

mkdir serverless-async-fanout
cd serverless-async-fanout
npm init -y
npm install aws-sdk serverless

Once the above dependencies are installed, we proceed with the project structure creation which should look like below:

.
β”œβ”€β”€ serverless.yml
β”œβ”€β”€ src
β”‚   β”œβ”€β”€ trigger.js
β”‚   β”œβ”€β”€ processorOne.js
β”‚   └── processorTwo.js
└── package.json

Project Structure:

  • serverless.yml: Configuration for functions, resources, and permissions. It is the backbone of the application, defining our infrastructure as code.
  • src/trigger.js: Handles incoming requests and publishes to SNS
  • src/processorOne.js: Processes individual messages from SQS One
  • src/processorTwo.js: Processes individual messages from SQS Two
  • package.json: Project dependencies

Step 2: Configure Serverless.yml
The serverless.yml file contains the configuration of runtime, trigger functions, aws region, SNS and SQS resources. Below is how my file looks like:

service: serverless-async-fanout

provider:
  name: aws
  runtime: nodejs18.x
  region: eu-central-1
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - sns:Publish
            - sqs:SendMessage
            - sqs:ReceiveMessage
            - sqs:DeleteMessage
          Resource:
            - !Ref NotificationTopic
            - !GetAtt ProcessingQueueOne.Arn
            - !GetAtt ProcessingQueueTwo.Arn

functions:
  trigger:
    handler: src/trigger.handler
    events:
      - http:
          path: /trigger
          method: post
    environment:
      SNS_TOPIC_ARN: !Ref NotificationTopic

  processorOne:
    handler: src/processorOne.handler
    events:
      - sqs:
          arn: !GetAtt ProcessingQueueOne.Arn
          batchSize: 1

  processorTwo:
    handler: src/processorTwo.handler
    events:
      - sqs:
          arn: !GetAtt ProcessingQueueTwo.Arn
          batchSize: 1

resources:
  Resources:
    NotificationTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: ${self:service}-notification-topic

    ProcessingQueueOne:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: ${self:service}-processing-queue-one
        VisibilityTimeout: 60

    ProcessingQueueTwo:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: ${self:service}-processing-queue-two
        VisibilityTimeout: 60

    QueueOnePolicy:
      Type: AWS::SQS::QueuePolicy
      Properties:
        Queues:
          - !Ref ProcessingQueueOne
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Principal:
                Service: sns.amazonaws.com
              Action: sqs:SendMessage
              Resource: !GetAtt ProcessingQueueOne.Arn
              Condition:
                ArnEquals:
                  aws:SourceArn: !Ref NotificationTopic

    QueueTwoPolicy:
      Type: AWS::SQS::QueuePolicy
      Properties:
        Queues:
          - !Ref ProcessingQueueTwo
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: Allow
              Principal:
                Service: sns.amazonaws.com
              Action: sqs:SendMessage
              Resource: !GetAtt ProcessingQueueTwo.Arn
              Condition:
                ArnEquals:
                  aws:SourceArn: !Ref NotificationTopic

    NotificationTopicSubscriptionOne:
      Type: AWS::SNS::Subscription
      Properties:
        TopicArn: !Ref NotificationTopic
        Protocol: sqs
        Endpoint: !GetAtt ProcessingQueueOne.Arn

    NotificationTopicSubscriptionTwo:
      Type: AWS::SNS::Subscription
      Properties:
        TopicArn: !Ref NotificationTopic
        Protocol: sqs
        Endpoint: !GetAtt ProcessingQueueTwo.Arn

Step 3: Implement the Lambda Functions
Create your trigger function (src/trigger.js):

const AWS = require('aws-sdk');
const sns = new AWS.SNS();

module.exports.handler = async (event) => {
    try {
        const body = JSON.parse(event.body);
        const items = body.items || [];

        // Publish each message to SNS - it will automatically fan out to all subscribed queues
        const publishPromises = items.map(item =>
            sns.publish({
                TopicArn: process.env.SNS_TOPIC_ARN,
                Message: JSON.stringify(item)
            }).promise()
        );

        await Promise.all(publishPromises);

        return {
            statusCode: 200,
            body: JSON.stringify({
                message: `Successfully initiated processing for ${items.length} items`
            })
        };
    } catch (error) {
        console.error('Error:', error);
        return {
            statusCode: 500,
            body: JSON.stringify({
                message: 'Error processing request',
                error: error.message
            })
        };
    }
};

Create your receiver functions (src/processorOne.js):

module.exports.handler = async (event) => {
    try {
        for (const record of event.Records) {
            const item = JSON.parse(record.body);
            console.log('Processing item in Queue One:', item);

            // Queue One specific processing
            await processItemInQueueOne(item);
        }

        return {
            statusCode: 200,
            body: JSON.stringify({
                message: 'Successfully processed messages in Queue One'
            })
        };
    } catch (error) {
        console.error('Error in Queue One processor:', error);
        throw error;
    }
};

async function processItemInQueueOne(item) {
    // Add your Queue One specific processing logic
    console.log('Queue One processing:', item);
    await new Promise(resolve => setTimeout(resolve, 1000));
}

Create your receiver functions (src/processorTwo.js):

module.exports.handler = async (event) => {
    try {
        for (const record of event.Records) {
            const item = JSON.parse(record.body);
            console.log('Processing item in Queue Two:', item);

            // Queue Two specific processing
            await processItemInQueueTwo(item);
        }

        return {
            statusCode: 200,
            body: JSON.stringify({
                message: 'Successfully processed messages in Queue Two'
            })
        };
    } catch (error) {
        console.error('Error in Queue Two processor:', error);
        throw error;
    }
};

async function processItemInQueueTwo(item) {
    // Add your Queue Two specific processing logic
    console.log('Queue Two processing:', item);
    await new Promise(resolve => setTimeout(resolve, 1000));
}

Step 4: Deploy and Test
Deploy the service using below commands:

# Configure AWS credentials
aws configure

# Deploy to AWS
serverless deploy

Note: You may encounter below error on deployment:
serverless.ps1 cannot be loaded. The file ..\npm\serverless.ps1 is not digitally signed.
To resolve this error, you may execute the below command and retry deployment:

Set-ExecutionPolicy -Scope Process -ExecutionPolicy Bypass

Once the deployment is successful, the API endpoint will be generated along with the functions (topic & queues):

endpoint: POST - https://xxxxx.execute-api.eu-central-1.amazonaws.com/dev/trigger
functions:
  trigger: async-fanout-service-dev-trigger (23 MB)
  processorOne: async-fanout-service-dev-processorOne (23 MB)
  processorTwo: async-fanout-service-dev-processorTwo (23 MB)

Test the fan-out pattern using curl or Postman:

#create an event/message
curl -X POST -H "Content-Type: application/json" \
  -d '{"items":[{"id":1,"data":"test data 1"}, 
                {"id":2,"data":"test data 2"}
               ]}' \
  https://your-api-endpoint/dev/trigger
{
    "message": "Successfully initiated processing for 2 items"
}

When you publish a message to SNS, it will automatically be delivered to both SQS queues, and each processor will handle the message according to its own logic.

Verifying the Implementation
To verify that your fan-out pattern is working:

  1. Check CloudWatch Logs in AWS Console for both processor functions: Go to AWS Console β†’ CloudWatch β†’ Log groups Look for two log groups:
  2. /aws/lambda/async-fanout-service-dev-processorOne
  3. /aws/lambda/async-fanout-service-dev-processorTwo
    You should see matching messageIds in both log groups

  4. Monitor SQS queue metrics in CloudWatch

  5. View SNS topic delivery metrics

Cleanup
To remove all deployed resources:

serverless remove

This command will (check your AWS Console):

  • Delete the Lambda functions
  • Remove the API Gateway endpoints
  • Delete the SNS Topic and SQS Queues
  • Remove the IAM roles and policies
  • Clean up any CloudWatch log groups
  • Remove any other AWS resources that were created as part of your service

Conclusion
The fan-out pattern using AWS Serverless services provides a powerful way to build scalable, event-driven architectures. By combining SNS and SQS, we get the benefits of both broadcast-style messaging and reliable message delivery.

Please don’t forget to clap and follow me on LinkedIn and Github for more such posts.

Featured ones: