新規投稿のお知らせを受信されたい方は、サブスクリプションをご登録ください:

Cloudflare Workflowsが一般公開:プロダクションレディの耐久実行性

2025-04-07

5分で読了
この投稿はEnglish繁體中文FrançaisDeutsch한국어Español (Latinoamérica)Nederlands简体中文でも表示されます。

ベータ版はフィードバックやイテレーションに有用とはいえ、結局のところ、誰もが実験台になったり、ベータ版ソフトウェアに時折起きる厳しいエッジを許容できるわけではありません。時には、輝かしい「一般提供」のラベル(またはブログ記事)が必要になることもあるでしょう。そこでWorkflowsの出番です。

長期間実行されるマルチステップアプリケーション(別名「ステップ関数」)をWorkers上に構築するサーバーレスで耐久性のある実行エンジンであるWorkflowsは、現在GA(一般提供)版となっています。

要するに、プロダクションレディで利用できると言うことで、Workflowsが固定されてしまうという意味ではありません。当社は引き続き、Workflowsを拡張し(より多くの同時インスタンスを含む)、新しい機能(新しいwaitForEvent APIなど)を導入し、エージェントSDKとWorkflowsを使用してAIエージェントのより簡単な構築の実現を目指しています。

散文よりもコードが望ましい場合、Workflowsスタータープロジェクトをすぐにインストールし、単一のコマンドでコードとAPIの調査を開始することができます。

npm create cloudflare@latest workflows-starter -- 
--template="cloudflare/workflows-starter"

Workflowsの仕組みとは?この製品で何が構築できるか?WorkflowsとエージェントSDKを使ったAIエージェント構築についてどう考えるか?では、この記事をご覧ください。

Workflowsを使って構築する

Workflowsは、Cloudflare Workers上に構築された耐久性のある実行エンジンで、回復力のあるマルチステップアプリケーションを構築することができます。

その核となるWorkflowsは、ステップベースのアーキテクチャを実装しており、アプリケーションの各ステップを個別にリトライ処理することができ、ステップ間のステートは自動的に永続化されます。つまり、一時的なエラーやネットワークの問題によりあるステップが失敗した場合でも、Workflowsは、アプリケーション全体を最初から再起動することなく、そのステップだけを再試行できるのです。

Workflowを定義する際は、アプリケーションを以下のような論理的なステップに分割します。

  • 各ステップでコード(step.do)を実行するWorkflowをスリープ状態にする(step.sleepまたはstep.sleepUntil)イベントを待つ(step.waitForEvent

  • Workflowが実行されると、各ステップから返されたステートを自動的に保持するので、障害やハイバネーション期間が経過した後でも、アプリケーションが中断した場所から正確に続行できます。 

  • この耐久性のある実行モデルは、複数のシステム間で調整したり、データを順番に処理したり、数分、数時間、数日に及ぶような長期的なタスクを処理する必要があるアプリケーションにとって特に強力なものとなります。

Workflowsは、従来のステートレス機能が苦手とする複雑なビジネスプロセスの処理に特に役立ちます。

例えば、eコマースの注文処理ワークフローにおいて、在庫の確認、支払い方法の請求、確認メールの送信、データベースの更新などが、すべて別個のステップとして行われます。一時的な停止により支払処理ステップが失敗した場合、Workflowsは、在庫チェックを重複させたりプロセス全体を再起動したりすることなく、支払サービスが再び利用可能になった時にそのステップを自動的に再試行します。 

この仕組み:サービスへの各呼び出しはステップとしてモデル化することができ、個別にリトライ処理することが可能で、必要に応じてそのステップ以降を復元することができます。

import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

// The params we expect when triggering this Workflow
type OrderParams = {
	orderId: string;
	customerId: string;
	items: Array<{ productId: string; quantity: number }>;
	paymentMethod: {
		type: string;
		id: string;
	};
};

// Our Workflow definition
export class OrderProcessingWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
	async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
		// Step 1: Check inventory
		const inventoryResult = await step.do('check-inventory', async () => {
			console.log(`Checking inventory for order ${event.payload.orderId}`);

			// Mock: In a real workflow, you'd query your inventory system
			const inventoryCheck = await this.env.INVENTORY_SERVICE.checkAvailability(event.payload.items);

			// Return inventory status as state for the next step
			return {
				inStock: true,
				reservationId: 'inv-123456',
				itemsChecked: event.payload.items.length,
			};
		});

		// Exit workflow if items aren't in stock
		if (!inventoryResult.inStock) {
			return { status: 'failed', reason: 'out-of-stock' };
		}

		// Step 2: Process payment
		// Configure specific retry logic for payment processing
		const paymentResult = await step.do(
			'process-payment',
			{
				retries: {
					limit: 3,
					delay: '30 seconds',
					backoff: 'exponential',
				},
				timeout: '2 minutes',
			},
			async () => {
				console.log(`Processing payment for order ${event.payload.orderId}`);

				// Mock: In a real workflow, you'd call your payment processor
				const paymentResponse = await this.env.PAYMENT_SERVICE.processPayment({
					customerId: event.payload.customerId,
					orderId: event.payload.orderId,
					amount: calculateTotal(event.payload.items),
					paymentMethodId: event.payload.paymentMethod.id,
				});

				// If payment failed, throw an error that will trigger retry logic
				if (paymentResponse.status !== 'success') {
					throw new Error(`Payment failed: ${paymentResponse.message}`);
				}

				// Return payment info as state for the next step
				return {
					transactionId: 'txn-789012',
					amount: 129.99,
					timestamp: new Date().toISOString(),
				};
			},
		);

		// Step 3: Send email confirmation
		await step.do('send-confirmation-email', async () => {
			console.log(`Sending confirmation email for order ${event.payload.orderId}`);
			console.log(`Including payment confirmation ${paymentResult.transactionId}`);
			return await this.env.EMAIL_SERVICE.sendOrderConfirmation({ ... })
		});

		// Step 4: Update database
		const dbResult = await step.do('update-database', async () => {
			console.log(`Updating database for order ${event.payload.orderId}`);
			await this.updateOrderStatus(...)

			return { dbUpdated: true };
		});

		// Return final workflow state
		return {
			orderId: event.payload.orderId,
			processedAt: new Date().toISOString(),
		};
	}
}

この耐久性、自動リトライ処理、ステートの永続性を備えたWorkflowsは、実際の障害に柔軟に対処できる信頼性の高い分散アプリケーションの構築に最適です。

ヒューマン・イン・ザ・ループ

Workflowsは単なるコード、しかし非常に強力:オンザフライで、ステップを動的に定義付け、条件によってブランチしたり、必要なシステムにAPI呼び出しを行うことができます。しかし、現実世界では、何かが起こるのを待つためにWorkflowも必要になる場合があります。

例:

  • 進歩するための人間からの承認。

  • Stripeの支払いやGitHubイベントなどの、受信webhook。

  • イベント通知をトリガーするR2へのファイルのアップロードなどのステート変更により、ワークフローへのファイルへの参照をプッシュして、ワークフローがファイルを処理できる(またはAIモデルで実行できる)ようにします。

Workflowsの新しいwaitForEvent APIを使用すると、まさにそれが可能になります。 

let event = await step.waitForEvent<IncomingStripeWebhook>("receive invoice paid webhook from Stripe", { type: "stripe-webhook", timeout: "1 hour" }) 

そうすれば、HTTPリクエストを行う外部サービスから特定のインスタンスにイベントを送信することができるようになります。

curl -d '{"transaction":"complete","id":"1234-6789"}' \
  -H "Authorization: Bearer ${CF_TOKEN}" \
\ "https://api.cloudflare.com/client/v4/accounts/{account_id}/workflows/{workflow_name}/instances/{instance_id}/events/{event_type}"

またはWorker自体内にあるWorkers API経由する方法:

interface Env {
  MY_WORKFLOW: Workflow;
}

interface Payload {
  transaction: string;
  id: string;
}

export default {
  async fetch(req: Request, env: Env) {
    const instanceId = new URL(req.url).searchParams.get("instanceId")
    const webhookPayload = await req.json<Payload>()

    let instance = await env.MY_WORKFLOW.get(instanceId);
    // Send our event, with `type` matching the event type defined in
    // our step.waitForEvent call
    await instance.sendEvent({type: "stripe-webhook", payload: webhookPayload})
    
    return Response.json({
      status: await instance.status(),
    });
  },
};

typeパラメーターを使用して、複数のイベントを待機したり、Promise.raceを使用して、最初に受信したイベントに応じて複数のイベントを続行することもできます。

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
	async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
		let state = await step.do("get some data", () => { /* step call here */ })
		// Race the events, resolving the Promise based on which event
// we receive first
		let value = Promise.race([
step.waitForEvent("payment success", { type: "payment-success-webhook", timeout: "4 hours" ),
step.waitForEvent("payment failure", { type: "payment-failure-webhook", timeout: "4 hours" ),
])
// Continue on based on the value and event received
	}
}

waitForEventをもう少し詳細に可視化するために、ここで、GitHubリポジトリを監視するコードレビューエージェントによってトリガーされるWorkflowがあると仮定してみましょう。

イベントを待つ能力がなければ、Workflowは、提案を返す(あるいは独自のPRを提出する)ことについて、人間の承認を容易に得ることができません。更新されたステートをポーリングする可能性はありますが、これをする場合、任意の期間、step.sleepを呼び出し、ストレージサービスをポーリングして更新された値を取得し、存在しない場合はこれを繰り返す必要があります。この場合コードが多いのでエラーの可能性が高くなります。

waitForEventなしでは、実行中のWorkflowインスタンスへのデータ送信が困難

同じ例を変更して、新しいwaitForEvent APIを組み込むと、変更を加える前に、このAPIを使用して人間の承認を待つことができます。 

コードレビューワークフローにwaitForEventを追加し、明示的な承認を求めることができます。

AIエージェントが人間の代わりに送信したり、行動したりすることを想定できます。waitForEventは、単にWorkflowが継続する(または継続しない)前に世界の何かが変化するのを取得し、一時停止する方法を公開するに過ぎません。

重要なのは、Workflowsの他のステップと同じように、waitForEventを呼び出すことができることです。条件付き、複数回、およびループの中で呼び出すことができます。WorkflowsはまさにWorkers:プログラミング言語のフルパワーを持ち、またドメイン固有言語(DSL)や設定言語に制限されることはありません。

価格設定

耳寄りな話:オリジナルベータ版の発表からほとんど変更されていません!Workflowsによって保存されるステートのストレージ価格を追加し、CPUベースとリクエスト(呼び出し)ベースの価格を以下のように維持します。

単位

Workers 無料

Workers 有料

CPU時間(ミリ秒)

1つのWorkflowにつき10ミリ秒

月々3,000万ミリ秒のCPUコストを含む

CPU時間100万ミリ秒につき$0.02追加

リクエスト

1日あたり100,000件のワークフロー呼び出し(Workersと共有

1か月あたり、1,000万込み

追加100万ごとに、+$0.30

ストレージ料金(GB)

1 GB

1GB/月を含む $0.20/月追加

ストレージ料金が新しいため、2025年9月15日まではストレージに対する積極的な請求は行いません。当社は、ストレージに課金する前に、基本料金に含まれる1GBの制限を超えたユーザーに対して通知を行います。Workflowsは、既定で、3日(Freeプラン)または30日(有料プラン)経過後に、保存ステートが失効します。

「CPU時間」とは何かということですが、これはWorkflowが積極的にコンピューティングリソースを消費している時間です。それにはAPI呼び出しの待ち時間、LLMの推論、その他のI/O(データベースへの書き込みなど)は含まれません。これは小さいことに思えるかもしれませんが、実際積み重なると膨大になります。ほとんどのアプリケーションは、1桁ミリ秒のCPU時間と数秒の経過実時間ですが、応答するのに100~250ミリ秒かかるAPIが1つか2つでもあれば、それは膨大な量になってしまいます!

Workflowの課金は、アイドル中や待機中に費やされる時間ではなく、CPUに対して請求されます。

Workflowエンジンは、特に待機時間が長くなり、オブジェクトストレージ(Cloudflare R2など)からデータを読み取る、o3-miniやClaude 3.7のようなサードパーティのAPIやLLMを呼び出す、D1、Postgres、MySQLのようなデータベースに問い合わせるといったことも行います。Workflowsで、Workersの様に:アプリケーションが待機している時間は課金されません。

構築を開始する

Workflowsとそのの仕組みを十分に把握したうえで、構築しましょう。そしてその次は?

  1. Workflowsの仕組み、Workflows APIの理解、ベストプラクティスについては、Workflowsのドキュメントをご覧ください

  2. スタータープロジェクトのコードを確認する

  3. 最後に、数回クリックするだけで、スターターをCloudflareアカウントにデプロイできます。

Cloudflareへデプロイ

Cloudflareは企業ネットワーク全体を保護し、お客様がインターネット規模のアプリケーションを効率的に構築し、あらゆるWebサイトやインターネットアプリケーションを高速化し、DDoS攻撃を退けハッカーの侵入を防ぎゼロトラスト導入を推進できるようお手伝いしています。

ご使用のデバイスから1.1.1.1 にアクセスし、インターネットを高速化し安全性を高めるCloudflareの無料アプリをご利用ください。

より良いインターネットの構築支援という当社の使命について、詳しくはこちらをご覧ください。新たなキャリアの方向性を模索中の方は、当社の求人情報をご覧ください。
Developer WeekCloudflare WorkersWorkflows開発者プラットフォーム

Xでフォロー

Sid Chatterjee|@chatsidhartha
Matt Silverlock|@elithrar
Cloudflare|@cloudflare

関連ブログ投稿