这是 Knock 首席技术官 Chris Bell 的客座帖子
现在有很多关于构建 AI 代理的讨论,但却很少讨论如何使这些代理真正有用。
代理是一种自治系统,旨在做出决策和执行行动以实现特定目标或一组目标,而无需人工输入。
无论您的智能体多么擅长决策,您都需要有人为它实现目标提供指导或建议。毕竟,一个无法与外界互动或响应的智能体及管理它的系统将是有限的,它能够解决的问题将会是有限的。
这就是“人机交互”交互模式的用武之地了。您让一个人进入智能体的控制循环,并要求人类进行输入,然后智能体才能继续执行其任务。
在本篇博客文章中,我们将使用 Knock 和 Cloudflare Agents SDK 为虚拟发卡工作流程构建 AI 代理,该工作流程在请求新信用卡时需要人工批准。
您可以在存储库中找到此示例的完整代码。
什么是 Knock?
Knock 是一种消息传递基础设施,可让您用于在应用内、电子邮件、短信、推送和 Slack 中发送多渠道消息,无需编写任何集成代码。
使用 Knock,您可以完全了解发送给用户的消息,同时处理可靠交付、用户通知首选项等。
您可以使用 Knock 的 Agent Toolkit为您的智能体提供人机交互流程,这是一套工具,向您的 AI 智能体公开 Knock 的 API 和消息传递功能。
使用 Agent SDK 作为 AI 智能体的基础
Agents SDK 提供了一个抽象概念,用于在 Durable Object 之上构建有状态的实时代理,这些代理是全局可寻址的,并使用嵌入式、零延迟 SQLite 数据库持久状态。
使用 Agents SDK 和 Cloudflare 平台之外构建 AI 智能体意味着我们需要考虑 WebSocket 服务器、状态持久性以及如何水平扩展服务。由于 Durable Object 支持 Agents SDK,我们免费获得了这些好处,同时拥有一个全球可寻址的内置存储的计算资源,完全无服务器,可扩展至零。
在本示例中,我们将使用这些功能来构建一个代理。用户可以通过聊天与之实时交互,并且可以根据需要暂停和恢复聊天。Agents SDK 是支持异步智能体工作流程的理想平台,例如人机交互交互中所需的工作流程。
设置 Knock 消息传递工作流程
在 Knock 中,我们使用可视化工作流构建器来创建跨渠道消息传递逻辑,从而设计审批工作流。然后,我们制作与要将消息发送到的每个渠道相关联的通知模板。
作为工作流执行的一部分,Knock 会自动应用用户的首选项,确保用户的通知设置得到尊重。
您可以在存储库中找到我们已经为此演示创建的示例工作流程。您可以通过 Knock CLI 将这个工作流模板导入您的帐户,以使用该工作流模板。
构建聊天 UI
我们在 Cloudflare Agents SDK 的 AIChatAgent
抽象基础上构建了这个聊天界面的 AI 智能体(文档)。Agents SDK 在这里解决了大部分复杂问题,剩下来就是利用系统提示词实现 LLM 调用代码。
// src/index.ts
import { AIChatAgent } from "agents/ai-chat-agent";
import { openai } from "@ai-sdk/openai";
import { createDataStreamResponse, streamText } from "ai";
export class AIAgent extends AIChatAgent {
async onChatMessage(onFinish) {
return createDataStreamResponse({
execute: async (dataStream) => {
try {
const stream = streamText({
model: openai("gpt-4o-mini"),
system: `You are a helpful assistant for a financial services company. You help customers with credit card issuing.`,
messages: this.messages,
onFinish,
maxSteps: 5,
});
stream.mergeIntoDataStream(dataStream);
} catch (error) {
console.error(error);
}
},
});
}
}
在客户端,我们使用 agents/ai-react
包中的 useAgentChat
hook 来支持用户与智能体的实时聊天。
我们已将代理建模为每个用户的聊天,通过将进程的名称指定为 userId
,来使用 useAgent
钩子来设置代理。
// src/index.ts
import { useAgent } from "agents/react";
import { useAgentChat } from "agents/ai-react";
function Chat({ userId }: { userId: string }) {
const agent = useAgent({ agent: "AIAgent", name: userId });
const { messages, input, handleInputChange, handleSubmit, isLoading } = useAgentChat({ agent });
// ...
}
这意味着我们有一个代理进程,因此每个用户的对象也有一个持久化。对于我们的人机交互用例,这在稍后我们要恢复延迟的工具调用时会变得很重要。
推迟对 Knock 的工具调用
我们通过公开 issueCard
工具,为代理提供我们的发卡能力。但是,我们没有自己编写审批流程和跨渠道逻辑,而是通过将问题卡工具包装在 requireHumanInput
方法中,将其完全委托给 Knock。
现在,当用户要求申请新卡片时,我们会调用 Knock 以发起我们的卡片请求,这将通知组织中相应的管理员以请求批准。
为此,我们需要使用 Knock 的智能体工具包,该工具包提供了在我们的 AI 智能体中与 Knock 协同工作的方法,并支持跨渠道消息传递。
import { createKnockToolkit } from "@knocklabs/agent-toolkit/ai-sdk";
import { tool } from "ai";
import { z } from "zod";
import { AIAgent } from "./index";
import { issueCard } from "./api";
import { BASE_URL } from "./constants";
async function initializeToolkit(agent: AIAgent) {
const toolkit = await createKnockToolkit({ serviceToken: agent.env.KNOCK_SERVICE_TOKEN });
const issueCardTool = tool({
description: "Issue a new credit card to a customer.",
parameters: z.object({
customerId: z.string(),
}),
execute: async ({ customerId }) => {
return await issueCard(customerId);
},
});
const { issueCard } = toolkit.requireHumanInput(
{ issueCard: issueCardTool },
{
workflow: "approve-issued-card",
actor: agent.name,
recipients: ["admin_user_1"],
metadata: {
approve_url: `${BASE_URL}/card-issued/approve`,
reject_url: `${BASE_URL}/card-issued/reject`,
},
}
);
return { toolkit, tools: { issueCard } };
}
这里包含很多内容,让我们过一下关键部分:
我们将
issueCard
工具包装在requireHumanInput
方法中,由 Knock Agent 工具包公开我们希望调用消息传递工作流,成为
批准发放卡
工作流我们传递 Agent.name 作为请求的
参与者
,后者会转换为用户 ID我们将此工作流的收件人设置为用户
admin_user_1
我们传递批准和拒绝 URL,以便在我们的消息模板中使用
包装的工具将作为
issueCard
返回
在底层,这些选项被传递到Knock 工作流触发器 API以调用每个收件人的工作流。此处列出的收件人集可以是动态的,或者转到通过Knock 的订阅 API的一组用户。
然后,我们可以将打包的发行卡工具传递给代理上 onChatMessage
方法中的 LLM 调用,以便可以在与代理进行交互的过程中调用工具调用。
export class AIAgent extends AIChatAgent {
// ... other methods
async onChatMessage(onFinish) {
const { tools } = await initializeToolkit(this);
return createDataStreamResponse({
execute: async (dataStream) => {
const stream = streamText({
model: openai("gpt-4o-mini"),
system: "You are a helpful assistant for a financial services company. You help customers with credit card issuing.",
messages: this.messages,
onFinish,
tools,
maxSteps: 5,
});
stream.mergeIntoDataStream(dataStream);
},
});
}
}
现在,当智能体调用issueCardTool
时,我们调用 Knock 来发送审批通知,从而推迟了工具调用以发行卡片,直到收到批准为止。Knock 的工作流程负责将消息发送到一组指定的收件人,根据每个用户的偏好生成并传递消息。
将 Knock 工作流程用于审批消息,可以轻松构建跨渠道消息传递,以根据用户的通信 偏好到达用户。我们还可以利用延迟、限流、批处理和条件来编排更复杂的消息传递。
处理审批
消息发送给批准者后,下一步就是处理返回的批准,将人工交互带入智能体的循环中。
批准请求是异步的,这意味着响应可以在未来的任何时候到达。幸运的是,Knock 已经为您处理这里的繁重工作,通过一个跟踪与底层消息交互的 webhook 将事件路由到智能体 worker。在我们的例子中,只需点击“批准”或“拒绝”按钮。
首先,我们在 Knock 仪表板中设置一个 message.interacted
webhook 处理程序,将交互转发给我们的 Worker,并最终转发给我们的代理进程。
在我们的示例中,我们将批准点击路由回 Worker 进行处理,将 Knock 消息 ID 附加到批准 URL
和拒绝 URL
的末尾,以跟踪对所发送特定消息的参与度。我们通过 Knock 中消息模板中的 Liquid 来实现这一点:{{ data.approve_url }}?messageId={{ current_message.id }}
。需要注意的是,如果这是一个生产应用程序,我们很可能会在另一个应用程序中处理批准点击,而不是运行此代理。我们将其放在这里仅为了演示之目的。
点击链接后,worker 中的处理程序将使用 Knock 的消息交互 API将消息标记为已交互,并将状态作为元数据传递,以便以后使用。
import Knock from '@knocklabs/node';
import { Hono } from "hono";
const app = new Hono();
const client = new Knock();
app.get("/card-issued/approve", async (c) => {
const { messageId } = c.req.query();
if (!messageId) return c.text("No message ID found", { status: 400 });
await client.messages.markAsInteracted(messageId, {
status: "approved",
});
return c.text("Approved");
});
消息交互将通过我们设置的 webhook 从 Knock 进行,确保这个过程完全异步。webhook 的恶意有效负载包括完整的消息,其中包括有关生成原始请求的用户的元数据,并保留了有关请求本身的详细信息(在我们的例子中包含了工具调用)。
import { getAgentByName, routeAgentRequest } from "agents";
import { Hono } from "hono";
const app = new Hono();
app.post("/incoming/knock/webhook", async (c) => {
const body = await c.req.json();
const env = c.env as Env;
// Find the user ID from the tool call for the calling user
const userId = body?.data?.actors[0];
if (!userId) {
return c.text("No user ID found", { status: 400 });
}
// Find the agent DO for the user
const existingAgent = await getAgentByName(env.AIAgent, userId);
if (existingAgent) {
// Route the request to the agent DO to process
const result = await existingAgent.handleIncomingWebhook(body);
return c.json(result);
} else {
return c.text("Not found", { status: 404 });
}
});
我们利用代理通过命名标识符寻址的能力,将请求从 Worker 路由到代理。在我们的例子中,就是 userId
。因为代理由一个 Durable 对象提供支持,所以从传入 Worker 请求到查找并恢复代理的过程非常简单。
恢复推迟的工具调用
然后,我们使用原始工具调用的上下文(传递给 Knock 并往返回给智能体),恢复工具执行并发放卡片。
export class AIAgent extends AIChatAgent {
// ... other methods
async handleIncomingWebhook(body: any) {
const { toolkit } = await initializeToolkit(this);
const deferredToolCall = toolkit.handleMessageInteraction(body);
if (!deferredToolCall) {
return { error: "No deferred tool call given" };
}
// If we received an "approved" status then we know the call was approved
// so we can resume the deferred tool call execution
if (result.interaction.status === "approved") {
const toolCallResult =
await toolkit.resumeToolExecution(result.toolCall);
const { response } = await generateText({
model: openai("gpt-4o-mini"),
prompt: `You were asked to issue a card for a customer. The card is now approved. The result was: ${JSON.stringify(toolCallResult)}.`,
});
const message = responseToAssistantMessage(
response.messages[0],
result.toolCall,
toolCallResult
);
// Save the message so that it's displayed to the user
this.persistMessages([...this.messages, message]);
}
return { status: "success" };
}
}
同样,这里发生了很多事情,所以我们来逐步了解重要的部分:
我们尝试通过
handleMessageInteraction
方法将正文(来自 Knock 的 webhook 有效负载)转换为延迟工具调用如果我们传递给之前交互调用的元数据状态为“approved”,那么我们通过
restoreToolExecution
方法恢复工具调用最后,我们从 LLM 生成一条消息并将其保留,以确保用户了解已批准的信用卡
完成最后一部分后,我们现在可以要求签发新卡,从代理发出批准请求,发送批准消息,并将这些批准路由回我们的代理进行处理。只需很少的代码,代理将异步处理我们的发卡请求,并为我们恢复延迟的工具调用。
防止重复批准
上述实现的一个问题是,如果有人点击批准按钮以上,我们很容易发放多张卡。为纠正这一问题,我们要跟踪正在发出的工具调用,并确保最多处理一次调用。
为了驱动这一点,我们利用 代理的内置 state,它可用于持久化信息,而无需访问其他持久性存储(如数据库或 Redis),尽管如果我们愿意,我们绝对可以这样做。我们可以通过工具 ID 跟踪工具调用,并在代理进程内捕获其当前状态。
type ToolCallStatus = "requested" | "approved" | "rejected";
export interface AgentState {
toolCalls: Record<string, ToolCallStatus>;
}
class AIAgent extends AIChatAgent<Env, AgentState> {
initialState: AgentState = {
toolCalls: {},
};
setToolCallStatus(toolCallId: string, status: ToolCallStatus) {
this.setState({
...this.state,
toolCalls: { ...this.state.toolCalls, [toolCallId]: status },
});
}
// ...
}
在这里,我们将工具调用的初始状态创建为空对象。我们还添加了一个快速设置辅助方法,以便更轻松地交互。
下一步,我们需要记录正在进行的工具调用。为此,我们可以使用 requireHumanInput
帮助程序中的 onAfterCallKnock 选项来捕获已为用户请求工具调用。
const { issueCard } = toolkit.requireHumanInput(
{ issueCard: issueCardTool },
{
// Keep track of the tool call state once it's been sent to Knock
onAfterCallKnock: async (toolCall) =>
agent.setToolCallStatus(toolCall.id, "requested"),
// ... as before
}
);
最后,我们需要在处理传入 webhook 时检查状态,并将工具调用标记为已批准(为简洁起见,省略了一些代码)。
export class AIAgent extends AIChatAgent {
async handleIncomingWebhook(body: any) {
const { toolkit } = await initializeToolkit(this);
const deferredToolCall = toolkit.handleMessageInteraction(body);
const toolCallId = result.toolCall.id;
// Make sure this is a tool call that can be processed
if (this.state.toolCalls[toolCallId] !== "requested") {
return { error: "Tool call is not requested" };
}
if (result.interaction.status === "approved") {
const toolCallResult = await toolkit.resumeToolExecution(result.toolCall);
this.setToolCallStatus(toolCallId, "approved");
// ... rest as before
}
}
}
总结
使用 Agents SDK 和 Knock,可轻松构建推迟工具调用的高级人机交互体验。
Knock 的工作流构建器和通知引擎为您提供构建块,为您的代理创建复杂的跨渠道消息传递。您能轻松创建升级流程,通过短信、推送、电子邮件或 Slack 发送消息,同时尊重用户的通知偏好。Knock 还可以让您对用户接收的消息拥有完全的可见性。
Agents SDK 下方的 Durable Object 抽象意味着我们获得了一个可全局寻址的代理进程,很容易让出并恢复到这个进程。利用 Durable Object 中的持久性存储,我们可以保留每个用户的完整聊天历史记录,以及在智能体进程中恢复智能体所需的任何其他状态(例如我们的工具调用)。最后,底层 Durable Object 的无服务器特性意味着我们能够水平扩展,毫不费力地支持大量用户。
如果您希望利用多人循环体验构建自己的 AI 智能体聊天体验,可在GitHub中找到本指南中的完整代码。