dev-resources.site
for different kinds of informations.
Create an agent and build a deployable notebook from it in watsonx.ai β Part 2
Introduction
In the previous article (Create an agent and build a Notebook from it in watsonx.ai β Part 1), all the necessary steps to prepare the watsonx.ai studio are described. This article will go through the agent creation and deployment from the built-in capacities of the platform.
In this article we will discuss the capacity of watsonx.ai platform to deploy a custom agentic services and applications.
New agent creation
From the menu bar on the top right side, click on the βAgentsβ icon.
If the icon is not visible, it means that the feature is not activated yet for your profile. Refer to the following article to enable it: How to access Agent interface in watsonx.ai (Beta)!
Once the icon clicked, the interface of agent creation appears.
At this time, the only available framework is LangGraph with ReAct architecture.
For this example, a new vector index is added as shown. A sample PDF is ingested into the system.
As shown, currently 3 options are available; a) In memory, b) Milvus and c) Elasticsearch.
Other integrations could be activated as well.
Once the desired integrations are activated the agent is ready to be used.
Agent execution based on the LLM capacities
As a first test, the agent could be used for any sort of information.
Agent execution based on the RAG capacities
As a vector index was created for this example, we will use the same agent to answer a question regarding the ingested informations.
Make a deployment from the agent
Now that the agent function fine and has been tested, we are going to make a deployment of it.
First step is to save the agent and then select which type of deployment we need. In this example we will select Deployment notebook. Also, when creating a deployment two resources are required; a) an API key and b) a deployment space.
The following screens show the steps to fulfill these requirements.
Hit the βCreateβ button.
Create new deployment space with the βNew deployment spaceβ.
Give a name and associate with the previously created service and hit βCreateβ.
You have a generated notebook ready to be deployed.
AI Service Deployment Notebook
This notebook contains steps and code to test, promote, and deploy an AI Service capturing logic to implement RAG pattern for grounded chats.
Note: Notebook code generated using Prompt Lab will execute successfully. If code is modified or reordered, there is no guarantee it will successfully execute. For details, see: Saving your work in Prompt Lab as a notebook.
Some familiarity with Python is helpful. This notebook uses Python 3.11.
Contents
This notebook contains the following parts:
Setup
Initialize all the variables needed by the AI Service
Define the AI service function
Deploy an AI Service
Test the deployed AI Service
1. Set up the environment
Before you can run this notebook, you must perform the following setup tasks:
Connection to WML
This cell defines the credentials required to work with watsonx API for both the execution in the project, as well as the deployment and runtime execution of the function.
Action: Provide the IBM Cloud personal API key. For details, see documentation.
import os
import getpass
import requests
def get_credentials():
return {
"url" : "https://us-south.ml.cloud.ibm.com",
"apikey" : getpass.getpass("Please enter your api key (hit enter): ")
}
def get_bearer_token():
url = "https://iam.cloud.ibm.com/identity/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = f"grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey={credentials['apikey']}"
response = requests.post(url, headers=headers, data=data)
return response.json().get("access_token")
credentials = get_credentials()
from ibm_watsonx_ai import APIClient
client = APIClient(credentials)
Connecting to a space
A space will be be used to host the promoted AI Service.
space_id = "xxxxxxxx"
client.set.default_space(space_id)
Promote asset(s) to space
We will now promote assets we will need to stage in the space so that we can access their data from the AI service.
source_project_id = "xxxxxx"
vector_index_id = client.spaces.promote("xxxxxx", source_project_id, space_id)
print(vector_index_id)
2. Create the AI service function
We first need to define the AI service function
2.1 Define the function
params = {
"space_id": space_id,
"vector_index_id": vector_index_id
}
def gen_ai_service(context, params = params, **custom):
# import dependencies
from langchain_ibm import ChatWatsonx
from ibm_watsonx_ai import APIClient
from langchain_core.messages import AIMessage, HumanMessage
from langchain.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.tools import DuckDuckGoSearchRun
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
import json
model = "mistralai/mistral-large"
service_url = "https://us-south.ml.cloud.ibm.com"
# Get credentials token
credentials = {
"url": service_url,
"token": context.generate_token()
}
# Setup client
client = APIClient(credentials)
space_id = params.get("space_id")
client.set.default_space(space_id)
vector_index_id = params.get("vector_index_id")
# Get vector index details
vector_index_details = client.data_assets.get_details(vector_index_id)
vector_index_properties = vector_index_details["entity"]["vector_index"]
top_n = 20 if vector_index_properties["settings"].get("rerank") else int(vector_index_properties["settings"]["top_k"])
def rerank( client, documents, query, top_n ):
from ibm_watsonx_ai.foundation_models import Rerank
reranker = Rerank(
model_id="cross-encoder/ms-marco-minilm-l-12-v2",
api_client=client,
params={
"return_options": {
"top_n": top_n
},
"truncate_input_tokens": 512
}
)
reranked_results = reranker.generate(query=query, inputs=documents)["results"]
new_documents = []
for result in reranked_results:
result_index = result["index"]
new_documents.append(documents[result_index])
return new_documents
import gzip
import chromadb
import random
import string
def hydrate_chromadb():
data = client.data_assets.get_content(vector_index_id)
content = gzip.decompress(data)
stringified_vectors = str(content, "utf-8")
vectors = json.loads(stringified_vectors)
chroma_client = chromadb.Client()
# make sure collection is empty if it already existed
collection_name = "my_collection"
try:
collection = chroma_client.delete_collection(name=collection_name)
except:
print("Collection didn't exist - nothing to do.")
collection = chroma_client.create_collection(name=collection_name)
vector_embeddings = []
vector_documents = []
vector_metadatas = []
vector_ids = []
for vector in vectors:
vector_embeddings.append(vector["embedding"])
vector_documents.append(vector["content"])
metadata = vector["metadata"]
lines = metadata["loc"]["lines"]
clean_metadata = {}
clean_metadata["asset_id"] = metadata["asset_id"]
clean_metadata["asset_name"] = metadata["asset_name"]
clean_metadata["url"] = metadata["url"]
clean_metadata["from"] = lines["from"]
clean_metadata["to"] = lines["to"]
vector_metadatas.append(clean_metadata)
asset_id = vector["metadata"]["asset_id"]
random_string = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
id = "{}:{}-{}-{}".format(asset_id, lines["from"], lines["to"], random_string)
vector_ids.append(id)
collection.add(
embeddings=vector_embeddings,
documents=vector_documents,
metadatas=vector_metadatas,
ids=vector_ids
)
return collection
chroma_collection = hydrate_chromadb()
from ibm_watsonx_ai.foundation_models.embeddings.sentence_transformer_embeddings import SentenceTransformerEmbeddings
emb = SentenceTransformerEmbeddings('sentence-transformers/all-MiniLM-L6-v2')
def proximity_search( question, inner_client ):
query_vectors = emb.embed_query(question)
query_result = chroma_collection.query(
query_embeddings=query_vectors,
n_results=top_n,
include=["documents", "metadatas", "distances"]
)
documents = list(reversed(query_result["documents"][0]))
if vector_index_properties["settings"].get("rerank"):
documents = rerank(inner_client, documents, question, vector_index_properties["settings"]["top_k"])
return "\n".join(documents)
def create_chat_model(watsonx_client):
parameters = {
"frequency_penalty": 0,
"max_tokens": 2000,
"presence_penalty": 0,
"temperature": 0,
"top_p": 1
}
chat_model = ChatWatsonx(
model_id=model,
url=service_url,
space_id=space_id,
params=parameters,
watsonx_client=watsonx_client,
)
return chat_model
def get_remote_tool_descriptions():
remote_tool_descriptions = {}
import requests
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": f'Bearer {context.generate_token()}'
}
tool_url = "https://private.api.dataplatform.cloud.ibm.com"
remote_tools_response = requests.get(f'{tool_url}/wx/v1/agent_tools', headers = headers)
remote_tools = remote_tools_response.json()
for resource in remote_tools["resources"]:
tool_name = resource["name"]
tool_description = resource["description"]
remote_tool_descriptions[tool_name] = tool_description
return remote_tool_descriptions
tool_descriptions = get_remote_tool_descriptions()
def create_remote_tool(tool_name, context):
from langchain_core.tools import Tool
import requests
def call_tool( tool_input ):
body = {
"tool_name": tool_name,
"input": tool_input
}
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": f'Bearer {context.get_token()}'
}
tool_url = "https://private.api.dataplatform.cloud.ibm.com"
tool_response = requests.post(f'{tool_url}/wx/v1/agent_tools/run', headers = headers, json = body)
if (tool_response.status_code > 400):
raise Exception(f'Error calling remote tool: {tool_response.json()}' )
tool_output = tool_response.json()
return tool_response.json().get("output")
tool = Tool(
name=tool_name,
description = tool_descriptions[tool_name],
func=call_tool
)
return tool
def create_tools(inner_client, context):
tools = []
wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tools.append(wikipedia)
search = DuckDuckGoSearchRun()
tools.append(search)
def proximity_search_wrapper(question):
return proximity_search(question, inner_client)
from langchain_core.tools import Tool
rag_query = Tool(
name="RAGQuery",
description="Search information in documents to provide context to a user query. Useful when asked to ground the answer in specific knowledge about TestVectorIndex.",
func=proximity_search_wrapper
)
tools.append(rag_query)
tools.append(create_remote_tool("GoogleSearch", context))
tools.append(create_remote_tool("WebCrawler", context))
tools.append(create_remote_tool("PythonInterpreter", context))
return tools
def create_agent(model, tools, messages):
memory = MemorySaver()
instructions = """
# Notes
- Use markdown syntax for formatting code snippets, links, JSON, tables, images, files.
- Any HTML tags must be wrapped in block quotes, for example ```
<html>
```.
- When returning code blocks, specify language.
- Sometimes, things don't go as planned. Tools may not provide useful information on the first few tries. You should always try a few different approaches before declaring the problem unsolvable.
- When the tool doesn't give you what you were asking for, you must either use another tool or a different tool input.
- When using search engines, you try different formulations of the query, possibly even in a different language.
- You cannot do complex calculations, computations, or data manipulations without using tools.
- If you need to call a tool to compute something, always call it instead of saying you will call it.
If a tool returns an IMAGE in the result, you must include it in your answer as Markdown.
Example:
Tool result: IMAGE({commonApiUrl}/wx/v1/agent_tools/cache/images/plt-04e3c91ae04b47f8934a4e6b7d1fdc2c.png)
Markdown to return to user: ![Generated image]({commonApiUrl}/wx/v1/agent_tools/cache/images/plt-04e3c91ae04b47f8934a4e6b7d1fdc2c.png)
You are a helpful assistant that uses tools to answer questions in detail.
When greeted, say \"Hi, I am watsonx.ai agent. How can I help you?\""""
for message in messages:
if message["role"] == "system":
instruction += message["content"]
graph = create_react_agent(model, tools=tools, checkpointer=memory, state_modifier=instructions)
return graph
def convert_messages(messages):
converted_messages = []
for message in messages:
if (message["role"] == "user"):
converted_messages.append(HumanMessage(content=message["content"]))
elif (message["role"] == "assistant"):
converted_messages.append(AIMessage(content=message["content"]))
return converted_messages
def generate(context):
payload = context.get_json()
messages = payload.get("messages")
inner_credentials = {
"url": service_url,
"token": context.get_token()
}
inner_client = APIClient(inner_credentials)
model = create_chat_model(inner_client)
tools = create_tools(inner_client, context)
agent = create_agent(model, tools, messages)
generated_response = agent.invoke(
{ "messages": convert_messages(messages) },
{ "configurable": { "thread_id": "42" } }
)
last_message = generated_response["messages"][-1]
generated_response = last_message.content
execute_response = {
"headers": {
"Content-Type": "application/json"
},
"body": {
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": generated_response
}
}]
}
}
return execute_response
def generate_stream(context):
print("Generate stream", flush=True)
payload = context.get_json()
messages = payload.get("messages")
inner_credentials = {
"url": service_url,
"token": context.get_token()
}
inner_client = APIClient(inner_credentials)
model = create_chat_model(inner_client)
tools = create_tools(inner_client, context)
agent = create_agent(model, tools, messages)
response_stream = agent.stream(
{ "messages": messages },
{ "configurable": { "thread_id": "42" } },
stream_mode=["updates", "messages"]
)
for chunk in response_stream:
chunk_type = chunk[0]
finish_reason = ""
usage = None
if (chunk_type == "messages"):
message_object = chunk[1][0]
if (message_object.type == "AIMessageChunk" and message_object.content != ""):
message = {
"role": "assistant",
"delta": message_object.content
}
else:
continue
elif (chunk_type == "updates"):
update = chunk[1]
if ("agent" in update):
agent = update["agent"]
agent_result = agent["messages"][0]
if (agent_result.additional_kwargs):
kwargs = agent["messages"][0].additional_kwargs
tool_call = kwargs["tool_calls"][0]
message = {
"role": "assistant",
"tool_calls": [
{
"id": tool_call["id"],
"type": "function",
"function": {
"name": tool_call["function"]["name"],
"arguments": tool_call["function"]["arguments"]
}
}
]
}
elif (agent_result.response_metadata):
# Final update
message = {
"role": "assistant",
"content": agent_result.content
}
finish_reason = agent_result.response_metadata["finish_reason"]
usage = {
"completion_tokens": agent_result.usage_metadata["output_tokens"],
"prompt_tokens": agent_result.usage_metadata["input_tokens"],
"total_tokens": agent_result.usage_metadata["total_tokens"]
}
elif ("tools" in update):
tools = update["tools"]
tool_result = tools["messages"][0]
message = {
"role": "tool",
"id": tool_result.id,
"tool_call_id": tool_result.tool_call_id,
"name": tool_result.name,
"content": tool_result.content
}
else:
continue
chunk_response = {
"choices": [{
"index": 0,
"message": message
}]
}
if (finish_reason):
chunk_response["choices"][0]["finish_reason"] = finish_reason
if (usage):
chunk_response["usage"] = usage
yield chunk_response
return generate, generate_stream
2.2 Test locally
# Initialize AI Service function locally
from ibm_watsonx_ai.deployments import RuntimeContext
context = RuntimeContext(api_client=client)
streaming = False
findex = 1 if streaming else 0
local_function = gen_ai_service(context, vector_index_id=vector_index_id, space_id=space_id)[findex]
messages = []
local_question = "Change this question to test your function"
messages.append({ "role" : "user", "content": local_question })
context = RuntimeContext(api_client=client, request_payload_json={"messages": messages})
response = local_function(context)
result = ''
if (streaming):
for chunk in response:
print(chunk, end="\n\n", flush=True)
else:
print(response)
3. Store and deploy the AI Service
Before you can deploy the AI Service, you must store the AI service in your watsonx.ai repository.
# Look up software specification for the AI service
software_spec_id_in_project = "xxxxxxxxx"
software_spec_id = ""
try:
software_spec_id = client.software_specifications.get_id_by_name("ai-service-v2-b1-software-specification")
except:
software_spec_id = client.spaces.promote(software_spec_id_in_project, source_project_id, space_id)
# Define the request and response schemas for the AI service
request_schema = {
"application/json": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"messages": {
"title": "The messages for this chat session.",
"type": "array",
"items": {
"type": "object",
"properties": {
"role": {
"title": "The role of the message author.",
"type": "string",
"enum": ["user","assistant"]
},
"content": {
"title": "The contents of the message.",
"type": "string"
}
},
"required": ["role","content"]
}
}
},
"required": ["messages"]
}
}
response_schema = {
"application/json": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"choices": {
"title": "A list of chat completion choices",
"type": "array",
"items": {
"type": "object",
"properties": {
"index": {
"type": "integer",
"title": "The index of this result."
},
"message": {
"title": "A message result.",
"type": "object",
"properties": {
"role": {
"const": "assistant"
},
"delta": {
"title": "Information from event.",
"type": "object",
"properties": {
"content": {
"title": "Message content.",
"type": "string"
}
},
"required": ["content"]
},
"content": {
"title": "Message content.",
"type": "string"
}
},
"required": ["role"]
}
},
"required": ["index","message"]
}
}
},
"required": ["choices"]
}
}
# Store the AI service in the repository
ai_service_metadata = {
client.repository.AIServiceMetaNames.NAME: "AgentDeploymentNoteBookfromRAG",
client.repository.AIServiceMetaNames.DESCRIPTION: "AgentDeploymentNoteBookfromRAG",
client.repository.AIServiceMetaNames.SOFTWARE_SPEC_ID: software_spec_id,
client.repository.AIServiceMetaNames.CUSTOM: {},
client.repository.AIServiceMetaNames.REQUEST_DOCUMENTATION: request_schema,
client.repository.AIServiceMetaNames.RESPONSE_DOCUMENTATION: response_schema,
client.repository.AIServiceMetaNames.TAGS: ["wx-agent-builder"]
}
ai_service_details = client.repository.store_ai_service(meta_props=ai_service_metadata, ai_service=gen_ai_service)
# Get the AI Service ID
ai_service_id = client.repository.get_ai_service_id(ai_service_details)
# Deploy the stored AI Service
deployment_metadata = {
client.deployments.ConfigurationMetaNames.NAME: "AgentDeploymentNoteBookfromRAG",
client.deployments.ConfigurationMetaNames.DESCRIPTION: "AgentDeploymentNoteBookfromRAG",
client.deployments.ConfigurationMetaNames.ONLINE: {},
client.deployments.ConfigurationMetaNames.CUSTOM: {}
}
function_deployment_details = client.deployments.create(ai_service_id, meta_props=deployment_metadata, space_id=space_id)
4. Test AI Service
# Get the ID of the AI Service deployment just created
deployment_id = client.deployments.get_id(function_deployment_details)
print(deployment_id)
messages = []
remote_question = "Change this question to test your function"
messages.append({ "role" : "user", "content": remote_question })
payload = { "messages": messages }
result = client.deployments.run_ai_service(deployment_id, payload)
if "error" in result:
print(result["error"])
else:
print(result)
Next steps
You successfully deployed and tested the AI Service! You can now view your deployment and test it as a REST API endpoint.
## Conclusion
Through the **[watsonx.ai](https://www.ibm.com/products/watsonx-ai)** prompt studio, we could build an agent and make a deployable ready solution.
Thanks for reading π
Featured ones: