Logo

dev-resources.site

for different kinds of informations.

Creating Serverless Webhooks on AWS CDK

Published at
12/21/2024
Categories
serverless
sqs
stepfunctions
eventbridge
Author
Katherine
Creating Serverless Webhooks on AWS CDK

In this post you'll learn how to create a resilient webhook using AWS CDK. This architecture will ensure scalability, reliability, and fault tolerance.

Source code of this project

AWS Service we'll use:

  • SQS
  • API Gateway
  • EventBridge Pipes
  • Step Functions

Serverless Webhook

Let’s break down the architecture step-by-step.

SQS

We’ll start by integrating API Gateway directly with SQS. This will allows us to handle incoming webhook requests at scale and buffer requests before they reach downstream services. Using SQS means our services don’t need to scale instantly in response to traffic spikes, which helps maintain stability. Attaching a Dead Letter Queue (DLQ) ensures that messages will never lost and can be inspected to be debug or retried if processing fails.
To send messages to our SQS we need to define a Role which later on we will be attaching it to the API Integration.


    /* ----------------- SQS ----------------- */
    const dlq = new Queue(this, 'WebhookDLQ', {
      queueName: 'webhook-dlq',
      retentionPeriod: Duration.days(14),
    });

    const webhookQueue = new Queue(this, 'WebhookQueue', {
      deadLetterQueue: {
        queue: dlq,
        maxReceiveCount: 3,
      },
      queueName: 'webhook-queue',
      retentionPeriod: Duration.days(7),
    });

    // We create a Role for the API Gateway to be able to send messages to the SQS queue
    const webhookSubscriptionAPIRole = new Role(
      this,
      'WebhookSubscriptionApiRole',
      {
        assumedBy: new ServicePrincipal('apigateway.amazonaws.com'),
        inlinePolicies: {
          sqsSendMessage: new PolicyDocument({
            statements: [
              new PolicyStatement({
                actions: ['sqs:SendMessage'],
                effect: Effect.ALLOW,
                resources: [webhookQueue.queueArn],
              }),
            ],
          }),
        },
      },
    );

    const sqsIntegration = new apigateway.AwsIntegration({
      integrationHttpMethod: 'POST',
      options: {
        credentialsRole: webhookSubscriptionAPIRole,
        passthroughBehavior: PassthroughBehavior.NEVER,
        integrationResponses: [
          {
            responseTemplates: {
              'application/json': '{}',
            },
            statusCode: '200',
          },
        ],
        requestParameters: {
          'integration.request.header.Content-Type': `'application/x-www-form-urlencoded'`,
        },
        // We extract the body from the request and send it to the SQS queue
        requestTemplates: {
          'application/json':
            'Action=SendMessage&MessageBody={"body": $util.urlEncode($input.body)}',
        },
      },
      path: `${this.account}/${webhookQueue.queueName}`,
      region: this.region,
      service: 'sqs',
    });

API Gateway

The point of entry and our first layer of protection. It provides a secure, scalable endpoint that can validate incoming requests before they hit our SQS queue.
In a previous post I talked about how to do this validation at the level of API Gateway with Zod.

I'm going to use a schema which represents a User.

import { z } from 'zod';

export const userSchema = z.object({
  name: z.string(), // Name must be a string
  age: z.number().int().min(18), // User's age must be an integer greater than 18
  email: z.string().email(), // Must be a valid email
});

export type UserEventType = z.infer<typeof userSchema>;

And use the library zod-to-json-schema to generate a Model to be used to do the validation in our Stack.


    /* ----------------- API Gateway ----------------- */

    // We create an API Gateway
    const myWebhookAPI = new RestApi(this, 'MyAPI', {
      restApiName: 'my-webhook-api',
    }); 
        // Add the POST method to the API Gateway with the SQS integration and the request validator
    myWebhookAPI.root.addMethod('POST', sqsIntegration, {
      requestValidatorOptions: {
        requestValidatorName: 'webhook-request-validator',
        validateRequestBody: true,
      },
      requestModels: {
        'application/json': new Model(this, 'webhook-request-model', {
          restApi: myWebhookAPI,
          contentType: 'application/json',
          description: 'Validation model for the request body',
          modelName: 'myRequestJsonSchema',
          schema: myRequestJsonSchema,
        }),
      },
      methodResponses: [
        {
          statusCode: '200',
          responseModels: {
            'application/json': Model.EMPTY_MODEL,
          },
        },
        {
          statusCode: '400',
          responseModels: {
            'application/json': Model.ERROR_MODEL,
          },
        },
        {
          statusCode: '500',
          responseModels: {
            'application/json': Model.ERROR_MODEL,
          },
        },
      ],
    });

    myWebhookAPI.addGatewayResponse('ValidationError', {
      type: apigateway.ResponseType.BAD_REQUEST_BODY,
      statusCode: '400',
      templates: {
        'application/json': JSON.stringify({
          errors: '$context.error.validationErrorString',
          details: '$context.error.message',
        }),
      },
    });

Step Function

Once our messages have passed through API Gateway and SQS, we’ll use a Step Function to process them.
As a first step on the State Machine we will make sure that the message received is valid, meaning we will validate the HMAC signature, perform basic authentication, or other logic to ensure the message source is trusted.
For this state machine, I've chosen as a type a Express State Machine as if it fails at any point our message will be delivered to the DLQ . This will not happen if it's an Standard State Machine as it will be executed by a Fire and Forget event which will not wait for a State Machine to be executed.

State Machine Definition

/* ----------------- Step Function ----------------- */
    const validateMessageLambda = new NodejsFunction(
      this,
      'ValidateMessageLambda',
      {
        code: Code.fromInline(`
        exports.handler = async (event) => {
            console.log('Validating message');
            console.log(JSON.parse(event[0].body)); 
            return true;
          };
        `),
        handler: 'index.handler',
        runtime: Runtime.NODEJS_22_X,
      },
    );

    const myStepFunction = new StateMachine(this, 'MyStepFunction', {
      stateMachineType: StateMachineType.EXPRESS,
      logs: {
        destination: new LogGroup(this, 'MyStepFunctionLogs', {
          logGroupName: '/aws/vendedlogs/states/my-step-function-logs',
          removalPolicy: RemovalPolicy.DESTROY,
          retention: RetentionDays.ONE_DAY,
        }),
        level: LogLevel.ERROR,
      },
      definitionBody: DefinitionBody.fromChainable(
        new LambdaInvoke(this, 'ValidateMessageTask', {
          lambdaFunction: validateMessageLambda,
          comment: 'Validate the message',
          stateName:
            'Validate Message (HMAC Signature, Basic Authentication, etc)',
        }).next(
          new Pass(this, 'ProcessMessageTask', {
            stateName: 'Process Message',
          }),
        ),
      ),
    });

EventBridge Pipes

We’ll consume messages from SQS and send them into the Step Functions state machine through EventBridge Pipes. If anything goes wrong, the message will be routed to the DLQ for reprocessing or debugging.
You can use the CFNPipe construct or be a little more risky and use the Alpha Constructs of EventBridge Pipes

import {
  CloudwatchLogsLogDestination,
  DesiredState,
  LogLevel as PipesLogLevel,
  Pipe,
} from '@aws-cdk/aws-pipes-alpha';
import { SqsSource } from '@aws-cdk/aws-pipes-sources-alpha';
import {
  SfnStateMachine,
  StateMachineInvocationType,
} from '@aws-cdk/aws-pipes-targets-alpha';

....
/* ----------------- Pipe ----------------- */
    const myIstateMachine = StateMachine.fromStateMachineArn(
      this,
      'myIStateMachine',
      myStepFunction.stateMachineArn,
    );
    new Pipe(this, 'MyPipe', {
      source: new SqsSource(webhookQueue),
      target: new SfnStateMachine(myIstateMachine, {
        invocationType: StateMachineInvocationType.REQUEST_RESPONSE,
      }),
      // Configure the pipe to send logs to CloudWatch Logs
      logLevel: PipesLogLevel.ERROR,
      logDestinations: [
        new CloudwatchLogsLogDestination(
          new LogGroup(this, 'PipeLogs', {
            logGroupName: '/aws/vendedlogs/states/pipe-logs',
            removalPolicy: RemovalPolicy.DESTROY,
            retention: RetentionDays.ONE_DAY,
          }),
        ),
      ],
      desiredState: DesiredState.RUNNING,
    });

Links

Featured ones: