Logo

dev-resources.site

for different kinds of informations.

How did I contribute for OpenAI’s Xmas Bonus before cutting 50% costs while scaling 10x with GenAI processing

Published at
12/24/2024
Categories
openai
chatgpt
lambda
costsavings
Author
Felipe Malaquias
Categories
4 categories in total
openai
open
chatgpt
open
lambda
open
costsavings
open
How did I contribute for OpenAI’s Xmas Bonus before cutting 50% costs while scaling 10x with GenAI processing

Yet another screw-up.

TLDR: use OpenAI’s Files and Batch APIs for async non-time-sensitive processing. Code below.

I screw up so often that I actually learned to love the process of screwing up!

Please don’t confuse it with being reckless, though. Think of it as a fast incremental learning process, just like fine-tuning a model.

The Most Recent Screw-Up

At the beginning of this year (2024), I created my first automation using GenAI for prototyping a travel app I used for giving a talk at the AWS User Group Berlin meetup showing case the new Amplify Gen 2 services (link here). At that time, unfortunately, there was only one way to generate chat completions using OpenAI’s */v1/chat/completions* API.

Recently, I needed to automate another process for which GenAI was a good fit. Therefore, I reused the same approach I had before and created the following state machine using AWS Step Functions:

Step Function iterating over DB records for assembling GenAI prompts and invoking OpenAI chat completion

I somehow have a fetish for diagrams, so I found myself proud and smart after I finished it - a feeling that didn’t last that long until I realized I had neither the money for my brother’s Xmas gift nor was I smart.

Without going into details on the workflow itself, there are mainly two issues with this approach:

  • Although step functions are great for automation, they have their limitations (e.g.: maximum number of history events — steps— of 25000)

  • AWS offers 4000 free state transitions as part of their free tear. Anything above that, you pay.

  • APIs are rate-limited in order to avoid abuse. OpenAI is, of course, no different (especially as it is being explored by so many people around the world right now). Therefore, it greatly restricts your ability to parallelize.

  • GenAI is generally still very slow considering low latency APIs all around us nowadays, especially if you require more complex models like o1. Therefore, if you need to iterate over 25000 prompts without fine-tuning your model, you will see yourself waiting more than 6 hours for it to complete and eventually failing.

  • Models like o1 are still relatively expensive when a large number of tokens are requested.

So, after my first frustrated run for my new use case, I had:

  • Waited for 05:45:36.299 hours until the step function failed with a runtime error: The execution reached the maximum number of history events (25000).

  • Spent $90 on tokens

  • Exceeded my free 4000 state transitions limit in my AWS account

Step Function Failure after more than 5h…

The Fix

As I was, of course, in disbelief there wouldn’t be a better way to achieve what I wanted, I started re-reading the documentation.… to my happy surprise, I see a shiny new batch API I had overlooked, launched in April this year (2024).

So I wrote the following lambda (in typescript) instead:

import OpenAI, { toFile } from 'openai';
import { BatchWriteCommand, BatchWriteCommandInput, DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';
import { DynamoDBClient, GetItemCommand, QueryCommand } from '@aws-sdk/client-dynamodb';
import { v4 as uuidv4 } from 'uuid';
import { GenerateTipsEvent } from '../shared/types/tips';
import { FileLike } from 'openai/uploads.mjs';
import { ChatCompletionCreateParamsNonStreaming } from 'openai/resources/chat/completions.mjs';

// ParamsAndSecretsLayerVersion used for secrets retrieval/caching
const AWS_SECRETS_EXTENTION_SERVER_ENDPOINT = "http://localhost:2773/secretsmanager/get?secretId="

let openai: OpenAI | undefined;

const BATCH_REQUESTS_TABLE = process.env.BATCH_REQUESTS_TABLE || 'BatchRequests';

const ddbClient = new DynamoDBClient({});
const ddb = DynamoDBDocumentClient.from(ddbClient);

// one single request within the batch
interface BatchRequestLineItem {
  custom_id: string;
  method: string;
  url: string;
  body: ChatCompletionCreateParamsNonStreaming;
}

interface BatchRequestInput {
  lineItem: BatchRequestLineItem;
  custom_id: string;
  // any other field you may want use later in post processing
}

// split db command chunks to avoid exceeding max limits
const chunk = <T>(arr: T[], size: number): T[][] => {
  return Array.from({ length: Math.ceil(arr.length / size) }, (_, i) =>
    arr.slice(i * size, i * size + size)
  );
};

async function initOpenAi() {
  if (!openai) {
    const openAiSecret = JSON.parse(await getSecretValue(process.env.OPEN_AI_SECRET_NAME!))

    openai = new OpenAI({
      apiKey: openAiSecret.apiKey,
      organization: openAiSecret.orgId,
    });
  }
}

const getSecretValue = async (secretName: string) => {
  const url = `${AWS_SECRETS_EXTENTION_SERVER_ENDPOINT}${secretName}`;
  const response = await fetch(url, {
    method: "GET",
    headers: {
      "X-Aws-Parameters-Secrets-Token": process.env.AWS_SESSION_TOKEN!,
    },
  });

  if (!response.ok) {
    throw new Error(
      `Error occured while requesting secret ${secretName}. Responses status was ${response.status}`
    );
  }

  const secretContent = (await response.json()) as { SecretString: string };
  return secretContent.SecretString;
};

const getTopicsForCategory = async (categoryId: string) => {
  const result = await ddb.send(new QueryCommand({
    // ... boring stuff
  }));
  return result.Items || [];
};

const getSectionsForTopic = async (topicId: string) => {
  const result = await ddb.send(new QueryCommand({
    // ... boring stuff
  }));
  return result.Items || [];
};

const getUnitsForSection = async (sectionId: string) => {
  const result = await ddb.send(new QueryCommand({
    // ... boring stuff
  }));
  return result.Items || [];
};

const getCategory = async (categoryId: string) => {
  const result = await ddb.send(new GetItemCommand({
    // ... boring stuff
  }));
  return result.Item;
};

async function generateBatchRequests(
  categoryId: string,
  model: string,
  numberOfTips: number
): Promise<BatchRequestInput[]> {
  try {
    // Fetch initial data in parallel
    const [category, topics] = await Promise.all([
      getCategory(categoryId),
      getTopicsForCategory(categoryId),
    ]);

    if (!category?.id.S) {
      throw new Error(`Category not found: ${categoryId}`);
    }

    console.log('Generating batch requests for category', category.id.S);
    console.log('Found topics:', topics.length);

    const batchRequests: BatchRequestInput[] = [];

    // Process topics
    for (const topic of topics) {
      if (!topic.id.S) continue;

      const sections = await getSectionsForTopic(topic.id.S);
      console.log(`Found ${sections.length} sections for topic ${topic.id.S}`);

      // Process sections
      for (const section of sections) {
        if (!section.id.S) continue;

        const units = await getUnitsForSection(section.id.S);
        console.log(`Found ${units.length} units for section ${section.id.S}`);

        // Process units
        for (const unit of units) {
          if (!unit.id.S) continue;

          const event = {
            // ... any custom data used in your prompts
            model: model,
          };

          const customId = uuidv4();
          batchRequests.push({
            custom_id: customId,
            lineItem: {
              custom_id: customId,
              method: 'POST',
              url: '/v1/chat/completions',
              body: {
                model: model,
                messages: [
                  { role: 'system', content: getSystemPrompt(event) },
                  { role: 'user', content: getUserPrompt(event) }
                ],
                response_format: { type: "json_object" },
                temperature: 0.3,
              },
            },
            // ... any other field you may want to correlate later on post processing
          });

          console.log(`Created batch request for unit ${unit.id.S}`);
        }
      }
    }

    console.log(`Total batch requests generated: ${batchRequests.length}`);
    return batchRequests;

  } catch (error) {
    console.error('Error generating batch requests:', error);
    throw error;
  }
}

interface GenerateTipsBatchRequestEvent {
  categoryId: string;
  model: string;
  numberOfTips: number;
}

export const handler = async (event: GenerateTipsBatchRequestEvent) => {
  try {
    const { categoryId, model, numberOfTips } = event;
    if (!model || !categoryId || !numberOfTips) {
      return {
        statusCode: 400,
        body: JSON.stringify({ error: 'Missing required parameters' }),
      };
    }

    await initOpenAi();

    console.log('Generating batch requests');
    const batchRequestInputs = await generateBatchRequests(categoryId, model, numberOfTips);

    if (batchRequestInputs.length === 0) {
      return {
        statusCode: 404,
        body: JSON.stringify({ error: 'No units found' }),
      };
    }

    const files = await createBatchFile(batchRequestInputs.map(batchRequestInput => batchRequestInput.lineItem));

    for (const file of files) {
      console.log('Uploading file ', file.name);
      const upload = await openai?.files.create({
        purpose: 'batch',
        file: file
      });
      if (!upload) continue;

      console.log('File uploaded', JSON.stringify(upload, null, 2));

      console.log('Creating batch');
      const requestedBatch = await openai?.batches.create({
        completion_window: '24h',
        endpoint: '/v1/chat/completions',
        input_file_id: upload.id,
        metadata: {
          // ... any metadata you may want to add
        }
      });
      console.log('Batch created', JSON.stringify(requestedBatch, null, 2));

      if (!requestedBatch) continue;

      console.log('Storing batch request', batchRequestInputs.length);
      await storeBatchRequest(batchRequestInputs.map(batchRequestInput => ({
        customId: batchRequestInput.custom_id,
        body: JSON.stringify(batchRequestInput.lineItem),
        batchId: requestedBatch.id,
        filename: upload?.filename,
        bytes: upload?.bytes,
        status: requestedBatch.status,
        // ... and more fields you may want to correlate later on post processing
      })));

      console.log('Batch request stored');
    }

    return {
      statusCode: 200,
      body: JSON.stringify({
        message: 'Batch request stored',
        batchRequestInputsLength: batchRequestInputs.length,
      }),
    };
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}

async function storeBatchRequest(requests: {
  status: string;
  type: string;
  body: string;
  batchId: string | undefined;
  filename: string | undefined;
  bytes: number | undefined;
  customId: string;
  // ... and more properties you want to correlate later on post processing
}[]) {
  try {
    console.log('Starting to store batch requests:', requests.length);

    // Split requests into chunks of 25 (DynamoDB batch write limit)
    const batches = chunk(requests, 25);
    console.log('Split into', batches.length, 'batches');

    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i];
      console.log(`Processing batch ${i + 1} of ${batches.length}`);

      const batchWriteParams: BatchWriteCommandInput = {
        RequestItems: {
          [BATCH_REQUESTS_TABLE]: batch.map(request => ({
            PutRequest: {
              Item: {
                id: request.customId,
                ...request,
                createdAt: new Date().toISOString(),
                updatedAt: new Date().toISOString()
              }
            }
          }))
        }
      };

      try {
        console.log(`Sending batch write command for batch ${i + 1}`);
        const result = await ddb.send(new BatchWriteCommand(batchWriteParams));

        // Check for unprocessed items
        if (result.UnprocessedItems && Object.keys(result.UnprocessedItems).length > 0) {
          console.warn('Unprocessed items:', result.UnprocessedItems);
          // Optionally retry unprocessed items
          await retryUnprocessedItems(result.UnprocessedItems);
        }

        console.log(`Successfully processed batch ${i + 1}`);
      } catch (error) {
        console.error(`Error writing batch ${i + 1}:`, error);
        throw error;
      }
    }

    console.log('Successfully stored all batch requests');
  } catch (error) {
    console.error('Error in storeBatchRequest:', error);
    throw error;
  }
}

