Integrate with Prefect¶
Use Prefect for orchestration, observability, and workflow management.
Overview¶
The Prefect integration builds any Stardag DAG with near-zero boilerplate, mapping execution onto native Prefect primitives. This lets you:
- Get observability via the Prefect UI
- Manage concurrent task execution, retry logic and error handling via native Prefect Task Runners
- Still leverage Stardag for Makefile-style/bottom-up execution and persistent caching
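To make "Makefile-style/bottom-up execution and persistent caching" concrete, here is a minimal, library-free sketch of the idea. It is not Stardag's implementation: the `Target` class, the `build` function, and the JSON-file cache are hypothetical names chosen only for illustration.

```python
import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable


@dataclass
class Target:
    """Hypothetical stand-in for a task: a name, a compute function, dependencies."""

    name: str
    compute: Callable[..., object]
    deps: list["Target"] = field(default_factory=list)


def build(target: Target, cache_dir: Path, executed: list[str]) -> object:
    """Return the target's output, computing only what is missing from the cache."""
    cache_file = cache_dir / f"{target.name}.json"
    if cache_file.exists():
        # Already built: reuse the persisted output, like an up-to-date Makefile target.
        return json.loads(cache_file.read_text())
    # Bottom-up: resolve every dependency before the target itself.
    inputs = [build(dep, cache_dir, executed) for dep in target.deps]
    result = target.compute(*inputs)
    cache_file.write_text(json.dumps(result))
    executed.append(target.name)
    return result


fetch = Target("fetch", lambda: [1, 2, 3, 4, 5])
total = Target("total", lambda data: sum(data), deps=[fetch])

with tempfile.TemporaryDirectory() as tmp:
    first_run: list[str] = []
    first_result = build(total, Path(tmp), first_run)
    # Second build of the same target: everything is served from the cache.
    second_run: list[str] = []
    second_result = build(total, Path(tmp), second_run)
```

On the first build both targets execute, dependency first. On the second build nothing executes, because the requested target's output is already persisted.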
Prerequisites¶
Install Stardag with the prefect extra dependencies (for example, `pip install "stardag[prefect]"`).
You'll also need a Prefect server or Prefect Cloud account.
Setup¶
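For a local Prefect server, start it with `prefect server start`. Then, in a separate terminal, point the Prefect client at it, for example with `prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api` (the default local address).
For Prefect Cloud, sign up at prefect.io, then authenticate, for example with `prefect cloud login`.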
Basic Usage¶
Use the Prefect builder to execute your DAG with Prefect orchestration:
```python
import asyncio

import stardag as sd
from prefect import flow

from stardag.integration.prefect import build_aio as prefect_build_aio


@sd.task
def fetch_data(source: str) -> list[int]:
    return [1, 2, 3, 4, 5]


@sd.task
def process(data: sd.Depends[list[int]]) -> int:
    return sum(data)


@flow
async def my_flow():
    task = process(data=fetch_data(source="api"))
    await prefect_build_aio(task)


if __name__ == "__main__":
    asyncio.run(my_flow())
```
The only thing needed to build your DAG using Prefect primitives is to replace stardag.build_aio with stardag.integration.prefect.build_aio.
You can call stardag.integration.prefect.build_aio anywhere in an existing Prefect flow and mix Prefect's imperative execution with Stardag's bottom-up, persistently cached execution to your liking.
Unlike stardag.build_aio, stardag.integration.prefect.build_aio returns dict[str, PrefectConcurrentFuture], a mapping from task.id to a Prefect future. With build_aio(..., wait_for_completion=False), the function returns as soon as the DAG has been traversed and all tasks have been submitted to the native Prefect TaskRunner. You can then keep submitting other Prefect tasks to the task runner, conditioned on the Stardag tasks having completed.
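The submit-then-continue pattern can be illustrated without either library. The sketch below uses Python's standard `concurrent.futures` as a stand-in for a Prefect task runner. The `submit_all` and `downstream` functions and the task ids are hypothetical, and the real `build_aio` call is deliberately not shown.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable


def submit_all(
    executor: ThreadPoolExecutor, jobs: dict[str, Callable[[], int]]
) -> dict[str, Future]:
    """Submit every job and return at once with a mapping of id -> future."""
    return {job_id: executor.submit(fn) for job_id, fn in jobs.items()}


def downstream(upstream: dict[str, Future]) -> int:
    """Follow-up work that blocks only when it needs the upstream results."""
    return sum(future.result() for future in upstream.values())


with ThreadPoolExecutor(max_workers=4) as executor:
    # Returns without waiting for the jobs to finish.
    futures = submit_all(
        executor, {"task-a": lambda: 1 + 2, "task-b": lambda: 3 * 4}
    )
    # More work can be submitted while the jobs above may still be running.
    follow_up = executor.submit(downstream, futures)
    combined = follow_up.result()
```

Returning the futures keyed by id, rather than the finished results, leaves it to the caller to decide where to block.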
Running the stardag-examples Example¶
The examples package includes a ready-to-run Prefect example. To try it, clone the repo, install the examples package with the prefect and ml-pipeline extra dependencies, and run the example.
You'll see Prefect logs in your terminal. Navigate to the Prefect UI and click "latest run" to see your DAG. Click the artifacts for more details about each task.
Configuration¶
Environment Variables¶
| Variable | Description |
|---|---|
| PREFECT_API_URL | Prefect API URL (local or cloud) |
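As a small illustration, the snippet below shows one way to check which API URL a process would see before running a flow. `resolve_prefect_api_url` is a hypothetical helper, not part of Stardag or Prefect. Its fallback assumes a local server at Prefect's default address, `http://127.0.0.1:4200/api`, so adjust it if your server listens elsewhere.

```python
import os
from collections.abc import Mapping

# Assumption: default address of a locally started Prefect server.
LOCAL_PREFECT_API_URL = "http://127.0.0.1:4200/api"


def resolve_prefect_api_url(env: Mapping[str, str] = os.environ) -> str:
    """Return PREFECT_API_URL from the environment, or the assumed local default."""
    url = env.get("PREFECT_API_URL", "").strip()
    return url or LOCAL_PREFECT_API_URL


# Unset variable: fall back to the assumed local address.
local_url = resolve_prefect_api_url({})
# Explicit value (hypothetical host name): used as-is.
custom_url = resolve_prefect_api_url(
    {"PREFECT_API_URL": "http://prefect.internal:4200/api"}
)
```

Passing the environment as a parameter keeps the helper easy to check in isolation. In normal use, call it with no arguments to read `os.environ`.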
Using with Modal¶
For running Prefect-observed Stardag on Modal's serverless infrastructure, see Integrate with Modal and the Modal/Prefect Example.
See Also¶
- ML Pipeline Example - Complete ML pipeline walkthrough
- Prefect Examples - Source code
- Prefect Documentation - Prefect features