본 콘텐츠는 사용자의 편의를 고려해 자동 기계 번역 서비스를 사용하였습니다. 영어 원문과 다른 오류, 누락 또는 해석상의 미묘한 차이가 포함될 수 있습니다. 필요하시다면 영어 원문을 참조하시기를 바랍니다.
개발자들은 이미 Cloudflare Workflows를 사용하여 Workers에서 장기간 실행되는 다단계 애플리케이션을 구축할 수 있습니다. 이제 Python Workflows가 등장합니다. 원하는 언어를 사용하여 다단계 애플리케이션을 조율할 수 있습니다.
With Workflows를 사용하면 내장된 오류 처리 및 재시도 동작으로 애플리케이션에서 일련의 멱등 단계를 자동화할 수 있습니다. 하지만 Workflows는 원래 TypeScript에서만 지원되었습니다. 파이썬이 사실상 데이터 파이프라인, 인공 지능/머신 러닝, 작업 자동화(모두 오케스트레이션에 크게 의존하는 언어)에서 선택되는 언어였기 때문에, 이로 인해 많은 개발자들이 어려움을 겪었습니다.
여러 해에 걸쳐 Cloudflare는 개발자들에게 Cloudflare에서 Python으로 이러한 애플리케이션을 구축할 수 있는 도구를 제공해 왔습니다. 2020년에 Python을 Transcrypt를 통해 Workers에 도입한 후 2024년에 Python을 workerd에 직접 통합했습니다. 올해 초, 저희는 Workers에서 실행되는 matplotlib 및 pandas와 같이 Pyodide로 구축된 모든 패키지와 함께 CPython에 대한 지원을 구축했습니다. 이제 Python Workflows도 지원되므로 개발자가 가장 잘 아는 언어를 사용하여 강력한 애플리케이션을 만들 수 있습니다.
왜 Python for Workflows인가?
LLM을 학습시킨다고 상상해 보세요. 데이터 세트에 레이블을 지정하고, 데이터를 피드하며, 모델이 실행될 때까지 기다렸다가 손실을 평가하고, 모델을 조정하는 것을 반복해야 합니다. 자동화를 사용하지 않는다면 각 단계를 시작하고 완료될 때까지 수동으로 모니터링하고, 다음 단계를 시작해야 합니다. 대신, 워크플로우를 사용하여 모델 학습을 조율하고, 이전 단계가 완료될 때까지 각 단계를 트리거할 수 있습니다. 손실을 평가하고 그에 따라 모델을 조정하는 등 수동 조정이 필요한 경우, 사용자에게 알리고 필요한 입력을 기다리는 단계를 구현할 수 있습니다.
데이터 수집 및 처리를 위한 Python 사용 사례인 데이터 파이프라인을 고려해 보세요. 개발자는 정의된 멱등 단계 집합을 통해 데이터 파이프라인을 자동화함으로써 전체 데이터 파이프라인을 처리하는 워크플로우를 배포할 수 있습니다.
또 다른 예를 들어보겠습니다. 식품을 관리하기 위한 에이전트와 같은 AI 에이전트를 구축하는 것입니다. 매주 레시피 목록을 입력하면 에이전트가 (1) 필요한 재료 목록을 작성하고, (2) 지난 몇 주 동안 먹으려고 남은 재료를 확인하며, (3) 지역 식품점에 픽업을 주문합니다 저장합니다. Workflow를 사용하면 이는 다음과 같은 모습일 수 있습니다.
await step.wait_for_event() 사용자가 식품 목록을 입력합니다.
step.do() com파일 목록 필수 요소
step.do() 남은 재료 대비 필수 재료 목록
step.do() API 호출을 통해 주문을 걸고
step.do() 결제를 진행합니다
Cloudflare에서 에이전트를 구축하기 위한 도구로 워크플로를 사용하면 에이전트의 아키텍처를 단순화하며, 개별 단계 재시도와 상태 지속성을 통해 완료 확률을 높일 수 있습니다. Python Workflows를 지원하므로 Python으로 에이전트를 구축하는 일이 그 어느 때보다 쉬워집니다.
Cloudflare Workflows는 Python 사용자가 Workflows를 작성할 수 있는 관용적 방법을 제공하면서 지속형 실행을 위해 만든 기본 인프라를 사용합니다. 또한, 저희는 Javascript와 Python SDK 간의 완전한 기능 동등성을 목표로 했습니다. 이는 Cloudflare Workers가 런타임 자체에서 직접 Python을 지원하기 때문에 가능합니다.
Cloudflare Workflows는 Workers 및 Durable Objects를 기반으로 구축되었습니다. 각 요소는 Workflow 메타데이터와 인스턴스 수준 정보를 저장하는 역할을 합니다. Workflows 플랫폼의 작동 방식에 대한 자세한 내용은 이 블로그 게시물을 확인하세요.
Workflows 제어창의 맨 아래에는 사용자 Worker가 있으며, 이는 WorkflowEntrypoint입니다. Workflow 인스턴스를 실행할 준비가 되면 Workflow 엔진은 RPC를 통해 사용자 worker의 run 메서드를 호출하며, 이 경우에는 Python Worker가 됩니다.
다음은 공식 문서에서 제공하는 Workflow 선언의 뼈대 예시입니다.
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Steps here
}
}
위에 설명된 것처럼, run 메서드는 내구성 실행 API를 구현하는 WorkflowStep 매개변수를 제공합니다. 이는 사용자가 최대 한 번 실행에 의존하는 것입니다. 이러한 APIs는 JavaScript로 구현되며 Python Worker의 컨텍스트에서 액세스해야 합니다.
WorkflowStep은 RPC 장벽을 통과해야 하며, 이는 엔진(호출자)이 이를 RpcTarget으로 노출함을 의미합니다. 이 설정을 통해 사용자의 Workflow(수신 수신자)가 매개변수를 스텁으로 대체할 수 있습니다. 그런 다음 이 스텁을 통해 엔진으로 다시 RPCing하여 Workflows에 지속형 실행 APIs를 사용할 수 있습니다. RPC 직렬화와 호출자 및 수신자 간에 함수를 전달하는 방법에 대해 자세히 알아보려면 원격 프로시저 호출 문서를 읽어보세요.
Workflows 측에서 사용자 Worker를 호출하는 방식은 실제로 변경되지 않으므로 이 모든 것은 Python과 JavaScript Workflows 모두에 해당됩니다. 하지만 Python의 경우 Python과 JavaScript 모듈 사이를 연결하는 언어라는 또 다른 장벽이 있습니다. RPC 요청이 Python Worker를 대상으로 하는 경우, Python 스크립트가 요청을 처리한 다음 호출자에게 반환하도록 요청을 프록시 설정하는 Javascript 진입점 모듈이 있습니다. 이 프로세스에는 일반적으로 요청 처리 전과 후에 유형 변환이 포함됩니다.
Python Workers는 CPython을 WebAssembly로 이식하는 Pyodide를 사용합니다. Pyodide는 Python에서 JavaScript 메서드를 호출할 수 있도록 JavaScript에 외부 함수 인터페이스(FFI)를 제공합니다. 이러한 메커니즘을 통해 다른 바인딩 및 Python 패키지가 Workers 플랫폼 내에서 작동할 수 있게 됩니다. 따라서 이 FFI 계층을 사용하여 Workflow 바인딩을 직접 사용할 수 있을 뿐만 아니라 Python에서 WorkflowStep 메서드를 제공합니다. 즉, WorkflowEntrypoint 가 런타임의 특수 클래스라는 점을 고려하여, run 메서드를 수동으로 래핑하여 WorkflowStep 이 다른 JavaScript 객체처럼 타입 변환되지 않고 JsProxy 로 노출되도록 하는 것입니다. 또한 사용자 Worker의 관점에서 APIs를 래핑함으로써 단순히 JavaScript SDK를 다른 시맨틱을 가진 다른 언어에 노출하는 대신 전반적인 개발 경험을 일부 조정할 수 있습니다.
Python Workflows SDK를 Pythonic으로 만들기
Workflows를 Python으로 포팅할 때 큰 부분은 Python 사용자에게 친숙하고 사용에 문제가 없는 인터페이스를 노출하는 것인데, 이는 Cloudflare의 JavaScript APIs에서 발생하는 것과 유사합니다. 한 걸음 물러서서 Typescript로 작성된 Workflow 정의 스니펫을 살펴보겠습니다.
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent} from 'cloudflare:workers';
export class MyWorkflow extends WorkflowEntrypoint {
async run(event: WorkflowEvent<YourEventType>, step: WorkflowStep) {
let state = step.do("my first step", async () => {
// Access your properties via event.payload
let userEmail = event.payload.userEmail
let createdTimestamp = event.payload.createdTimestamp
return {"userEmail": userEmail, "createdTimestamp": createdTimestamp}
})
step.sleep("my first sleep", "30 minutes");
await step.waitForEvent<EventType>("receive example event", { type: "simple-event", timeout: "1 hour" })
const developerWeek = Date.parse("22 Sept 2025 13:00:00 UTC");
await step.sleepUntil("sleep until X times out", developerWeek)
}
}
Python에서 워크플로 API를 구현하려면 do 메서드를 수정해야 합니다. 다른 언어와 달리 Python은 익명 콜백을 쉽게 지원하지 않습니다. 이러한 동작은 일반적으로 데코레이터를 사용하여 이루어지며, 이 경우 데코레이터를 사용하면 메서드를 가로채서 관용적 방식으로 노출할 수 있습니다. 즉, 장식된 메서드가 콜백 역할을 하여 모든 매개변수가 원래 순서를 유지합니다.
waitForEvent, 절전 모드 및 절전 모드로 전환하는 메서드는 이름을 뱀의 대/소문자로 변환하는 한 원래 서명을 유지할 수 있습니다.
다음은 동일한 워크플로에 대해 유사한 동작을 보이는 Python 버전입니다.
from workers import WorkflowEntrypoint
class MyWorkflow(WorkflowEntrypoint):
async def run(self, event, step):
@step.do("my first step")
async def my_first_step():
user_email = event["payload"]["userEmail"]
created_timestamp = event["payload"]["createdTimestamp"]
return {
"userEmail": user_email,
"createdTimestamp": created_timestamp,
}
await my_first_step()
step.sleep("my first sleep", "30 minutes")
await step.wait_for_event(
"receive example event",
"simple-event",
timeout="1 hour",
)
developer_week = datetime(2024, 10, 24, 13, 0, 0, tzinfo=timezone.utc)
await step.sleep_until("sleep until X times out", developer_week)
Workflows를 설계할 때는 이러한 작업 중 일부가 동시에 처리될 수 있는 경우에도 단계 간의 종속성을 관리하는 경우가 많습니다. 별도로 생각하고 있지는 않지만, 많은 Workflows에는 유방향 비순환 그래프(DAG) 실행 흐름이 있습니다. Python Workflows의 첫 번째 반복에서는 Pyodide가 Javascript theable을 캡처하고 이를 Python awaitable로 프록시하기 때문에 동시성이 달성 가능합니다(즉: Python Workers로 최소한의 포트).
따라서 asyncio.gather 는 Promise.all에 대응합니다. 이는 완벽하게 괜찮고 SDK에서 사용할 준비가 되었지만, 저희는 선언적 접근 방식도 지원합니다.
do 메서드를 데코레이션하는 것의 장점 중 하나는 원래 API에서 추가 추상화를 본질적으로 제공하고 진입점 래퍼에서 작동하도록 할 수 있다는 것입니다. 다음은 DAG 기능을 사용하는 Python API의 예입니다.
from workers import Response, WorkflowEntrypoint
class PythonWorkflowDAG(WorkflowEntrypoint):
async def run(self, event, step):
@step.do('dependency 1')
async def dep_1():
# does stuff
print('executing dep1')
@step.do('dependency 2')
async def dep_2():
# does stuff
print('executing dep2')
@step.do('demo do', depends=[dep_1, dep_2], concurrent=True)
async def final_step(res1=None, res2=None):
# does stuff
print('something')
await final_step()
이러한 종류의 접근 방식을 사용하면 Workflow 선언이 훨씬 명확해지며, 상태 관리는 Workflows 엔진 데이터 영역과 Python Workers Workflow 래퍼에 맡겨집니다. 여러 단계를 동일한 이름으로 실행할 수 있더라도 엔진은 고유성을 보장하기 위해 각 단계의 이름을 약간 수정합니다. Python Workflows에서는 종속성과 관련된 초기 단계가 성공적으로 완료되면 종속성이 해결된 것으로 간주됩니다.
Python으로 Workers 작성 을 확인하고 첫 번째 Python Workflow 를 지금 만들어 보세요! 기능에 요청하거나 버그를 발견하면, Discord의 Cloudflare 개발자 커뮤니티에 가입하여 Cloudflare팀에 직접 피드백을 공유해 주세요.