// Helper function to retry unprocessed items
async function retryUnprocessedItems(unprocessedItems: Record<string, any>) {
  try {
    const retryParams: BatchWriteCommandInput = {
      RequestItems: unprocessedItems
    };
    await ddb.send(new BatchWriteCommand(retryParams));
  } catch (error) {
    console.error('Error retrying unprocessed items:', error);
    throw error;
  }
}

async function createBatchFile(items: BatchRequestLineItem[], maxSizeMB: number = 200): Promise<FileLike[]> {
  const MAX_FILE_SIZE = maxSizeMB * 1024 * 1024; // Convert MB to bytes
  const files: FileLike[] = [];
  let currentItems: BatchRequestLineItem[] = [];
  let currentSize = 0;
  let fileIndex = 0;

  const createBatchFile = async (items: BatchRequestLineItem[], index: number): Promise<FileLike> => {
    // Convert items to JSONL string
    const jsonlContent = items
      .map(item => JSON.stringify(item))
      .join('\n');

    // Create file using OpenAI's toFile utility
    return await toFile(
      new Blob([jsonlContent], { type: 'application/jsonl' }),
      `batch_${Date.now()}_${index}.jsonl`
    );
  };

  // Process items and create batches
  for (const item of items) {
    const line = JSON.stringify(item) + '\n';
    const itemSize = Buffer.byteLength(line, 'utf-8');

    // Check if adding this item would exceed the size limit
    if (currentSize + itemSize > MAX_FILE_SIZE) {
      // Create file from current batch
      const file = await createBatchFile(currentItems, fileIndex);
      files.push(file);

      // Reset for next batch
      currentItems = [];
      currentSize = 0;
      fileIndex++;
    }

    // Add item to current batch
    currentItems.push(item);
    currentSize += itemSize;
  }

  // Process remaining items
  if (currentItems.length > 0) {
    const file = await createBatchFile(currentItems, fileIndex);
    files.push(file);
  }

  return files;
}

const getSystemPrompt = (event: GenerateTipsEvent) => `You are... rest of prompt`

const getUserPrompt = (event: GenerateTipsEvent) => `Generate ... rest of prompt`

This function ran in 4518.0ms to upload one file of 4Mb with 391 prompts of a total of 1,301,368 tokens (in and out).
The processing of those prompts cost about $14, and it took 18 minutes for the batch to complete.

Yes, the numbers in the title don’t match, but the numbers above are only partial, which matches the generation of tips in the workflow screenshot above. For the sake of simplicity and deduplication, I omitted the processing of quizzes, which follows the same approach.

Please bear in mind the batch results may be available up to 24h. So it’s something you should consider only if you don’t need an answer right away.

Learning: Why do now what we can put off until later?! :)

Thanks for reading! Got any cool ideas or feedback you want to share? Drop a comment, send me a message or follow me and let’s keep moving things forward!

Featured ones: