dev-resources.site
for different kinds of informations.
Langgraph Human In The Loop with socket
langgraph ์ interruption ๊ธฐ๋ฅ์ ํตํด์ Agent์ ์ํ ์ค๊ฐ์ human์ด ๊ฐ์ ํ ์ ์๋ค๋ ๊ฒ์ ์์๋ค.
ํ์ง๋ง ์์๋ค์ ๋ณด๋ฉด ์ ๋ถ human interaction์ ํ ์ ์น๊ณ ~ ๋์ด๊ฐ๋ค. ์ค์ ๋ก User์๊ฒ ํ์ธ์ ๋ฐ์ผ๋ ค๋ฉด ์ด๋ป๊ฒ ํด์ผํ ๊น? ํฌ๊ฒ ์ธ๊ฐ์ง ๋ฐฉ๋ฒ์ด ์์ ๊ฒ ๊ฐ๋ค.
Langgraph API ์๋ฒ ์ฌ์ฉ
langgraph cli ๋ก langgraph API ์๋ฒ๋ฅผ docker๋ก ์คํํ ํ langgraph SDK๋ก ๊ทธ๋ํ๋ฅผ ์คํํ๊ณ , ์คํ ์ดํธ๋ฅผ ๋ณ๊ฒฝํ๊ณ , ์ฌ๊ฒํ๊ณ ํ ์ ์๋ค.
langgraph์์ ์ ๊ณตํ๋ ๊ฒ๋ค์ ์ ๊ณตํ๋ ๋ฐฉ๋ฒ๋๋ก ์ฌ์ฉํด์ผํ๋ค. ๋ญ๊ฐ ์ค์ ์ด ๋ง์์ง๊ณ , ๋ด ์ฝ๋๋ ์ตํฉํ๊ธฐ ๊น๋ค๋ก์ธ ์ ์์ด๋ณด์ธ๋ค.
์๋ฒ์์ ๊ทธ๋ํ ๊ด๋ฆฌ
์์ Langgraph API ์๋ฒ์์ ํ์ํ ๋ถ๋ถ๋ง ๋ด ์ปค์คํ ์๋ฒ์ ๊ตฌํํ๋ ๋ฐฉ๋ฒ์ด๋ค. ์๋ฅผ ๋ค์ด ๊ทธ๋ํ ์คํํ๋ฉด ๊ทธ๋ํ๋ฅผ ์คํํ ํด๋ผ์ด์ธํธ์ ๊ทธ๋ํ ์ฒดํฌํฌ์ธํธ๋ฅผ ์ ์ฅํด์ผํ๊ณ , ์ ์ ์ ํ์ธ ํ์ ๋ค์ ๊ทธ๋ํ๋ฅผ ๋ถ๋ฌ์์ ์ ์ ์ ์๋ต์ ๋ง๊ฒ ์ํ๋ฅผ ๋ณ๊ฒฝํด์ ์ฌ๊ฒํด์ผํ๋ค.
์ง์ผํ ๊ฒ ์๊ทผ ๋ง์ ์๋ ์๋ค.
์์ผ ์ฐ๊ฒฐ
Agent์คํ ์์ ์์ผ์ ์ฐ๊ฒฐํ๊ณ ์์ผ์ ํตํด์ ์ ์ ์ ์ธํฐ๋ ์ ํ๋ ๊ฒ์ด๋ค. ๊ธฐ์กด ์์ ์ฝ๋์์ ์์ผ์ฐ๊ฒฐ๊ณผ ์์ผ ํต์ ์ผ๋ก ์ ์ ํ์ธ ๋ฐ๋ ๋จ๊ณ๋ง ์ถ๊ฐํ๋ฉด ๋์ํ๋ค.
๋์ , ๊ธ์ ํ์ดํํ๋ฏ ์ณ์ง๋ streaming์ ๊ตฌํํ๊ธฐ ๊น๋ค๋ก์ธ ์๋ ์๋ค.
์์ผ ์ฐ๊ฒฐ๋ก ๊ตฌํ
์ผ๋จ ์ต๋ํ ๋ณต์ก์ฑ์ ๋๋ฆฌ์ง ์๋ ๋ฐฉํฅ์์ ๊ตฌํ์ ํด๋ณด๊ณ ์ถ์ด์ ์์ผ์ฐ๊ฒฐ๋ก ๊ตฌํํด๋ณด์๋ค.
์๋ฒ๋ NestJs๋ฅผ ์ฌ์ฉํ๊ณ ํด๋ผ์ด์ธํธ๋ NextJs๋ฅผ ์ฌ์ฉํ๋ค.
์๋ฒ
์ผ๋จ Websocket ์ฐ๊ฒฐ์ ์ํด Gateway๋ฅผ ๋ง๋ ๋ค. agent/start ์์ ์ปค๋ฅ์
์ ๋ง๋ค๊ณ ๋ฐ๋ก agent๋ฅผ ์ํํ๋๋ก ํ๋ค.
@WebSocketGateway({
namespace: "/",
transport: ["websocket", "polling"],
path: "/agent/start",
cors: {
origin: "*",
methods: ["GET", "POST"],
credentials: true,
},
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
protected readonly logger = new Logger(this.constructor.name);
constructor(
private readonly agentFactory: AgentFactory
) {}
private pendingConfirmations = new Map<string, (response: boolean) => void>();
// Handle new connections
handleConnection(client: Socket) {
console.log(`Client connected: ${client.id}`);
// Option 1: Get actionData from query parameters
const actionData: { agent: AgentName } = client.handshake.query.actionData
? JSON.parse(client.handshake.query.actionData as string)
: null;
if (actionData) {
this.startAgentProcess(client, actionData);
} else {
// If no actionData is provided, you can wait for an event
client.emit("error", "No action data provided");
client.disconnect();
}
}
// Handle disconnections
handleDisconnect(client: Socket) {
console.log(`Client disconnected: ${client.id}`);
this.pendingConfirmations.delete(client.id);
}
// Send confirmation request
async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
return new Promise((resolve) => {
this.pendingConfirmations.set(clientId, resolve);
this.server.to(clientId).emit("confirmation_request", data);
// Optional timeout for response
setTimeout(() => {
if (this.pendingConfirmations.has(clientId)) {
this.pendingConfirmations.delete(clientId);
resolve(false); // Default to 'false' if timeout occurs
}
}, 3000000); // 3000 seconds timeout
});
}
// Handle client's confirmation response
@SubscribeMessage("confirmation_response")
handleClientResponse(
@MessageBody() data: { confirmed: boolean },
@ConnectedSocket() client: Socket
) {
const resolve = this.pendingConfirmations.get(client.id);
if (resolve) {
resolve(data.confirmed);
this.pendingConfirmations.delete(client.id);
}
}
// Start the agent process
private async startAgentProcess(
client: Socket,
actionData: { agent: AgentName }
) {
const graph = await this.agentFactory.create({
agentName: actionData.agent,
});
const initialInput = { input: "hello world" };
// Thread
const graphStateConfig = {
configurable: { thread_id: "1" },
streamMode: "values" as const,
};
// Run the graph until the first interruption
for await (const event of await graph.stream(
initialInput,
graphStateConfig
)) {
this.logAndEmit(client, `--- ${event.input} ---`);
}
// Will log when the graph is interrupted, after step 2.
this.logAndEmit(client, "---GRAPH INTERRUPTED---");
const userConfirmed = await this.sendConfirmationRequest(client.id, {
message: "Do you want to proceed with this action?",
actionData,
});
if (userConfirmed) {
// If approved, continue the graph execution. We must pass `null` as
// the input here, or the graph will
for await (const event of await graph.stream(null, graphStateConfig)) {
this.logAndEmit(client, `--- ${event.input} ---`);
}
this.logAndEmit(client, "---ACTION EXECUTED---");
} else {
this.logAndEmit(client, "---ACTION CANCELLED---");
}
// Optionally disconnect the client
client.disconnect();
}
private logAndEmit(client: Socket, message: string) {
console.log(message);
client.emit("message", { message });
}
}
ํต์ฌ์ ๊ฐ๋จํ๋ค. Socket์ด ์ฐ๊ฒฐ๋๋ฉด ๋ฐ๋ก agent๋ฅผ ์์ฑํ์ฌ ์ํํ๊ณ , ์ํํด์ interrupt ๋นํ๋ฉด Client์๊ฒ confirmation request message๋ฅผ ๋ณด๋ด๊ณ ๊ธฐ๋ค๋ฆฐ๋ค. confirmation์ด resolve๋๋ฉด ์ด์ด์ graph๋ฅผ ์งํํ๋ค.
์ ์ฝ๋์์ ์ฌ์ฉํ agent๋ langgraph ๋ฌธ์์ ์๋ ์๋ ์คํ
1 2 3 ์ ์์ฐจ์ ์ผ๋ก ์ฌ์ฉํ๋ ์์ด์ ํธ์ด๋ค.
const GraphState = Annotation.Root({
input: Annotation<string>,
});
const step1 = (state: typeof GraphState.State) => {
console.log("---Step 1---");
return state;
};
const step2 = (state: typeof GraphState.State) => {
console.log("---Step 2---");
return state;
};
const step3 = (state: typeof GraphState.State) => {
console.log("---Step 3---");
return state;
};
const builder = new StateGraph(GraphState)
.addNode("step1", step1)
.addNode("step2", step2)
.addNode("step3", step3)
.addEdge(START, "step1")
.addEdge("step1", "step2")
.addEdge("step2", "step3")
.addEdge("step3", END);
// Set up memory
const graphStateMemory = new MemorySaver();
const graph = builder.compile({
checkpointer: graphStateMemory,
interruptBefore: ["step3"],
});
return graph;
ํด๋ผ์ด์ธํธ
ํด๋ผ์ด์ธํธ์์๋ ํ
์ ๋ง๋ค์ด์ agent start์ ๊ทธ ์ํ๋ฅผ ๊ด๋ฆฌํ๋ค.
import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";
export const useAgentSocket = () => {
const socketRef = useRef<Socket | null>(null);
const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
const [messages, setMessages] = useState<string[]>([]);
const connectAndRun = (actionData: any) => {
return new Promise((resolve, reject) => {
socketRef.current = io("http://localhost:8000", {
path: "/agent/start",
transports: ["websocket", "polling"],
query: {
actionData: JSON.stringify(actionData),
},
});
socketRef.current.on("connect", () => {
console.log("Connected:", socketRef.current?.id);
resolve(void 0);
});
socketRef.current.on("connect_error", (error) => {
console.error("Connection error:", error);
reject(error);
});
// Listen for confirmation requests
socketRef.current.on("confirmation_request", (data) => {
setConfirmationRequest(data);
});
// Listen for messages
socketRef.current.on("message", (data) => {
console.log("Received message:", data);
setMessages((prevMessages) => [...prevMessages, data.message]);
});
socketRef.current.on("disconnect", () => {
console.log("Disconnected from server");
});
});
};
const sendConfirmationResponse = (confirmed: boolean) => {
if (socketRef.current) {
socketRef.current.emit("confirmation_response", { confirmed });
setConfirmationRequest(null);
}
};
const disconnectSocket = () => {
if (socketRef.current) {
socketRef.current.disconnect();
}
};
const clearMessages = () => {
setMessages([]);
};
return {
confirmationRequest,
sendConfirmationResponse,
connectAndRun,
disconnectSocket,
messages,
clearMessages,
};
};
์ปค๋ฅ์ ์ ๋งบ๊ณ , confirmation request๊ฐ ์ค๋ฉด confirmationRequest ์ํ๋ฅผ ์ ๋ฐ์ดํธํ๋ค. UI component์์ confirmationRequest ์ํ๋ฅผ ๋ณด๊ณ ์ ์ ์๊ฒ ์ฐฝ์ ๋์์ฃผ๋ฉด ๋๋ค.
Featured ones: