Logo

dev-resources.site

for different kinds of informations.

Langgraph Human In The Loop with socket

Published at
11/13/2024
Categories
langgraph
webdev
javascript
Author
siisee11
Categories
3 categories in total
langgraph
open
webdev
open
javascript
open
Author
8 person written this
siisee11
open
Langgraph Human In The Loop with socket

langgraph ์˜ interruption ๊ธฐ๋Šฅ์„ ํ†ตํ•ด์„œ Agent์˜ ์ˆ˜ํ–‰ ์ค‘๊ฐ„์— human์ด ๊ฐœ์ž…ํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์„ ์•Œ์•˜๋‹ค.

ํ•˜์ง€๋งŒ ์˜ˆ์‹œ๋“ค์„ ๋ณด๋ฉด ์ „๋ถ€ human interaction์€ ํ•œ ์…ˆ์น˜๊ณ ~ ๋„˜์–ด๊ฐ„๋‹ค. ์‹ค์ œ๋กœ User์—๊ฒŒ ํ™•์ธ์„ ๋ฐ›์œผ๋ ค๋ฉด ์–ด๋–ป๊ฒŒ ํ•ด์•ผํ• ๊นŒ? ํฌ๊ฒŒ ์„ธ๊ฐ€์ง€ ๋ฐฉ๋ฒ•์ด ์žˆ์„ ๊ฒƒ ๊ฐ™๋‹ค.

Langgraph API ์„œ๋ฒ„ ์‚ฌ์šฉ

langgraph cli ๋กœ langgraph API ์„œ๋ฒ„๋ฅผ docker๋กœ ์‹คํ–‰ํ•œ ํ›„ langgraph SDK๋กœ ๊ทธ๋ž˜ํ”„๋ฅผ ์‹คํ–‰ํ•˜๊ณ , ์Šคํ…Œ์ดํŠธ๋ฅผ ๋ณ€๊ฒฝํ•˜๊ณ , ์žฌ๊ฒŒํ•˜๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

langgraph์—์„œ ์ œ๊ณตํ•˜๋Š” ๊ฒƒ๋“ค์„ ์ œ๊ณตํ•˜๋Š” ๋ฐฉ๋ฒ•๋Œ€๋กœ ์‚ฌ์šฉํ•ด์•ผํ•œ๋‹ค. ๋ญ”๊ฐ„ ์„ค์ •์ด ๋งŽ์•„์ง€๊ณ , ๋‚ด ์ฝ”๋“œ๋ž‘ ์œตํ•ฉํ•˜๊ธฐ ๊นŒ๋‹ค๋กœ์šธ ์ˆ˜ ์žˆ์–ด๋ณด์ธ๋‹ค.

์„œ๋ฒ„์—์„œ ๊ทธ๋ž˜ํ”„ ๊ด€๋ฆฌ

์œ„์˜ Langgraph API ์„œ๋ฒ„์—์„œ ํ•„์š”ํ•œ ๋ถ€๋ถ„๋งŒ ๋‚ด ์ปค์Šคํ…€ ์„œ๋ฒ„์— ๊ตฌํ˜„ํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ๊ทธ๋ž˜ํ”„ ์‹คํ–‰ํ•˜๋ฉด ๊ทธ๋ž˜ํ”„๋ฅผ ์‹คํ–‰ํ•œ ํด๋ผ์ด์–ธํŠธ์™€ ๊ทธ๋ž˜ํ”„ ์ฒดํฌํฌ์ธํŠธ๋ฅผ ์ €์žฅํ•ด์•ผํ•˜๊ณ , ์œ ์ €์˜ ํ™•์ธ ํ›„์— ๋‹ค์‹œ ๊ทธ๋ž˜ํ”„๋ฅผ ๋ถˆ๋Ÿฌ์™€์„œ ์œ ์ €์˜ ์‘๋‹ต์— ๋งž๊ฒŒ ์ƒํƒœ๋ฅผ ๋ณ€๊ฒฝํ•ด์„œ ์žฌ๊ฒŒํ•ด์•ผํ•œ๋‹ค.

์งœ์•ผํ• ๊ฒŒ ์€๊ทผ ๋งŽ์„ ์ˆ˜๋„ ์žˆ๋‹ค.

์†Œ์ผ“ ์—ฐ๊ฒฐ

Agent์‹คํ–‰ ์‹œ์— ์†Œ์ผ“์„ ์—ฐ๊ฒฐํ•˜๊ณ  ์†Œ์ผ“์„ ํ†ตํ•ด์„œ ์œ ์ €์™€ ์ธํ„ฐ๋ ‰์…˜ ํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ๊ธฐ์กด ์˜ˆ์‹œ ์ฝ”๋“œ์—์„œ ์†Œ์ผ“์—ฐ๊ฒฐ๊ณผ ์†Œ์ผ“ ํ†ต์‹ ์œผ๋กœ ์œ ์ € ํ™•์ธ ๋ฐ›๋Š” ๋‹จ๊ณ„๋งŒ ์ถ”๊ฐ€ํ•˜๋ฉด ๋™์ž‘ํ•œ๋‹ค.

๋Œ€์‹ , ๊ธ€์ž ํƒ€์ดํ•‘ํ•˜๋“ฏ ์ณ์ง€๋Š” streaming์„ ๊ตฌํ˜„ํ•˜๊ธฐ ๊นŒ๋‹ค๋กœ์šธ ์ˆ˜๋„ ์žˆ๋‹ค.

์†Œ์ผ“ ์—ฐ๊ฒฐ๋กœ ๊ตฌํ˜„

์ผ๋‹จ ์ตœ๋Œ€ํ•œ ๋ณต์žก์„ฑ์„ ๋Š˜๋ฆฌ์ง€ ์•Š๋Š” ๋ฐฉํ–ฅ์—์„œ ๊ตฌํ˜„์„ ํ•ด๋ณด๊ณ  ์‹ถ์–ด์„œ ์†Œ์ผ“์—ฐ๊ฒฐ๋กœ ๊ตฌํ˜„ํ•ด๋ณด์•˜๋‹ค.

์„œ๋ฒ„๋Š” NestJs๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ํด๋ผ์ด์–ธํŠธ๋Š” NextJs๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

์„œ๋ฒ„

์ผ๋‹จ Websocket ์—ฐ๊ฒฐ์„ ์œ„ํ•ด Gateway๋ฅผ ๋งŒ๋“ ๋‹ค. agent/start ์‹œ์— ์ปค๋„ฅ์…˜์„ ๋งŒ๋“ค๊ณ  ๋ฐ”๋กœ agent๋ฅผ ์‹œํ–‰ํ•˜๋„๋ก ํ–ˆ๋‹ค.

@WebSocketGateway({
  namespace: "/",
  transport: ["websocket", "polling"],
  path: "/agent/start",
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
    credentials: true,
  },
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  protected readonly logger = new Logger(this.constructor.name);

  constructor(
    private readonly agentFactory: AgentFactory
  ) {}

  private pendingConfirmations = new Map<string, (response: boolean) => void>();

  // Handle new connections
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);

    // Option 1: Get actionData from query parameters
    const actionData: { agent: AgentName } = client.handshake.query.actionData
      ? JSON.parse(client.handshake.query.actionData as string)
      : null;

    if (actionData) {
      this.startAgentProcess(client, actionData);
    } else {
      // If no actionData is provided, you can wait for an event
      client.emit("error", "No action data provided");
      client.disconnect();
    }
  }

  // Handle disconnections
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
    this.pendingConfirmations.delete(client.id);
  }

  // Send confirmation request
  async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
    return new Promise((resolve) => {
      this.pendingConfirmations.set(clientId, resolve);
      this.server.to(clientId).emit("confirmation_request", data);

      // Optional timeout for response
      setTimeout(() => {
        if (this.pendingConfirmations.has(clientId)) {
          this.pendingConfirmations.delete(clientId);
          resolve(false); // Default to 'false' if timeout occurs
        }
      }, 3000000); // 3000 seconds timeout
    });
  }

  // Handle client's confirmation response
  @SubscribeMessage("confirmation_response")
  handleClientResponse(
    @MessageBody() data: { confirmed: boolean },
    @ConnectedSocket() client: Socket
  ) {
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(data.confirmed);
      this.pendingConfirmations.delete(client.id);
    }
  }

  // Start the agent process
  private async startAgentProcess(
    client: Socket,
    actionData: { agent: AgentName }
  ) {
    const graph = await this.agentFactory.create({
      agentName: actionData.agent,
    });

    const initialInput = { input: "hello world" };

    // Thread
    const graphStateConfig = {
      configurable: { thread_id: "1" },
      streamMode: "values" as const,
    };

    // Run the graph until the first interruption
    for await (const event of await graph.stream(
      initialInput,
      graphStateConfig
    )) {
      this.logAndEmit(client, `--- ${event.input} ---`);
    }

    // Will log when the graph is interrupted, after step 2.
    this.logAndEmit(client, "---GRAPH INTERRUPTED---");

    const userConfirmed = await this.sendConfirmationRequest(client.id, {
      message: "Do you want to proceed with this action?",
      actionData,
    });

    if (userConfirmed) {
      // If approved, continue the graph execution. We must pass `null` as
      // the input here, or the graph will
      for await (const event of await graph.stream(null, graphStateConfig)) {
        this.logAndEmit(client, `--- ${event.input} ---`);
      }
      this.logAndEmit(client, "---ACTION EXECUTED---");
    } else {
      this.logAndEmit(client, "---ACTION CANCELLED---");
    }

    // Optionally disconnect the client
    client.disconnect();
  }

  private logAndEmit(client: Socket, message: string) {
    console.log(message);
    client.emit("message", { message });
  }
}
Enter fullscreen mode Exit fullscreen mode

ํ•ต์‹ฌ์€ ๊ฐ„๋‹จํ•˜๋‹ค. Socket์ด ์—ฐ๊ฒฐ๋˜๋ฉด ๋ฐ”๋กœ agent๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ์ˆ˜ํ–‰ํ•˜๊ณ , ์ˆ˜ํ–‰ํ•ด์„œ interrupt ๋‹นํ•˜๋ฉด Client์—๊ฒŒ confirmation request message๋ฅผ ๋ณด๋‚ด๊ณ  ๊ธฐ๋‹ค๋ฆฐ๋‹ค. confirmation์ด resolve๋˜๋ฉด ์ด์–ด์„œ graph๋ฅผ ์ง„ํ–‰ํ•œ๋‹ค.

์œ„ ์ฝ”๋“œ์—์„œ ์‚ฌ์šฉํ•œ agent๋Š” langgraph ๋ฌธ์„œ์— ์žˆ๋Š” ์•„๋ž˜ ์Šคํ… 1 2 3 ์„ ์ˆœ์ฐจ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๋Š” ์—์ด์ „ํŠธ์ด๋‹ค.

  const GraphState = Annotation.Root({
    input: Annotation<string>,
  });

  const step1 = (state: typeof GraphState.State) => {
    console.log("---Step 1---");
    return state;
  };

  const step2 = (state: typeof GraphState.State) => {
    console.log("---Step 2---");
    return state;
  };

  const step3 = (state: typeof GraphState.State) => {
    console.log("---Step 3---");
    return state;
  };

  const builder = new StateGraph(GraphState)
    .addNode("step1", step1)
    .addNode("step2", step2)
    .addNode("step3", step3)
    .addEdge(START, "step1")
    .addEdge("step1", "step2")
    .addEdge("step2", "step3")
    .addEdge("step3", END);

  // Set up memory
  const graphStateMemory = new MemorySaver();

  const graph = builder.compile({
    checkpointer: graphStateMemory,
    interruptBefore: ["step3"],
  });
  return graph;
Enter fullscreen mode Exit fullscreen mode

ํด๋ผ์ด์–ธํŠธ

ํด๋ผ์ด์–ธํŠธ์—์„œ๋Š” ํ›…์„ ๋งŒ๋“ค์–ด์„œ agent start์™€ ๊ทธ ์ƒํƒœ๋ฅผ ๊ด€๋ฆฌํ•œ๋‹ค.

import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";

export const useAgentSocket = () => {
  const socketRef = useRef<Socket | null>(null);
  const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
  const [messages, setMessages] = useState<string[]>([]);

  const connectAndRun = (actionData: any) => {
    return new Promise((resolve, reject) => {
      socketRef.current = io("http://localhost:8000", {
        path: "/agent/start",
        transports: ["websocket", "polling"],
        query: {
          actionData: JSON.stringify(actionData),
        },
      });

      socketRef.current.on("connect", () => {
        console.log("Connected:", socketRef.current?.id);
        resolve(void 0);
      });

      socketRef.current.on("connect_error", (error) => {
        console.error("Connection error:", error);
        reject(error);
      });

      // Listen for confirmation requests
      socketRef.current.on("confirmation_request", (data) => {
        setConfirmationRequest(data);
      });

      // Listen for messages
      socketRef.current.on("message", (data) => {
        console.log("Received message:", data);
        setMessages((prevMessages) => [...prevMessages, data.message]);
      });

      socketRef.current.on("disconnect", () => {
        console.log("Disconnected from server");
      });
    });
  };

  const sendConfirmationResponse = (confirmed: boolean) => {
    if (socketRef.current) {
      socketRef.current.emit("confirmation_response", { confirmed });
      setConfirmationRequest(null);
    }
  };

  const disconnectSocket = () => {
    if (socketRef.current) {
      socketRef.current.disconnect();
    }
  };

  const clearMessages = () => {
    setMessages([]);
  };

  return {
    confirmationRequest,
    sendConfirmationResponse,
    connectAndRun,
    disconnectSocket,
    messages,
    clearMessages,
  };
};
Enter fullscreen mode Exit fullscreen mode

์ปค๋„ฅ์…˜์„ ๋งบ๊ณ , confirmation request๊ฐ€ ์˜ค๋ฉด confirmationRequest ์ƒํƒœ๋ฅผ ์—…๋ฐ์ดํŠธํ•œ๋‹ค. UI component์—์„œ confirmationRequest ์ƒํƒœ๋ฅผ ๋ณด๊ณ  ์œ ์ €์—๊ฒŒ ์ฐฝ์„ ๋„์›Œ์ฃผ๋ฉด ๋œ๋‹ค.

Featured ones: