RAG Step-by-Step

Published at
3/25/2024
Author
Sophia Parafina

Frank Denneman of VMware recently published RAG Architecture Deep Dive, which defines a Load-Transform-Embed-Store workflow for RAG applications. Let's implement that workflow in practice. For this tutorial, I'm using OpenAI to generate embeddings, LangChain to process text, and Pinecone to store the embeddings.

For the impatient: the code is available on GitHub.

Load

Load is the data-gathering step: collecting the information that will augment a large language model. I was interested in the future of AI from the viewpoint of AI companies' CEOs, CTOs, and product managers, so I gathered recent interview transcripts from YouTube. To download them, I installed the youtube-transcript-api package into the project environment.

pip install youtube-transcript-api

This script downloads the video transcript and can format it as plain text or JSON. The JSON format includes time stamps, which can help extract a section of a transcript.

Download the data to skip this step.

from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import JSONFormatter, TextFormatter
import sys

# usage: get_transcript video_id output_filename output_type
# output_type is 'json' (keeps time stamps) or 'text' (plain text)
if len(sys.argv) != 4 or sys.argv[3] not in ('json', 'text'):
    sys.exit("usage: get_transcript video_id output_filename json|text")

video_id, output_filename, output_type = sys.argv[1:4]

transcript = YouTubeTranscriptApi.get_transcript(video_id)

if output_type == 'json':
    formatter = JSONFormatter()
    ext = '.json'
else:
    formatter = TextFormatter()
    ext = '.txt'

formatted = formatter.format_transcript(transcript)

# the with block closes the file even if the write fails
with open(output_filename + ext, "w") as f:
    f.write(formatted)
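
Because the JSON output keeps time stamps, a section of a transcript can be pulled out by time range. Here is a minimal sketch, assuming the JSON file holds a list of entries with `text`, `start`, and `duration` fields measured in seconds (check your own output before relying on this). The function name `extract_section` and the sample entries are hypothetical.

```python
import json

def extract_section(entries, start_s, end_s):
    """Join the text of entries whose start time falls in [start_s, end_s)."""
    selected = [e["text"] for e in entries if start_s <= e["start"] < end_s]
    return " ".join(selected)

# Hypothetical sample in the assumed shape. In practice, load the file instead:
# with open("transcript.json") as f:
#     entries = json.load(f)
sample = json.loads("""[
  {"text": "how many", "start": 0.0, "duration": 2.0},
  {"text": "gpus can we buy", "start": 2.0, "duration": 3.0},
  {"text": "for 7 trillion", "start": 5.0, "duration": 2.5}
]""")

print(extract_section(sample, 2.0, 6.0))
```

Selecting on the start time alone keeps the logic simple; an entry that begins before the range but is still being spoken inside it is left out.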

This is a snippet from a transcript.

that was going on in my mind how many
gpus can we buy for 7
trillion well apparently all the
gpus I I I think this is one thing I'm
I'm waiting to ask Sam about because
it's it's a really big number talk about
ambition we have a lot of ambition here
in the UA we don't lack ambition but is
there a view that you can give the
government leaders today with regards to
compute capabilities and artificial

Transcripts lack punctuation and formatting, which can be problematic when transforming the text into vectors. In the next section, we'll look at strategies for splitting the text into chunks before encoding them as vectors.

Transform

This step splits the raw text into logical units such as sentences or paragraphs. Beyond splitting or chunking text based on punctuation, there are many ways to chunk text based on the data type. There are text splitters for structured data such as JSON or code, semi-structured data such as Markdown, and plain text.

This example implements several text chunking methods, ranging from naive methods such as splitting on punctuation or a fixed number of characters to methods based on Natural Language Processing (NLP) libraries such as the Natural Language Toolkit (NLTK) and spaCy. The context window of the embedding model limits the size of a chunk. The chunk size can be set manually, or we can use OpenAI's tiktoken package to count the tokens in a chunk.

This example has a caveat: some of the LangChain text splitters return the chunks as LangChain Document objects. For those chunking methods, I've added code to extract the text and return it as an array of strings, which is used to create the JSON document for upserting into Pinecone.

Which chunking method should you choose? Naive chunking is the simplest way to split text. However, the transcripts have no punctuation or meaningful newline characters to split sentences on. Both the NLTK and spaCy chunking methods can extract sentences, but they can generate chunks that are too large for the model's context window. The recursive character text splitter generally produces chunks of a more consistent token length, so this example uses it to split the text into chunks that fit within the embedding model's context window.
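
To see why a recursive splitter tends to produce consistent chunk sizes, here is a simplified sketch of the idea in plain Python. It is not LangChain's implementation: it measures length in characters rather than tokens, has no overlap, and the function name `recursive_split` and its separator list are my own assumptions. The idea is to try the coarsest separator first, merge small pieces up to the size limit, and fall back to a finer separator only for pieces that are still too large.

```python
def recursive_split(text, chunk_size, separators=("\n\n", "\n", " ", "")):
    """Split text into pieces of at most chunk_size characters,
    preferring the coarsest separator that works."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators or separators[0] == "":
        # Last resort: hard cut every chunk_size characters.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, finer = separators[0], separators[1:]
    chunks, current = [], ""
    for part in text.split(sep):
        candidate = part if not current else current + sep + part
        if len(candidate) <= chunk_size:
            current = candidate  # keep merging small parts
            continue
        if current:
            chunks.append(current)
            current = ""
        if len(part) <= chunk_size:
            current = part
        else:
            # This part alone is too big: retry with a finer separator.
            chunks.extend(recursive_split(part, chunk_size, finer))
    if current:
        chunks.append(current)
    return chunks

text = "that was going on in my mind how many gpus can we buy for 7 trillion"
for chunk in recursive_split(text, 20):
    print(repr(chunk))
```

Since the sample has no paragraph or line breaks, the sketch ends up splitting on spaces, which is roughly what happens with unpunctuated transcripts.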

from openai import OpenAI
import os, textwrap, json
import string, random
from langchain.text_splitter import NLTKTextSplitter
from langchain.text_splitter import CharacterTextSplitter
from langchain.text_splitter import SpacyTextSplitter
from langchain_text_splitters import RecursiveCharacterTextSplitter
import nltk
import tiktoken

# used to estimate tokens used
#https://python.langchain.com/docs/modules/data_connection/document_transformers/split_by_token#tiktoken
def tiktoken_length(text):
    encoding = tiktoken.encoding_for_model(MODEL)
    num_tokens = len(encoding.encode(text))

    return num_tokens

# split on character, e.g. '.'
def naive_chunking(text):
    doc = text.split(".")

    return doc

# split by a fixed number of characters
def textwrap_chunking(text):
    doc = textwrap.wrap(
        text, 
        2500, 
        replace_whitespace=False
        )

    return doc

# langchain character text splitter
# https://python.langchain.com/docs/modules/data_connection/document_transformers/character_text_splitter
def fixed_sized_chunking(text):

    # chunk_size and chunk_overlap are measured in tokens
    # because length_function is tiktoken_length
    text_splitter = CharacterTextSplitter(
        separator = "\n",
        chunk_size = 1024,
        chunk_overlap  = 200,
        length_function=tiktoken_length,
        is_separator_regex=False
        )
    data = text_splitter.create_documents([text])

    # text splitter returns langchain Document class
    # reformat into an array of strings
    doc = [d.page_content for d in data]

    return doc

# langchain NLTK splitter
# https://python.langchain.com/docs/modules/data_connection/document_transformers/split_by_token#nltk
def NLTK_chunking(text):
    nltk.download('punkt')
    text_splitter = NLTKTextSplitter()
    doc = text_splitter.split_text(text)

    return doc

# langchain semantic text splitter
# https://python.langchain.com/docs/modules/data_connection/document_transformers/split_by_token#spacy
def spaCy_chunking(text):
    text_splitter = SpacyTextSplitter(chunk_size=2000)
    doc = text_splitter.split_text(text)

    return doc

# langchain recursive character text splitter
# https://python.langchain.com/docs/modules/data_connection/document_transformers/recursive_text_splitter
def recursive_chracter_splitter_chunking(text):

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=100,
        chunk_overlap=20,
        length_function=tiktoken_length,
        is_separator_regex=False,
    )

    data = text_splitter.create_documents([text])

    # text splitter returns langchain Document class
    # reformat into an array of strings
    doc =[]
    for d in data:
        d = d.page_content
        doc.append(d)

    return doc


# create random ten-character alphanumeric ids for pinecone upsert
def random_id():
    N = 10
    res = ''.join(random.choices(string.ascii_uppercase +
                             string.digits, k=N))
    return res
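
One way to compare the chunking methods is to measure the chunks they produce. The sketch below is an illustration rather than part of the original code: `chunk_report`, `word_count`, and the limit of 5 are hypothetical, and the word count is only a stand-in for a real token counter.

```python
def chunk_report(chunks, limit, length_function=len):
    """Summarize chunk sizes and list the indexes of chunks over the limit."""
    sizes = [length_function(c) for c in chunks]
    return {
        "count": len(sizes),
        "max": max(sizes, default=0),
        "mean": sum(sizes) / len(sizes) if sizes else 0.0,
        "too_large": [i for i, s in enumerate(sizes) if s > limit],
    }

def word_count(text):
    # Stand-in for a token counter; words are not tokens.
    return len(text.split())

chunks = [
    "how many gpus can we buy",
    "for 7 trillion",
    "well apparently all the gpus",
]
report = chunk_report(chunks, limit=5, length_function=word_count)
print(report)
```

With the functions above, passing `tiktoken_length` as `length_function` and the embedding model's input limit as `limit` would flag chunks that a splitter made too large.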

Embed

The next step in the workflow is to convert the text chunks into vectors or embeddings. OpenAI recommends the text-embedding-3-small model, which performs more efficiently than previous models at a reduced price.

We'll upsert the data into Pinecone and also write the embeddings to a JSON file with the format below. Note that id is a random alphanumeric string generated by the random_id function. An id can include additional information such as the project name, but each id must be unique, or subsequent upserts will overwrite existing records. Metadata enables querying a specific article and retrieving the text of the embedding.


{"id" : id , 
 "values" : embedding, 
 "metadata": { 
    "source" : filename, 
    "text" : chunk}
}
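
As an alternative to random ids, an id can be derived from the record itself. This is a sketch of that idea, not the approach used in this article's code: `make_id` and its arguments are hypothetical names. It prefixes the project name and hashes the source file and chunk position, so each chunk gets a distinct id and re-running the script produces the same ids again.

```python
import uuid

def make_id(project, source, chunk_index):
    """Deterministic id: the same (project, source, chunk_index)
    always maps to the same id."""
    name = f"{project}/{source}/{chunk_index}"
    digest = uuid.uuid5(uuid.NAMESPACE_URL, name).hex[:12]
    return f"{project}-{digest}"

ids = [make_id("transcripts", "interview.txt", i) for i in range(3)]
print(ids)
```

The trade-off: with deterministic ids, a second run overwrites the records from the first run instead of adding duplicates. That is usually what you want when reprocessing the same files, but it is the opposite of what random ids do.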


The code calls the create_embeddings function, which chunks the text and generates an embedding for each chunk. In production, you wouldn't create an intermediary data file. However, for this example we generate an artifact that lets you examine the data before upserting it into the database.


# requires Python 3.10+ for match/case
client = OpenAI(
    # read the key from the environment instead of hard-coding it
    api_key=os.environ["OPENAI_API_KEY"]
)

MODEL = "text-embedding-3-small"

def create_embeddings(directory, files, chunking_method):
    data = []
    for filename in files:
        file_path = os.path.join(directory, filename)
        with open(file_path, 'r') as file:
            document_text = file.read()

        # choose the chunking method
        match chunking_method:
            case "naive_chunking":
                chunks = naive_chunking(document_text)
            case "textwrap_chunking":
                chunks = textwrap_chunking(document_text)
            case "fixed_sized_chunking":
                chunks = fixed_sized_chunking(document_text)
            case "NLTK_chunking":
                chunks = NLTK_chunking(document_text)
            case "spaCy_chunking":
                chunks = spaCy_chunking(document_text)
            case "recursive_character_splitter_chunking":
                chunks = recursive_chracter_splitter_chunking(document_text)
            case _:
                # unknown method names fall back to naive chunking
                chunks = naive_chunking(document_text)

        for chunk in chunks:
            # the embeddings endpoint rejects empty input, so skip blank chunks
            if not chunk.strip():
                continue
            record_id = random_id()
            embedding = client.embeddings.create(
                model = MODEL,
                input = chunk
            ).data[0].embedding
            pc_dict = {"id" : record_id, "values" : embedding, "metadata": { "source" : filename, "text" : chunk}}
            data.append(pc_dict)

    return data

input_directory = "./vector_store"
file_list = sorted(os.listdir(input_directory))
# file_list = file_list[:1]  <- set this to the number of files to process when testing

chunking_method = "recursive_character_splitter_chunking"

embeddings = create_embeddings(input_directory, file_list, chunking_method)

with open("transcript_embeddings.json", "w") as outfile:
    json.dump(embeddings, outfile)
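
The loop above makes one API request per chunk. The embeddings endpoint also accepts a list of strings, so requests can be batched. This is a hedged sketch: `embed_in_batches` and the batch size of 100 are my own assumptions, and `client` is expected to be an OpenAI client like the one created earlier.

```python
def embed_in_batches(client, chunks, model, batch_size=100):
    """Return one embedding per chunk, sending batch_size chunks per request."""
    embeddings = []
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start:start + batch_size]
        response = client.embeddings.create(model=model, input=batch)
        # Sort by index so embeddings line up with the chunks in the batch.
        ordered = sorted(response.data, key=lambda item: item.index)
        embeddings.extend(item.embedding for item in ordered)
    return embeddings

# Usage (assumes the client, MODEL, and chunks from the code above):
# vectors = embed_in_batches(client, chunks, MODEL)
# records = [
#     {"id": random_id(), "values": v, "metadata": {"source": filename, "text": c}}
#     for c, v in zip(chunks, vectors)
# ]
```

Fewer requests usually means less time spent waiting on the network, at the cost of losing a whole batch if one request fails.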



Store

Upserting the data into Pinecone is the final step in this workflow. This example uses Pinecone's serverless vector database on AWS. First we create an index, then we upsert the embeddings. This example is basic but functional. Check out Pinecone's documentation for production-optimized deployments.


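from pinecone import Pinecone, ServerlessSpec
import json

pc = Pinecone(api_key="my_pinecone_api_key")

# load the embeddings written in the Embed step
with open('transcript_embeddings.json', "r") as f:
    embeddings = json.load(f)

index_name = "transcripts"

# dimension must match the embedding model's output size
# (1536 for text-embedding-3-small by default)
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-west-2"
        )
    )

index = pc.Index(index_name)

index.upsert(
    vectors =  embeddings
)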
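
Before upserting a large file, it can help to check the records and send them in smaller requests. The sketch below is an assumption-laden illustration: `validate_records`, `batches`, and the batch size of 100 are hypothetical and not part of the original code. It checks that ids are unique and that every vector matches the index dimension, then yields the records in fixed-size groups.

```python
def validate_records(records, dimension):
    """Raise ValueError if an id repeats or a vector has the wrong length."""
    seen = set()
    for record in records:
        record_id = record["id"]
        if record_id in seen:
            raise ValueError(f"duplicate id: {record_id}")
        seen.add(record_id)
        if len(record["values"]) != dimension:
            raise ValueError(
                f"record {record_id} has {len(record['values'])} values, "
                f"expected {dimension}"
            )

def batches(records, size):
    """Yield successive slices of at most size records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]

# Usage (assumes the index and embeddings from the code above):
# validate_records(embeddings, dimension=1536)
# for batch in batches(embeddings, 100):
#     index.upsert(vectors=batch)
```

Catching a duplicate id here matters because an upsert with a repeated id silently replaces the earlier record.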


Next Steps

The Load-Transform-Embed-Store workflow breaks RAG up into a series of easy-to-understand steps. This example uses the OpenAI and Pinecone SaaS offerings, but we can implement the same workflow with open-source software. The example code is available on GitHub. Please leave a comment if you find it useful.

The follow-up article will demonstrate how to use this implementation with a conversational chatbot.
