Announcing Pub/Sub: Programmable MQTT-based Messaging

Platform Weekを推進する根本的な疑問の1つは、「開発者が Cloudflare上でフルスタックアプリケーションを構築できるようにするにはどうすればよいか」ということです。Workersはデフォルトで分散されたアプリケーションを簡単にデプロイするためのサーバーレス環境として、KVDurable Objectsはキャッシュと調整のため、R2はゼロエグレスコストオブジェクトストアとして、開発者が新しいアプリケーションを構築したり、既存のアプリケーションをCloudflareの開発者プラットフォームに取り込んだりするために、他に何を構築する必要があるかについて議論し続けてきました。

このことを念頭に置き、Cloudflare Pub/Subは、現在何千万もの既存デバイスでサポートされているユビキタスで、業界標準のMQTTプロトコルをベースに構築されたプログラマブルメッセージバスであり、私たちはこのプライベートベータを発表できることを大変嬉しく思っています。

一言で言えば、Pub/Subでは以下のようなことが可能です。

  • MQTT対応クライアント(将来的には他のクライアント向けプロトコル)からのイベント、テレメトリー、センサーデータを発行します
  • Cloudflare Workers を使ってブローカーに発行され、サブスクライバーに配信される前に、メッセージを単一の「クラウドリージョン」に転送する必要なく、メッセージをフィルタリング、集約、修正できるコードを記述ができます
  • Pub/Subはプログラム可能なイベントルーターとして、またR2やKVといった永続的なデータストレージへのフックとして機能し、他のクラウド上のアプリケーションやオンプレミスからイベントをプッシュします
  • アップデートをプッシュすることが難しく(あるいはリスクが高く!)、デバイス上でコードを実行すると材料費(CPU、メモリ)が高くなりますが、クライアントからロジックを移動させることで、レイテンシーができるだけ低く保たれます(あなたのコードは すべての場所で実行されます)。

そして、私たちがまだ予測していないことが、数多くありそうです。私たちは、開発者がCloudflare Workersの上に信じられないほどのものを構築するのを見てきましたが、Pub/Subのようなプログラム可能なメッセージバスの力で、彼らがどんなものを構築するのかを見ることにも興奮しています。

なぜMQTTなのか?そしてMQTTとは何か?

MQTTを聞いたことがない人は、それが今日展開されている最も普及した「メッセージングプロトコル」の1つであることを知って驚くかもしれません。今日、接続された決済端末から自律走行車、携帯電話、さらにはビデオゲームまで、MQTTを話すデバイスは数千万台(少なくとも!)にものぼります。センサー読み取り、テレメトリー、金融取引、モバイル通知やメッセージなどはすべてMQTTの一般的な使用例で、プロトコルの柔軟性により、開発者はそれらの使用例に応じて信頼性、トピック階層、永続性などのトレードオフを行うことができます。

Cloudflare Pub/Subの基盤としてMQTTを選択したのは、Workersの基盤としてService Worker APIを選んだときや、最近発表したCommon APIプロジェクトへの参加と同様に、オープンでアクセス可能な標準の上に構築することが重要だと考えているからです。また、既存のクライアントがCloudflareのスケールとプログラマビリティの恩恵を簡単に受けられるようにし、開発者が現在慣れ親しんでいる言語で、クライアントライブラリの豊富なエコシステムを持てるようにしたいと思いました。

しかしそれ以上に、MQTTは最新の「パブリッシュ・サブスクライブ」メッセージングサービスのニーズを満たしているとも考えています。柔軟な配信保証、トランスポート暗号化のためのTLS(特注の暗号は不要!)、スケーラブルなトピック作成とサブスクリプションモデル、拡張可能なメッセージごとのメタデータがあり、そして重要なのは、明確なエラーメッセージとともに、明確に定義された仕様を提供していることです。

このことを念頭に置いて、私たちはPub/Subへのより多くの「オンランプ」をサポートするつもりです。MQTTの優れた部分の多くは、HTTPやWebSocketを介して私たちと通信することを望むクライアントから、抽象化して切り離すことが可能です。

ビルディングブロック

Pub/Sub Brokerに発行されるすべてのメッセージに作用するコードを書くことができるとすると、実際にはどのようになるのでしょうか。

ここでは、Workerで直接Pub/Subメッセージを処理する、シンプルでわかりやすい例を示します。クライアント(この場合は決済端末)が取引データを報告しており、私たちは各地域で処理された取引数を取得し、取引量を時系列で追跡できるようにしたいのです。

具体的には、以下の通りです。

  1. 気になるメッセージを、特定のトピックプレフィックスでフィルタリングします
  2. 特定のkey:valueペアのメッセージを、メトリックとして解析します
  3. そのメトリックを、サーバーレスの新しい時系列分析サービスであるWorkers Analytics Engineに直接書き込んで、GraphQLで直接クエリできるようにします

これにより、外部のメトリクスサービスを立ち上げて保守したり、別のクラウドサービスを設定したり、どのように拡張するかを考えたりする必要がなく、すべてCloudflare上で直接行うことができます。


async function pubsub(
  messages: Array<PubSubMessage>,
  env: any,
  ctx: ExecutionContext
): Promise<Array<PubSubMessage>> {
  
  for (let msg of messages) {
    // Extract a value from the payload and write it to Analytics Engine
    // In this example, a transactionsProcessed counter that our clients are sending
    // back to us.
    if (msg.topic.startsWith(“/transactions/”)) {
      // This is non-blocking, and doesn’t hold up our message
      // processing.
      env.TELEMETRY.writeDataPoint({
        // We label this metric so that we can query against these labels
        labels: [`${msg.broker}.${msg.namespace}`, msg.payload.region, msg.payload.merchantId],
        metrics: [msg.payload.transactionsProcessed ?? 0]
      });
    }
  }

  // Return our messages back to the Broker
  return messages;
}

const worker = {
  async fetch(req: Request, env: any, ctx: ExecutionContext) {
    // Critical: you must validate the incoming request is from your Broker
    // In the future, Workers will be able to do this on your behalf for Workers
    // in the same account as your Pub/Sub Broker.
    if (await isValidBrokerRequest(req)) {

      // Parse the incoming PubSub messages
      let incomingMessages: Array<PubSubMessage> = await req.json();
      
      // Pass the message to our pubsub handler, and capture the returned
      // messages
      let outgoingMessages = await pubsub(incomingMessages, env, ctx);

      // Re-serialize the messages and return a HTTP 200 so our Broker
      // knows we’ve successfully handled them
      return new Response(JSON.stringify(outgoingMessages), { status: 200 });
    }

    return new Response("not a valid Broker request", { status: 403 });
  },
};

export default worker;

そしてこれらのメトリクスを、使い慣れた言語であるSQLを使って直接問い合わせることができます。このクエリでは作成したメトリクスを使用して、決済デバイスで処理されたトランザクションの内訳を、マーチャントごとにグループ化して表示します(繰り返しますが、すべてCloudflareで処理されます)。

SELECT
  label_2 as region,
  label_3 as merchantId,
  sum(metric_1) as total_transactions
FROM TELEMETRY
WHERE
  metric_1 > 0
  AND timestamp >= now() - 604800
GROUP BY
  region,
  merchantId
ORDER BY
  total_transactions DESC
LIMIT 10

Analytics Engineの呼び出しを、任意の数の例で置き換えたり増やしたりすることができます。

  • メッセージ配信をブロックすることなく、特定のトピックに関するメッセージを、R2オブジェクトストレージに非同期に書き込みます(ctx.waitUntilを使用)。
  • メッセージがサブスクライバーにプッシュされる前に、KVから入力されたデータを使用してその場でメッセージを書き換えます
  • ペイロードに基づいてメッセージを集約し、Cloudflareの外部にホストされているレガシーインフラにHTTP POSTします

Pub/Subは、データをCloudflareのネットワークに取り込み、フィルタリング、集約、変異させ、サブスクライバー(そのトピックをリッスンしているサブスクライバーが10人でも1000人でも1万人でも)にプッシュバックする方法を提供します。

今後の私たち

よく言われるように、私たちはまだスタートしたばかりです。Pub/Subのプライベートベータは、道のりの始まりに過ぎず、私たちにはすでに取り組んでいる数多くの機能があります。

重要なのは、MQTT v5.0仕様をできるだけ多くカバーし、お客様が既存のデプロイメントを移行して「そのまま使える」ようにすることであり、それが私たちの優先事項の 1 つなのです。多数のサブスクライバー間でメッセージの負荷分散ができるようにする共有サブスクリプション、集約ユースケースのワイルドカードサブスクリプション(単層と多層の両方)、より強力な配信保証(QoS)、追加の認証モード(特にMutual TLS)のサポートなどの便利な機能は、私たちが取り組んでいるものの一部に過ぎません。

それ以上に、Pub/Subの開発者体験を最高のものにすることに注力しており、ベータ期間中、私たちは以下のことを行います。

  • 開発者用CLIであるwrangler2で、新しい「pubsub」サブコマンドのセットをサポートし、できるだけ摩擦を少なくして使い始められるようにします。
  • メッセージの発信元がCloudflare以外のクライアントであるかどうかに関係なく、Workerコードから直接メッセージを発行したりトピックを購読したりできる「ネイティブ」バインディング(Workers KVの操作方法と同様)を構築します。
  • HTTPリクエストやWebSocketなど、MQTTベースではないクライアントから発行・購読する方法をさらに検討し、既存のコードとの統合をさらに容易にします。

開発者向けドキュメントでは、これらの機能を順次紹介していく予定です。

私たちはまた、価格設定は開発者経験の大きな部分を占めると認識しており、アクセス可能で柔軟な無料枠の確保に全力を注いでいます。開発者が実験し、プロトタイプを作成し、私たちがまだ考えつかないような問題を解決できるようにしたいと考えています。価格については、ベータ期間中に詳しくお知らせする予定です。

利用をはじめる

Pub/Subの利用を開始したい方は、プライベートベータにサインアップしてください:来月中にアクセスできるようにする予定です。私たちは開発者からのフィードバックを収集し、メンバーがどのようなものを作り始めるか見届けることを楽しみにしています。
その間、最新のPub/Sub開発者向けドキュメントをご覧になり、Pub/Subがどのように機能するか、MQTTプロトコル、Cloudflare Workersとの統合方法について理解していただければと思います。