订阅以接收新文章的通知:

使用 Knock 和 Cloudflare 的 Agents SDK 构建一个让人类参与最新情况的 AI 代理

2025-06-03

8 分钟阅读时间
这篇博文也有 English 版本。

这是 Knock 首席技术官 Chris Bell 的客座帖子

现在有很多关于构建 AI 代理的讨论,但却很少讨论如何使这些代理真正有用

代理是一种自治系统,旨在做出决策和执行行动以实现特定目标或一组目标,而无需人工输入。

无论您的智能体多么擅长决策,您都需要有人为它实现目标提供指导或建议。毕竟,一个无法与外界互动或响应的智能体及管理它的系统将是有限的,它能够解决的问题将会是有限的。

这就是“人机交互”交互模式的用武之地了。您让一个人进入智能体的控制循环,并要求人类进行输入,然后智能体才能继续执行其任务。

在本篇博客文章中,我们将使用 Knock 和 Cloudflare Agents SDK 为虚拟发卡工作流程构建 AI 代理,该工作流程在请求新信用卡时需要人工批准。

您可以在存储库中找到此示例的完整代码。

什么是 Knock?

Knock 是一种消息传递基础设施,可让您用于在应用内、电子邮件、短信、推送和 Slack 中发送多渠道消息,无需编写任何集成代码。

使用 Knock,您可以完全了解发送给用户的消息,同时处理可靠交付、用户通知首选项等。

您可以使用 Knock 的 Agent Toolkit为您的智能体提供人机交互流程,这是一套工具,向您的 AI 智能体公开 Knock 的 API 和消息传递功能。

使用 Agent SDK 作为 AI 智能体的基础

Agents SDK 提供了一个抽象概念,用于在 Durable Object 之上构建有状态的实时代理,这些代理是全局可寻址的,并使用嵌入式、零延迟 SQLite 数据库持久状态。

使用 Agents SDK 和 Cloudflare 平台之外构建 AI 智能体意味着我们需要考虑 WebSocket 服务器、状态持久性以及如何水平扩展服务。由于 Durable Object 支持 Agents SDK,我们免费获得了这些好处,同时拥有一个全球可寻址的内置存储的计算资源,完全无服务器,可扩展至零。

在本示例中,我们将使用这些功能来构建一个代理。用户可以通过聊天与之实时交互,并且可以根据需要暂停和恢复聊天。Agents SDK 是支持异步智能体工作流程的理想平台,例如人机交互交互中所需的工作流程。

设置 Knock 消息传递工作流程

在 Knock 中,我们使用可视化工作流构建器来创建跨渠道消息传递逻辑,从而设计审批工作流。然后,我们制作与要将消息发送到的每个渠道相关联的通知模板。

作为工作流执行的一部分,Knock 会自动应用用户的首选项,确保用户的通知设置得到尊重。

您可以在存储库中找到我们已经为此演示创建的示例工作流程。您可以通过 Knock CLI 将这个工作流模板导入您的帐户,以使用该工作流模板。

构建聊天 UI

我们在 Cloudflare Agents SDK 的 AIChatAgent 抽象基础上构建了这个聊天界面的 AI 智能体(文档)。Agents SDK 在这里解决了大部分复杂问题,剩下来就是利用系统提示词实现 LLM 调用代码。

// src/index.ts

import { AIChatAgent } from "agents/ai-chat-agent";
import { openai } from "@ai-sdk/openai";
import { createDataStreamResponse, streamText } from "ai";

export class AIAgent extends AIChatAgent {
  async onChatMessage(onFinish) {
    return createDataStreamResponse({
      execute: async (dataStream) => {
        try {
          const stream = streamText({
            model: openai("gpt-4o-mini"),
            system: `You are a helpful assistant for a financial services company. You help customers with credit card issuing.`,
            messages: this.messages,
            onFinish,
            maxSteps: 5,
          });

          stream.mergeIntoDataStream(dataStream);
        } catch (error) {
          console.error(error);
        }
      },
    });
  }
}

在客户端,我们使用 agents/ai-react 包中的 useAgentChat hook 来支持用户与智能体的实时聊天。

我们已将代理建模为每个用户的聊天,通过将进程的名称指定为 userId ,来使用 useAgent 钩子来设置代理。

// src/index.ts

import { useAgent } from "agents/react";
import { useAgentChat } from "agents/ai-react";

function Chat({ userId }: { userId: string }) {
  const agent = useAgent({ agent: "AIAgent", name: userId });
  const { messages, input, handleInputChange, handleSubmit, isLoading } = useAgentChat({ agent });
  // ... 
}

这意味着我们有一个代理进程,因此每个用户的对象也有一个持久化。对于我们的人机交互用例,这在稍后我们要恢复延迟的工具调用时会变得很重要。

推迟对 Knock 的工具调用

我们通过公开 issueCard 工具,为代理提供我们的发卡能力。但是,我们没有自己编写审批流程和跨渠道逻辑,而是通过将问题卡工具包装在 requireHumanInput 方法中,将其完全委托给 Knock。

现在,当用户要求申请新卡片时,我们会调用 Knock 以发起我们的卡片请求,这将通知组织中相应的管理员以请求批准。

为此,我们需要使用 Knock 的智能体工具包,该工具包提供了在我们的 AI 智能体中与 Knock 协同工作的方法,并支持跨渠道消息传递。

import { createKnockToolkit } from "@knocklabs/agent-toolkit/ai-sdk";
import { tool } from "ai";
import { z } from "zod";

import { AIAgent } from "./index";
import { issueCard } from "./api";
import { BASE_URL } from "./constants";

async function initializeToolkit(agent: AIAgent) {
  const toolkit = await createKnockToolkit({ serviceToken: agent.env.KNOCK_SERVICE_TOKEN });

  const issueCardTool = tool({
    description: "Issue a new credit card to a customer.",
    parameters: z.object({
      customerId: z.string(),
    }),
    execute: async ({ customerId }) => {
      return await issueCard(customerId);
    },
  });

  const { issueCard } = toolkit.requireHumanInput(
    { issueCard: issueCardTool },
    {
      workflow: "approve-issued-card",
      actor: agent.name,
      recipients: ["admin_user_1"],
      metadata: {
        approve_url: `${BASE_URL}/card-issued/approve`,
        reject_url: `${BASE_URL}/card-issued/reject`,
      },
    }
  );
  
  return { toolkit, tools: { issueCard } };  
}

这里包含很多内容,让我们过一下关键部分:

  • 我们将 issueCard 工具包装在 requireHumanInput 方法中,由 Knock Agent 工具包公开

  • 我们希望调用消息传递工作流,成为批准发放卡工作流

  • 我们传递 Agent.name 作为请求的参与者,后者会转换为用户 ID

  • 我们将此工作流的收件人设置为用户 admin_user_1

  • 我们传递批准和拒绝 URL,以便在我们的消息模板中使用

  • 包装的工具将作为issueCard返回

在底层,这些选项被传递到Knock 工作流触发器 API以调用每个收件人的工作流。此处列出的收件人集可以是动态的,或者转到通过Knock 的订阅 API的一组用户。

然后,我们可以将打包的发行卡工具传递给代理上 onChatMessage 方法中的 LLM 调用,以便可以在与代理进行交互的过程中调用工具调用。

export class AIAgent extends AIChatAgent {
  // ... other methods

  async onChatMessage(onFinish) {
    const { tools } = await initializeToolkit(this);

    return createDataStreamResponse({
      execute: async (dataStream) => {
        const stream = streamText({
          model: openai("gpt-4o-mini"),
          system: "You are a helpful assistant for a financial services company. You help customers with credit card issuing.",
          messages: this.messages,
          onFinish,
          tools,
          maxSteps: 5,
        });

        stream.mergeIntoDataStream(dataStream);
      },
    });
  }
}

现在,当智能体调用issueCardTool 时,我们调用 Knock 来发送审批通知,从而推迟了工具调用以发行卡片,直到收到批准为止。Knock 的工作流程负责将消息发送到一组指定的收件人,根据每个用户的偏好生成并传递消息。

将 Knock 工作流程用于审批消息,可以轻松构建跨渠道消息传递,以根据用户的通信 偏好到达用户。我们还可以利用延迟限流批处理条件来编排更复杂的消息传递。

处理审批

消息发送给批准者后,下一步就是处理返回的批准,将人工交互带入智能体的循环中。

批准请求是异步的,这意味着响应可以在未来的任何时候到达。幸运的是,Knock 已经为您处理这里的繁重工作,通过一个跟踪与底层消息交互的 webhook 将事件路由到智能体 worker。在我们的例子中,只需点击“批准”或“拒绝”按钮。

首先,我们在 Knock 仪表板中设置一个 message.interacted webhook 处理程序,将交互转发给我们的 Worker,并最终转发给我们的代理进程。

在我们的示例中,我们将批准点击路由回 Worker 进行处理,将 Knock 消息 ID 附加到批准 URL拒绝 URL的末尾,以跟踪对所发送特定消息的参与度。我们通过 Knock 中消息模板中的 Liquid 来实现这一点:{{ data.approve_url }}?messageId={{ current_message.id }} 。需要注意的是,如果这是一个生产应用程序,我们很可能会在另一个应用程序中处理批准点击,而不是运行此代理。我们将其放在这里仅为了演示之目的。

点击链接后,worker 中的处理程序将使用 Knock 的消息交互 API将消息标记为已交互,并将状态作为元数据传递,以便以后使用。

import Knock from '@knocklabs/node';
import { Hono } from "hono";

const app = new Hono();
const client = new Knock();

app.get("/card-issued/approve", async (c) => {
  const { messageId } = c.req.query();
  
  if (!messageId) return c.text("No message ID found", { status: 400 });

  await client.messages.markAsInteracted(messageId, {
    status: "approved",
  });

  return c.text("Approved");
});

消息交互将通过我们设置的 webhook 从 Knock 进行,确保这个过程完全异步。webhook 的恶意有效负载包括完整的消息,其中包括有关生成原始请求的用户的元数据,并保留了有关请求本身的详细信息(在我们的例子中包含了工具调用)。

import { getAgentByName, routeAgentRequest } from "agents";
import { Hono } from "hono";

const app = new Hono();

app.post("/incoming/knock/webhook", async (c) => {
  const body = await c.req.json();
  const env = c.env as Env;

  // Find the user ID from the tool call for the calling user
  const userId = body?.data?.actors[0];

  if (!userId) {
    return c.text("No user ID found", { status: 400 });
  }

  // Find the agent DO for the user
  const existingAgent = await getAgentByName(env.AIAgent, userId);

  if (existingAgent) {
    // Route the request to the agent DO to process
    const result = await existingAgent.handleIncomingWebhook(body);

    return c.json(result);
  } else {
    return c.text("Not found", { status: 404 });
  }
});

我们利用代理通过命名标识符寻址的能力,将请求从 Worker 路由到代理。在我们的例子中,就是 userId 。因为代理由一个 Durable 对象提供支持,所以从传入 Worker 请求到查找并恢复代理的过程非常简单。

恢复推迟的工具调用

然后,我们使用原始工具调用的上下文(传递给 Knock 并往返回给智能体),恢复工具执行并发放卡片。

export class AIAgent extends AIChatAgent {
  // ... other methods

  async handleIncomingWebhook(body: any) {
    const { toolkit } = await initializeToolkit(this);

    const deferredToolCall = toolkit.handleMessageInteraction(body);

    if (!deferredToolCall) {
      return { error: "No deferred tool call given" };
    }

    // If we received an "approved" status then we know the call was approved 
    // so we can resume the deferred tool call execution
    if (result.interaction.status === "approved") {
      const toolCallResult = 
	      await toolkit.resumeToolExecution(result.toolCall);

      const { response } = await generateText({
        model: openai("gpt-4o-mini"),
        prompt: `You were asked to issue a card for a customer. The card is now approved. The result was: ${JSON.stringify(toolCallResult)}.`,
      });

      const message = responseToAssistantMessage(
        response.messages[0],
        result.toolCall,
        toolCallResult
      );

      // Save the message so that it's displayed to the user
      this.persistMessages([...this.messages, message]);
    }

    return { status: "success" };
  }
}

同样,这里发生了很多事情,所以我们来逐步了解重要的部分:

  • 我们尝试通过 handleMessageInteraction 方法将正文(来自 Knock 的 webhook 有效负载)转换为延迟工具调用

  • 如果我们传递给之前交互调用的元数据状态为“approved”,那么我们通过 restoreToolExecution 方法恢复工具调用

  • 最后,我们从 LLM 生成一条消息并将其保留,以确保用户了解已批准的信用卡

完成最后一部分后,我们现在可以要求签发新卡,从代理发出批准请求,发送批准消息,并将这些批准路由回我们的代理进行处理。只需很少的代码,代理将异步处理我们的发卡请求,并为我们恢复延迟的工具调用。

防止重复批准

上述实现的一个问题是,如果有人点击批准按钮以上,我们很容易发放多张卡。为纠正这一问题,我们要跟踪正在发出的工具调用,并确保最多处理一次调用。

为了驱动这一点,我们利用 代理的内置 state,它可用于持久化信息,而无需访问其他持久性存储(如数据库或 Redis),尽管如果我们愿意,我们绝对可以这样做。我们可以通过工具 ID 跟踪工具调用,并在代理进程内捕获其当前状态。

type ToolCallStatus = "requested" | "approved" | "rejected";

export interface AgentState {
  toolCalls: Record<string, ToolCallStatus>;
}

class AIAgent extends AIChatAgent<Env, AgentState> {
  initialState: AgentState = {
    toolCalls: {},
  };
  
  setToolCallStatus(toolCallId: string, status: ToolCallStatus) {
    this.setState({
      ...this.state,
      toolCalls: { ...this.state.toolCalls, [toolCallId]: status },
    });
  } 
  // ... 
}

在这里,我们将工具调用的初始状态创建为空对象。我们还添加了一个快速设置辅助方法,以便更轻松地交互。

下一步,我们需要记录正在进行的工具调用。为此,我们可以使用 requireHumanInput 帮助程序中的 onAfterCallKnock 选项来捕获已为用户请求工具调用。

const { issueCard }  = toolkit.requireHumanInput(
  { issueCard: issueCardTool },
  {
    // Keep track of the tool call state once it's been sent to Knock
    onAfterCallKnock: async (toolCall) => 
      agent.setToolCallStatus(toolCall.id, "requested"),
    // ... as before
  }
);

最后,我们需要在处理传入 webhook 时检查状态,并将工具调用标记为已批准(为简洁起见,省略了一些代码)。

export class AIAgent extends AIChatAgent {
  async handleIncomingWebhook(body: any) {
    const { toolkit } = await initializeToolkit(this);
    const deferredToolCall = toolkit.handleMessageInteraction(body);
    const toolCallId = result.toolCall.id;

    // Make sure this is a tool call that can be processed
    if (this.state.toolCalls[toolCallId] !== "requested") {
      return { error: "Tool call is not requested" };
    }

    if (result.interaction.status === "approved") {
      const toolCallResult = await toolkit.resumeToolExecution(result.toolCall);
      this.setToolCallStatus(toolCallId, "approved");
      // ... rest as before
    }
  }
}

总结

使用 Agents SDK 和 Knock,可轻松构建推迟工具调用的高级人机交互体验。

Knock 的工作流构建器和通知引擎为您提供构建块,为您的代理创建复杂的跨渠道消息传递。您能轻松创建升级流程,通过短信、推送、电子邮件或 Slack 发送消息,同时尊重用户的通知偏好。Knock 还可以让您对用户接收的消息拥有完全的可见性。

Agents SDK 下方的 Durable Object 抽象意味着我们获得了一个可全局寻址的代理进程,很容易让出并恢复到这个进程。利用 Durable Object 中的持久性存储,我们可以保留每个用户的完整聊天历史记录,以及在智能体进程中恢复智能体所需的任何其他状态(例如我们的工具调用)。最后,底层 Durable Object 的无服务器特性意味着我们能够水平扩展,毫不费力地支持大量用户。

如果您希望利用多人循环体验构建自己的 AI 智能体聊天体验,可在GitHub中找到本指南中的完整代码。

我们保护整个企业网络,帮助客户高效构建互联网规模的应用程序,加速任何网站或互联网应用程序抵御 DDoS 攻击,防止黑客入侵,并能协助您实现 Zero Trust 的过程

从任何设备访问 1.1.1.1,以开始使用我们的免费应用程序,帮助您更快、更安全地访问互联网。要进一步了解我们帮助构建更美好互联网的使命,请从这里开始。如果您正在寻找新的职业方向,请查看我们的空缺职位
AI智能体Cloudflare WorkersDurable Objects开发人员平台开发人员

在 X 上关注

Chris Bell (Guest author)|cjbell_
Cloudflare|@cloudflare

相关帖子