このコンテンツは自動機械翻訳サービスによる翻訳版であり、皆さまの便宜のために提供しています。原本の英語版と異なる誤り、省略、解釈の微妙な違いが含まれる場合があります。ご不明な点がある場合は、英語版原本をご確認ください。
開発者はすでにCloudflare Workflowsを使用して、長時間実行されるマルチステップアプリケーションをWorkers上で構築することができます。今回、Python Workflowsが登場しました。つまり、選択した言語を使用して、マルチステップのアプリケーションをオーケストレーションできるようになったのです。
Workflowsを使用すると、組み込みのエラー処理と再試行動作を使用して、アプリケーション内のべき等ステップのシーケンスを自動化できます。しかし、Workflowsは元々TypeScriptでのみサポートされていました。Pythonは、データパイプライン、人工知能/機械学習、タスク自動化のためのデファクト言語です。これらはすべて、オーケストレーションに大きく依存しているため、これが多くの開発者にとって摩擦を引き起こしました。
長年にわたり、開発者に、Pythonでアプリケーションを構築するためのツールをCloudflare上で提供してきました。2020年には、Transcrypt経由でPythonをWorkersに導入し、2024年にはPythonをworkerdに直接統合しました。今年初め、私たちはmatprotlibやpandasのように、Pyodideでビルドされたパッケージとともに、CPythonのサポートをWorkersで構築しました。現在、Python Workflowsもサポートされているため、開発者は、慣れ親しんだ言語を使用して堅牢なアプリケーションを作成できます。
なぜPython for Workflowsなのか?
LLMを訓練しているところを想像してみてください。データセットにラベルを付け、データをフィードし、モデルが実行するのを待ち、損失を評価し、モデルを調整し、これを繰り返す必要があります。自動化がなければ、各ステップを開始し、完了まで手動で監視し、次のステップを始める必要があります。代わりに、ワークフローを使用してモデルのトレーニングを調整し、前のステップの完了を保留して各ステップをトリガーすることができます。損失を評価し、それに応じてモデルを調整するなど、手動調整が必要な場合は、通知を行うステップを実装して、必要な入力を待つことができます。
データパイプラインを考えてみましょう。これは、データを取り込み処理するためのPythonの主なユースケースです。開発者は、一連の識別されたステップによってデータパイプラインを自動化することにより、データパイプライン全体を処理するワークフローをデプロイできます。
別の例として、AIエージェントの構築、例えば食料品を管理するエージェントを構築するといった場合です。毎週、レシピのリストを入力すると、エージェントが(1) 必要な食材リストを作成し、(2) 先週から買いことがてある食材をチェックし、(3) 近所の食料品からの受け取りを注文する可能性があります。Workflowを使用すると、次のようになります:
await step.wait_for_event() ユーザーが食料品リストを入力する
step.do()必要な材料のリストをコンパイルする
step.do()残りの食材に対して必要な材料のリストを確認する
step.do()は、API呼び出しを実行して注文を配置します。
step.do()支払い手続きに進む
Cloudflare上でエージェントを構築するためのツールとしてworkflowsを使用することで、エージェントのアーキテクチャを簡素化し、個々のステップのリトライやステートの永続化によって、完了に達する確率を高めることができます。Python Workflowsのサポートにより、Pythonを使用したエージェントの構築がこれまで以上に容易になります。
Cloudflare Workflowsは、耐久性のある実行のために当社が作成した基盤となるインフラストラクチャを使用し、Pythonユーザーがワークフローを記述できるようにします。さらに、JavascriptとPython SDK間で完全な機能パリティを目指しました。これはCloudflare Workersがランタイム自体でPythonを直接サポートしているから可能です。
Cloudflare Workflowsは、WorkersとDurable Objects上に完全に構築されています。各要素は、Workflowのメタデータとインスタンスレベルの情報を保存する役割を果たします。Workflowsプラットフォームの仕組みの詳細については、こちらのブログ記事をご覧ください。
Workflowsコントロールプレーンの最下部にはユーザーのWorkerがあり、これがWorkflowEntrypointです。Workflowインスタンスが実行する準備ができると、WorkflowエンジンはRPC経由でユーザーWorkerのrunメソッドを呼び出します(この場合はPython Worker)。
これは、公式ドキュメントで提供されているWorkflow宣言のスクレイピングの例です。
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Steps here
}
}
runメソッドは、前述のように、耐久性のある実行APIを実装するWorkflowStepパラメータを提供します。これは、ユーザーが同時実行できるようにするものです。これらのAPIsはJavaScriptで実装されており、Python Workerのコンテキスト内でアクセスする必要があります。
WorkflowStepは、RPCバリアを通過しなければなりません。つまり、エンジン(呼び出し元)は、WorkflowStepをRpcTargetとして公開することになるのです。このセットアップにより、ユーザーのWorkflow(コール)は、パラメータをスタブに置き換えることができます。このスタブは、RPCをエンジンに戻すことで、Workflowsの耐久性のある実行APIsの使用を可能にします。RPCの直列化と、呼び出し元と呼び出し先から関数を渡す方法については、Remote-Procedure呼び出しのドキュメントをご覧ください。
Workflows側からユーザーWorkerの呼び出し方法を実際に変更するわけではないため、これはPython WorkflowsとJavaScript Workflowsの両方に当てはまります。しかし、Pythonの場合は、PythonとJavaScriptモジュールの間に言語ブリッジという別の障壁があります。RPCリクエストがPython Workerをターゲットとする場合、Pythonスクリプトによって処理されるようにリクエストをプロキシして、呼び出し元に返される責任を負うJavascriptエントリポイントモジュールがあります。このプロセスでは、通常、リクエスト処理の前後にタイプ変換が行われます。
Python workersはPyodideを使用します。PyodideはCPythonのWebAssemblyへのポートです。Pyodideは、Pythonに外部関数インターフェース(FFI)を提供しており、PythonからJavaScriptメソッドを呼び出すことができます。この仕組みによって、他のバインディングやPythonパッケージもWorkersプラットフォーム内で動作するようになります。したがって、このFFIレイヤーを使用することで、Workflowバインディングの直接使用を可能にするだけでなく、PythonでWorkflowStepメソッドを提供することも可能になります。つまり、WorkflowEntrypointがランタイムにとって特別なクラスであると考慮することで、runメソッドは手動でラップされ、WorkflowStepが他のJavaScriptオブジェクトのように型変換されるのではなく、JsProxyとして公開されます。さらに、ユーザーWorkerの視点からAPIsをラップすることで、単にJavaScript SDKを異なる意味の言語に公開するのではなく、開発体験全体にいくつかの調整を加えることができます。
Python Workflows SDKをPython化する
WorkflowsをPythonに移植する際の大部分は、Pythonユーザーが使い慣れた、問題なく使用できるインターフェイスを公開することです。これは、JavaScript APIsで起こることと同様です。少し戻って、Workflow(Typescriptで記述)定義のスニペットを見てみましょう。
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent} from 'cloudflare:workers';
export class MyWorkflow extends WorkflowEntrypoint {
async run(event: WorkflowEvent<YourEventType>, step: WorkflowStep) {
let state = step.do("my first step", async () => {
// Access your properties via event.payload
let userEmail = event.payload.userEmail
let createdTimestamp = event.payload.createdTimestamp
return {"userEmail": userEmail, "createdTimestamp": createdTimestamp}
})
step.sleep("my first sleep", "30 minutes");
await step.waitForEvent<EventType>("receive example event", { type: "simple-event", timeout: "1 hour" })
const developerWeek = Date.parse("22 Sept 2025 13:00:00 UTC");
await step.sleepUntil("sleep until X times out", developerWeek)
}
}
Workflows APIのPython実装には、doメソッドの変更が必要です。他の言語とは異なり、Pythonは匿名コールバックを簡単にサポートしません。この動作は通常、デコレータの使用によって実現されます。この場合、メソッドをインターセプトして慣用的に公開することができます。つまり、すべてのパラメータは元の順序を維持し、デプロイされたメソッドがコールバックとして機能します。
waitForEvent、sleep、sleepUntilは、名前がヘビケースに変換されている限り、元の署名を保持できます。
同じワークフローで、同様の動作を実現するPythonのバージョンは以下の通りです:
from workers import WorkflowEntrypoint
class MyWorkflow(WorkflowEntrypoint):
async def run(self, event, step):
@step.do("my first step")
async def my_first_step():
user_email = event["payload"]["userEmail"]
created_timestamp = event["payload"]["createdTimestamp"]
return {
"userEmail": user_email,
"createdTimestamp": created_timestamp,
}
await my_first_step()
step.sleep("my first sleep", "30 minutes")
await step.wait_for_event(
"receive example event",
"simple-event",
timeout="1 hour",
)
developer_week = datetime(2024, 10, 24, 13, 0, 0, tzinfo=timezone.utc)
await step.sleep_until("sleep until X times out", developer_week)
Workflowsを設計する際、これらのタスクの一部は同時に処理できる場合でも、ステップ間の依存関係を管理することがよくあります。考えていないとしても、多くのWorkflowsには、有向非巡回グラフ(DAG)の実行フローがあります。PyodideはJavascriptのthenablesをキャプチャし、Python awaitablesにプロキシするため、Python Workflowsの最初のイテレーション(つまり、Python Workersへの最小ポート)で並行処理が可能です。
結果として、asyncio.gatherはPromise.allに対応するものとして機能します。これは全く問題なくSDKで使用できますが、宣言的なアプローチもサポートしています。
doメソッドをコーディングする利点の1つは、オリジナルのAPIでさらなる抽象化を提供し、エントリーポイントラッパーで動作させることができるということです。以下は、導入されたDAGの機能を使用するPython APIの例です。
from workers import Response, WorkflowEntrypoint
class PythonWorkflowDAG(WorkflowEntrypoint):
async def run(self, event, step):
@step.do('dependency 1')
async def dep_1():
# does stuff
print('executing dep1')
@step.do('dependency 2')
async def dep_2():
# does stuff
print('executing dep2')
@step.do('demo do', depends=[dep_1, dep_2], concurrent=True)
async def final_step(res1=None, res2=None):
# does stuff
print('something')
await final_step()
この種のアプローチにより、Workflows宣言ははるかにクリーンになり、状態管理はWorkflowsエンジンのデータプレーンとPython WorkersWorkflowsラッパーに任せられます。同じ名前で複数のステップを実行できる場合でも、エンジンは一意性を確保するために、各ステップの名前をわずかに変更することに注意してください。Python Workflowsでは、依存関係は、その依存関係に関わる最初のステップが正常に完了すると解決されたものとみなされます。
PythonでのWorkersの記述をチェックして、今すぐ最初のPythonワークフローを作成しましょう!機能リクエストがある場合、またはバグにお気づきの点がある場合は、DiscordのCloudflare開発者コミュニティに参加して、Cloudflareチームに直接フィードバックを共有してください。