订阅以接收新文章的通知:

基于 MQTT 的可编程消息传递

2022/05/12

9 分钟阅读时间
Announcing Pub/Sub: Programmable MQTT-based Messaging

推动 Platform Week 的基本问题之一是 “我们如何让开发人员能够在 Cloudflare 上构建全栈应用程序?”。将 Workers 作为一个无服务器环境,用于轻松部署默认分布式应用程序,将 KVDurable Object 用于缓存和协调,以及将 R2 作为我们的零输出成本对象存储,在此基础上,我们继续讨论还需要构建什么,才能帮助开发人员构建新应用程序以及/或将现有应用程序带到 Cloudflare 的开发人员平台。

基于此,我们很高兴地推出 Cloudflare Pub/Sub 的私人测试版,这是一个可编程消息中枢,建立在当今数千万现有设备支持的普遍和行业标准的 MQTT 协议基础上。

简而言之,Pub/Sub 可让您:

  • 从任何具有 MQTT 功能的客户端(以及在未来,还可以从其他面向客户端的协议)发布事件、遥测或传感器数据
  • 编写代码,在使用 Cloudflare Workers 将消息发布到代理时可以过滤、聚合和/或修改消息,且在分发给订阅者之前,无需将消息传送到单个 “云区域”
  • 从其他云中的应用程序或从本地推送事件,使用 Pub/Sub 作为可编程事件路由器或作为到持久数据存储(例如 R2 或 KV)的挂钩
  • 在客户端上推送更新可能很困难(或有风险!),或者在客户端设备上运行代码会增加材料成本(CPU、内存),因此,将逻辑移出客户端,同时仍保持尽可能低的延迟(您的代码在每个位置运行)。

而且可能有一长串我们甚至还没有预测到的事情。我们已经看到开发人员在 Cloudflare Workers 之上构建了令人难以置信的产品,我们也很期待他们利用 Pub/Sub 之类可编程消息中枢的强大功能能够构建出什么。

为什么选择 MQTT 以及什么是 MQTT?

如果您以前没有听说过 MQTT,您可能会惊讶地发现它是如今最为普遍部署的 “消息传递协议” 之一。如今,有数千万(至少!)设备使用 MQTT,从连接的支付终端到自动驾驶汽车、手机,甚至视频游戏,都囊括在内。传感器读数、遥测、金融交易以及/或移动通知和消息都是 MQTT 的常见用例,协议的灵活性允许开发人员围绕其用例特定的可靠性、主题层次结构和持久性进行权衡。

我们选择 MQTT 作为 Cloudflare Pub/Sub 的基础,因为我们认为要建立在开放、可访问的标准之上,就像我们选择 Service Worker API 作为 Workers 的基础时所做的那样,加上我们不久前宣布参与 Common API 项目。我们还希望为现有客户提供一条从 Cloudflare 的规模和可编程性中受益的简单途径,以及为开发人员提供以他们目前熟悉的语言编写的客户端库生态系统。

除此之外,我们还认为 MQTT 满足现代 “发布-订阅” 消息服务的需求。它具有灵活的交付保证、用于传输加密的 TLS(没有定制加密!)、可扩展的主题创建和订阅模型、可扩展的每条消息元数据,重要的是,它为错误消息提供了一个定义明确的规范。

考虑到这一点,我们希望为 Pub/Sub 提供更多“入口”:MQTT 的许多最佳部分都源自想要通过 HTTP 或 WebSockets 与我们交谈的客户。

构建基块

如果能够编写代码来作用于发布到 Pub/Sub 代理的每条消息,那么它在实践中会是什么样子?

这是一个直接在 Worker 中处理 Pub/Sub 消息的简单说明性示例。我们有客户端(在本例中为支付终端)报告交易数据,我们希望捕获每个区域处理的交易数量,以便我们可以跟踪一段时间内的交易量。

具体而言,我们:

  1. 过滤我们所关心的消息的特定主题前缀
  2. 将特定键值对的消息解析为指标
  3. 将该指标直接写入我们新的无服务器时间序列分析服务 Workers Analytics Engine,以便我们可以直接使用 GraphQL 查询它。

这让我们不必维护外部指标服务、配置另一个云服务或考虑如何扩展:我们可以直接在 Cloudflare 上完成所有这些工作。


async function pubsub(
  messages: Array<PubSubMessage>,
  env: any,
  ctx: ExecutionContext
): Promise<Array<PubSubMessage>> {
  
  for (let msg of messages) {
    // Extract a value from the payload and write it to Analytics Engine
    // In this example, a transactionsProcessed counter that our clients are sending
    // back to us.
    if (msg.topic.startsWith(“/transactions/”)) {
      // This is non-blocking, and doesn’t hold up our message
      // processing.
      env.TELEMETRY.writeDataPoint({
        // We label this metric so that we can query against these labels
        labels: [`${msg.broker}.${msg.namespace}`, msg.payload.region, msg.payload.merchantId],
        metrics: [msg.payload.transactionsProcessed ?? 0]
      });
    }
  }

  // Return our messages back to the Broker
  return messages;
}

const worker = {
  async fetch(req: Request, env: any, ctx: ExecutionContext) {
    // Critical: you must validate the incoming request is from your Broker
    // In the future, Workers will be able to do this on your behalf for Workers
    // in the same account as your Pub/Sub Broker.
    if (await isValidBrokerRequest(req)) {

      // Parse the incoming PubSub messages
      let incomingMessages: Array<PubSubMessage> = await req.json();
      
      // Pass the message to our pubsub handler, and capture the returned
      // messages
      let outgoingMessages = await pubsub(incomingMessages, env, ctx);

      // Re-serialize the messages and return a HTTP 200 so our Broker
      // knows we’ve successfully handled them
      return new Response(JSON.stringify(outgoingMessages), { status: 200 });
    }

    return new Response("not a valid Broker request", { status: 403 });
  },
};

export default worker;

然后,我们可以使用熟悉的语言——SQL,直接查询这些指标。我们的查询采用了我们编写的指标,并提供了由我们的支付设备处理的交易明细,按商户分组(同样,都在 Cloudflare 上):

SELECT
  label_2 as region,
  label_3 as merchantId,
  sum(metric_1) as total_transactions
FROM TELEMETRY
WHERE
  metric_1 > 0
  AND timestamp >= now() - 604800
GROUP BY
  region,
  merchantId
ORDER BY
  total_transactions DESC
LIMIT 10

您可以使用任意数量的示例替换或增加对 Analytics Engine 的调用:

  • (使用 ctx.waitUntil)异步地将有关特定主题的消息写入我们的 R2 对象存储中,而不阻断消息传递
  • 在消息推送给订阅者之前,用 从 KV 填充的数据即时重写消息。
  • 根据消息的有效负载聚合消息并将它们 HTTP POST 到托管在 Cloudflare 之外的旧式基础设施

Pub/Sub 为您提供了一种将数据导入 Cloudflare 网络,过滤、聚合和/或改变数据,然后将其推送回订阅者的方法——无论收听该主题的订阅者是 10 个、1,000 个还是 10,000 个。

我们的目标是什么?

正如我们常说的:我们才刚刚开始。Pub/Sub 的私测版只是我们旅程的开始,我们已经在着手开发一长串功能。

至关重要的是,我们的首要任务之一是尽可能多地涵盖 MQTT v5.0 规范,以便客户可以迁移现有部署并使其 “正常工作”。可让您在多个订阅者之间平衡消息负载的共享订阅等有用功能、用于聚合用例的通配符订阅(单层和多层)、更强大的交付保证 (QoS) 以及对其他身份验证模式(特别是 Mutual TLS)的支持,是我们正在处理的工作的一部分。

除此之外,我们注重于确保为 Pub/Sub 开发人员提供最佳体验,在测试期间,我们将:

  • 在我们的开发人员 CLI wrangler2 中支持一套新的 “pubsub” 子命令,以便尽可能地减少入门的难度
  • 构建“本机”绑定(类似于 Workers KV 的操作方式),可让您直接从 Worker 代码发布消息和订阅主题,无论消息是否来自(或发往)Cloudflare 之外的客户端
  • 探索更多非 MQTT 客户端的发布和订阅方式,包括 HTTP 请求和 WebSockets,以便更轻松地集成现有代码。

我们的开发者文档将在我们推出时涵盖这些功能。

我们也意识到定价是开发人员体验的重要组成部分,并致力于确保提供可访问且灵活的免费层。我们希望让开发人员能够进行实验、制作原型并解决我们尚未想到的问题。我们将在测试期间分享更多关于定价的信息。

开始使用

如果您想开始使用 Pub/Sub,请注册私测版:我们计划在下个月开始授予访问权限。我们期待着从开发人员处收集反馈并看看他们会构建些什么。
同时,查看全新的 Pub/Sub 开发人员文档,了解 Pub/Sub 在后台的工作情况、MQTT 协议以及它如何与 Cloudflare Workers 集成。

我们保护整个企业网络,帮助客户高效构建互联网规模的应用程序,加速任何网站或互联网应用程序抵御 DDoS 攻击,防止黑客入侵,并能协助您实现 Zero Trust 的过程

从任何设备访问 1.1.1.1,以开始使用我们的免费应用程序,帮助您更快、更安全地访问互联网。要进一步了解我们帮助构建更美好互联网的使命,请从这里开始。如果您正在寻找新的职业方向,请查看我们的空缺职位
Platform Week (CN)Product News (CN)简体中文

在 X 上关注

Matt Silverlock|@elithrar
Cloudflare|@cloudflare

相关帖子