Logo

dev-resources.site

for different kinds of informations.

Orchestrate AWS Lambdas using MongoDB - Part 2

Published at
11/15/2023
Categories
eventbridge
statemachine
orchestration
jobs
Author
sateeshm
Author
8 person written this
sateeshm
open
Orchestrate AWS Lambdas using MongoDB - Part 2

Continuation to the first part. This post assumes you are a developer with working knowledge on AWS, Lambda, EventBridge, MongoDB, NodeJs.

Technical Implementation:

1. Design State-Machine
This step you need to figure out, how do you want to orchestrate your jobs and create an object like below.

const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      steps: {
        `thinking`: {
          jobs: ['buy_fishing_gear'],
        },
        buy_fishing_gear: {
          jobs: ['rent_a_boat'],
        },
        rent_a_boat: {
          jobs: ['go_to_fishing_ground'],
        },
        go_to_fishing_ground: {
          jobs: ['cast_fishing_pole', 'drink_beer', 'catch_fish'],
        },
        catch_fish: {
          jobs: ['go_home'],
        },
      },
      finished: [],
      next: 'end',
      waitFor: [
        'buy_fishing_gear',
        'rent_a_boat',
        'go_to_fishing_ground',
        'cast_fishing_pole',
        'drink_beer',
        'catch_fish',
        'go_home',
      ],
    },
  },
};

Enter fullscreen mode Exit fullscreen mode

In the above example we are running buy_fishing_gear after thinking job is finished in a sequential manner, but we run cast_fishing_pole, drink_beer and catch_fish in parallel after go_to_fishing_ground is successful.

2. Start the Process:
Creation of the statemachine could be anything from API to a cron job. Lets take API as an example as POST /process/catch-the-fish.

router.post('/process/catch-the-fish', async (req, res, next) => {
  const state = {}; // above mentioned state
  // Insert into state-machines
  const createdState = await db.collection('state-machines').insertOne(state);
  // Start the process by sending first event. As thinking is success we start the process of catching fish
  await db.collection('statuses').insertOne({
    stateId: createdState._id,
    status: 'success',
    job: 'thinking',
    date: new Date(),
    // additional parameters as needed
  });
});

Enter fullscreen mode Exit fullscreen mode

3. State Machine Job:

exports.handler = async (event, ctx, callback) => {
  // Mongodb Document from the trigger
  const document = event.detail.fullDocument;
  // Find the state machine
  const state = await db
    .collection('state-machines')
    .findOne({ _id: document.stateId });
  // if its ended do nothing
  if (state?.currentPhase === 'end') {
    callback();
  }
  // Update the job as finished for the state machine
  const currentPhase = state.phases[state.currentPhase];
  currentPhase.finished.push(document.job);
  await db
    .collection('state-machines')
    .findOneAndUpdate(
      { _id: document.stateId },
      { $set: { [`phases.${state.currentPhase}`]: currentPhase } }
    );
  const jobsRemaining = currentPhase.waitFor.filter(
    (s) => !currentPhase.finished.includes(s)
  );
  // If there are no remaining jobs in the current phase
  if (jobsRemaining.length === 0) {
    // Update the currentPhase to be the next Phase.
    await db.collection('state-machines').findOneAndUpdate(
      { _id: document.stateId },
      {
        $set: {
          currentPhase: currentPhase.next,
        },
      }
    );
    // If the next phase is not end trigger the phase.
    if (currentPhase.next !== 'end') {
      await db.collection('statuses').insertOne({
        ...document,
        job: `${currentPhase.next}Start`,
        status: 'success',
      });
    }
  }
};

Enter fullscreen mode Exit fullscreen mode

Above, whenever a job succeeds this will update the finished jobs and check if the phase is finished and move onto next phase until it meets the end phase.
4. Orchestrator Job:

exports.handler = async (event, ctx, callback) => {
  ctx.callbackWaitsForEmptyEventLoop = false;
  const document = event.detail.fullDocument;
  const state = await db
    .collection('pipeline-state-machines')
    .findOne({ _id: document.stateId });
  const currentPhase = state[state.currentPhase];
  if (!document.stateId) {
    console.log('Cannot execute the pipeline without the state id:', state);
    return { success: false };
  }
  if (state.currentPhase === 'end') {
    console.log('Cannot execute the pipeline without the state id:', state);
    return { success: false };
  }
  if (!currentPhase) {
    console.log('Cannot execute the pipeline without the state id:', state);
    return { success: false };
  }
  const step = currentPhase.steps[document.job];
  if (step && step.jobs && step.jobs.length > 0) {
    await Promise.all(
      step.jobs.map(async (job) => {
        const payload = {}
        const event = {
          FunctionName: job,
          InvocationType: 'Event',
          LogType: 'Tail',
          Payload: JSON.stringify(payload),
        };
        return lambda.invoke(event);
      })
    );
  }
  callback();
};
Enter fullscreen mode Exit fullscreen mode

The above job receives the success event and finds the next jobs to trigger and invoke them.
5. Notifications Job:
As above the job receives all the events so we can use the data and structure the message to send.

What if there are multiple phases:
Then we just have to add another phase to existing state machine configuration.

const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      // Prev things
    },
    cooking: {
      steps: {
        cookingStart: {
          jobs: ['clean'],
        },
        clean: {
          jobs: ['cook'],
        },
      },
      waitFor: ['cook', 'clean'],
      finished: [],
      next: 'end',
    },
  },
};

Enter fullscreen mode Exit fullscreen mode

I hope the above code and explanation gives you a way to implement your own solutions. For anymore details please do comment, I would be happy to help. Thanks.

eventbridge Article's
30 articles in total
Favicon
API Destinations with Amazon EventBridge
Favicon
How to Leverage EventBridge for Building Decoupled Event-Driven Systems
Favicon
Creating Serverless Webhooks on AWS CDK
Favicon
Building Faster Event-Driven Architectures: Exploring Amazon EventBridge’s New Latency Gains
Favicon
Disaster recovery for AWS Aurora
Favicon
Building a Scalable Job Queue System with AWS and Laravel
Favicon
AWS Serverless: How to Stop EC2 using Event Bridge and Lambda
Favicon
Monitoring AWS ECS Deployment failures
Favicon
Amazon EventBridge Pipes now supports customer managed KMS keys
Favicon
This stranger EventBus Mesh
Favicon
An Alternative to Batch Jobs: Scheduling Events with EventBridge Scheduler
Favicon
Momento added as an Amazon EventBridge API destination!
Favicon
EventBridge: working around API Destination 5s maximum client timeout constraint, using Lambda PowerTools idempotency
Favicon
Event-Driven Magic: Exploring AWS EventBridge
Favicon
Event-Driven Architecture: reconcile Notification and Event-Carried State Transfer patterns
Favicon
Architecture orientée événement : réconcilier Notifications et Evénements "Complets"
Favicon
Executing long running tasks with AppSync
Favicon
How To Run A Serverless Scheduled Function Using AWS Lambda & EventBridge
Favicon
Automate AWS Cost & Usage report using Event Bridge, Lambda, SES, S3 & AWS Cost Explorer API
Favicon
Leveraging the SDK to Publish an Event to EventBridge with Lambda and Rust
Favicon
How Epilot Builds a Powerful Webhook Feature with AWS
Favicon
Setting up AppSync subscriptions for out-of-band updates with Eventbridge pipes
Favicon
How CloudWatch Network Monitor Performs Connectivity Test to EC2 Instances
Favicon
How to Get Custom Email Notification for EC2 State Changes Using EventBridge & Lambda
Favicon
Lambda Scheduling & Event Filtering with EventBridge using Serverless Framework
Favicon
Orchestrate AWS Lambdas using MongoDB - Part 2
Favicon
Solving problems 1: ECS, Event Bridge Scheduler, PHP, migrations
Favicon
Integration testing EventBridge events
Favicon
3 ways to catch all the events going through the EventBridge Event Bus
Favicon
Buses and queues: Head-on

Featured ones: