Announcing Pub/Sub: Programmable MQTT-based Messaging

Una de las preguntas latentes que promueve nuestra Platform Week es "¿cómo permitimos a los desarrolladores crear aplicaciones integrales en Cloudflare?". Con Workers, nuestro entorno sin servidor que permite implementar con facilidad aplicaciones distribuidas por defecto, KV y Durable Objects, para el almacenamiento en caché y la coordinación, y R2, nuestro almacén de objetos sin coste de salida, hemos seguido analizando qué más necesitamos crear para ayudar a los desarrolladores a diseñar nuevas aplicaciones y acercar las existentes a la Plataforma para desarrolladores de Cloudflare.

Con este objetivo, nos complace anunciar la versión beta privada de Cloudflare Pub/Sub, un bus de mensajes programable que hemos desarrollado sobre el protocolo MQTT generalizado, un estándar del sector que es compatible con decenas de millones de dispositivos existentes en la actualidad.

En pocas palabras, Pub/Sub te permite:

  • Publicar datos de eventos, telemetría o sensores desde cualquier cliente con capacidad para MQTT (y en el futuro, otros protocolos orientados al usuario).
  • Escribir código que puede filtrar, añadir y/o modificar los mensajes conforme se publican en el agente utilizando Cloudflare Workers, y antes de que se distribuyan a los suscriptores, sin necesidad de enviar los mensajes a una única "región de la nube".
  • Desencadenar eventos desde aplicaciones en otras nubes, o desde las propias con Pub/Sub, que actuará como un enrutador de eventos programable o un enlace en el almacenamiento de datos persistentes (como R2 o KV).
  • Trasladar la lógica fuera del cliente, donde puede ser difícil (¡o arriesgado!) insertar actualizaciones, o donde la ejecución del código en los dispositivos aumenta el coste de los materiales (CPU, memoria), manteniendo la latencia lo más baja posible (tu código se ejecuta en cada lugar).

Y probablemente haya una larga lista de cosas que aún no hemos previsto. Hemos visto a desarrolladores crear cosas increíbles utilizando Cloudflare Workers, y estamos deseando ver lo que desarrollarán también con la eficacia de un bus de mensajes programable como Pub/Sub.

¿Por qué y qué es MQTT?

Si no has oído hablar antes de MQTT, quizá te sorprenda saber que es uno de los "protocolos de mensajería" más generalizados en la actualidad. Hay decenas de millones (¡al menos!) de dispositivos que hablan MQTT hoy en día, desde terminales de pago conectados hasta vehículos autónomos, teléfonos móviles e incluso videojuegos. Las lecturas de los sensores, la telemetría, las transacciones financieras o las notificaciones de los móviles son todos casos de uso comunes para MQTT, y la flexibilidad del protocolo permite a los desarrolladores equilibrar la fiabilidad, la jerarquía de temas y la persistencia específicas para su caso de uso.

Elegimos MQTT como base para Cloudflare Pub/Sub porque creemos en construir sobre estándares abiertos y accesibles, como hicimos cuando elegimos la API de Service Worker como base para Workers, y con nuestra participación recientemente anunciada en el proyecto Common API. También queríamos permitir a nuestros clientes un camino fácil para beneficiarse de la escala y la programación de Cloudflare, y garantizar que los desarrolladores dispongan de un ecosistema enriquecido de bibliotecas de clientes en lenguajes con los que están familiarizados actualmente.

Sin embargo, más allá de eso, también pensamos que MQTT satisface las necesidades de un moderno servicio de mensajería "publicar-suscribir". Tiene garantías de entrega flexibles, TLS para el cifrado del transporte (¡sin criptografía a medida!), un modelo escalable de creación de temas y suscripción, metadatos extensibles por mensaje y, lo que es más importante, proporciona una especificación bien definida con mensajes de error claros.

Teniendo esto en cuenta, esperamos dar soporte a muchas más "puntos de acceso" a Pub/Sub: muchas de las mejores partes de MQTT pueden abstraerse de los clientes que quieran hablar con nosotros a través de HTTP o WebSockets.

Elementos esenciales

Dada la posibilidad de escribir código que actúe sobre cada mensaje publicado en un agente Pub/Sub, ¿qué significa en la práctica?

A continuación, te mostramos un ejemplo sencillo pero ilustrativo de la gestión de mensajes Pub/Sub directamente en un Worker. Tenemos clientes (en este caso, terminales de pago) que informan de los datos de las transacciones, y queremos capturar el número de transacciones procesadas en cada región para poder hacer un seguimiento del volumen de transacciones conforme avanza el tiempo.

En concreto, nosotros:

  1. Filtramos los mensajes que nos interesan en un prefijo de tema específico.
  2. Analizamos el mensaje para un par clave:valor específico como una métrica.
  3. Escribimos esa métrica directamente en Analytics Engine de Workers, nuestro nuevo servicio de análisis de series temporales sin servidor, para que podamos consultarlo directamente con GraphQL.

Esto nos evita tener que montar y mantener un servicio de métricas externo, configurar otro servicio en la nube o pensar en cómo escalará. Podemos hacerlo todo directamente en Cloudflare.

# language: TypeScript

async function pubsub(
  messages: Array<PubSubMessage>,
  env: any,
  ctx: ExecutionContext
): Promise<Array<PubSubMessage>> {
  
  for (let msg of messages) {
    // Extract a value from the payload and write it to Analytics Engine
    // In this example, a transactionsProcessed counter that our clients are sending
    // back to us.
    if (msg.topic.startsWith(“/transactions/”)) {
      // This is non-blocking, and doesn’t hold up our message
      // processing.
      env.TELEMETRY.writeDataPoint({
        // We label this metric so that we can query against these labels
        labels: [`${msg.broker}.${msg.namespace}`, msg.payload.region, msg.payload.merchantId],
        metrics: [msg.payload.transactionsProcessed ?? 0]
      });
    }
  }

  // Return our messages back to the Broker
  return messages;
}

const worker = {
  async fetch(req: Request, env: any, ctx: ExecutionContext) {
    // Critical: you must validate the incoming request is from your Broker
    // In the future, Workers will be able to do this on your behalf for Workers
    // in the same account as your Pub/Sub Broker.
    if (await isValidBrokerRequest(req)) {

      // Parse the incoming PubSub messages
      let incomingMessages: Array<PubSubMessage> = await req.json();
      
      // Pass the message to our pubsub handler, and capture the returned
      // messages
      let outgoingMessages = await pubsub(incomingMessages, env, ctx);

      // Re-serialize the messages and return a HTTP 200 so our Broker
      // knows we’ve successfully handled them
      return new Response(JSON.stringify(outgoingMessages), { status: 200 });
    }

    return new Response("not a valid Broker request", { status: 403 });
  },
};

export default worker;

A continuación, podemos consultar estas métricas directamente utilizando un lenguaje conocido: SQL. Nuestra consulta toma las métricas que hemos escrito y nos da un desglose de las transacciones procesadas por nuestros dispositivos de pago, agrupadas por comerciante (y de nuevo, todo en Cloudflare):

SELECT
  label_2 as region,
  label_3 as merchantId,
  sum(metric_1) as total_transactions
FROM TELEMETRY
WHERE
  metric_1 > 0
  AND timestamp >= now() - 604800
GROUP BY
  region,
  merchantId
ORDER BY
  total_transactions DESC
LIMIT 10

Podrías sustituir o aumentar las llamadas a Analytics Engine con una serie ejemplos:

  • Escribe mensajes de forma asíncrona (utilizando ctx.waitUntil) sobre temas específicos en nuestro almacenamiento de objetos R2 sin bloquear la entrega de mensajes.
  • Reescribe mensajes de manera instantánea con datos completados a partir de KV antes de enviar el mensaje a los suscriptores.
  • Añade los mensajes en función de su carga útil y envíalos por HTTP POST a la infraestructura heredada alojada fuera de Cloudflare.

Pub/Sub te ofrece una forma de introducir datos en la red de Cloudflare, filtrarlos, añadirlos y/o transformarlos, y devolverlos a los suscriptores, ya sean 10, 1 000 o 10 000 los que escuchan sobre ese tema.

¿Y nuestro rumbo?

Como nos gusta decir a menudo, eso es solo el principio. La versión beta privada de Pub/Sub es solo el comienzo de nuestro recorrido, y tenemos una larga lista de funciones es en las que ya estamos trabajando.

Una de nuestras prioridades es incluir la mayor parte de la especificación MQTT v5.0 que podamos, para que los clientes puedan migrar las implementaciones existentes y que "simplemente funcionen". Actualmente estamos trabajando en funciones útiles como suscripciones compartidas que te permiten equilibrar la carga de mensajes entre muchos suscriptores, suscripciones comodín (tanto de uno como de varios niveles) para casos de uso de agregación, garantías de entrega más sólidas ( QoS), y compatibilidad para modos de autenticación adicionales (concretamente, TLS mutuo).

Aparte, estamos centrados en asegurarnos de que la experiencia de los desarrolladores de Pub/Sub sea la mejor posible, y durante la versión beta, nuestra prioridad será:

  • Admitir un nuevo conjunto de subcomandos "pubsub" en Wrangler, nuestra CLI para desarrolladores, para que empezar sea lo menos complicado posible.
  • Crear enlaces "nativos" (similares a cómo funciona Workers KV) que te permitan publicar mensajes y suscribirte a temas directamente desde el código de Worker, independientemente de si el mensaje se origina en (o está destinado a) un cliente más allá de Cloudflare.
  • Explorar más formas de publicar y suscribirse desde clientes no basados en MQTT, incluidas solicitudes HTTP y WebSockets, para que la integración del código existente sea aún más fácil.

Nuestra documentación para desarrolladores incluirá estas funciones a medida que las vayamos consiguiendo.

También somos conscientes de que el precio es una parte importante de la experiencia de los desarrolladores, y nos comprometemos a garantizar que haya un nivel gratuito accesible y flexible. Queremos que los desarrolladores puedan experimentar, crear prototipos y resolver problemas que aún no se nos han ocurrido. Compartiremos más información sobre los precios durante el transcurso de la versión beta.

Primeros pasos

Si quieres empezar a utilizar Pub/Sub, apúntate a la versión beta privada. Nuestra idea es comenzar a dar acceso durante el próximo mes. Estamos deseando recabar los comentarios de los desarrolladores y ver lo que empiezan a desarrollar.
Mientras tanto, revisa la nueva documentación para desarrolladores de Pub/Sub para entender cómo funciona Pub/Sub en profundidad, el protocolo MQTT y cómo se integra con Cloudflare Workers.