
Integrate with Prefect

Use Prefect for orchestration, observability, and workflow management.

Overview

The Prefect integration lets you build any Stardag DAG with near-zero boilerplate, mapping its execution to native Prefect primitives. This lets you:

  • Get observability via the Prefect UI
  • Manage concurrent task execution, retry logic and error handling via native Prefect Task Runners
  • Still leverage Stardag for Makefile-style/bottom-up execution and persistent caching
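To make "Makefile-style/bottom-up execution and persistent caching" concrete, here is a toy sketch of the idea in plain Python. It is not Stardag's implementation: the `build` function, the `graph` layout, and the JSON-file cache are all hypothetical and exist only to illustrate the concept. A target whose output already exists on disk is reused, and anything missing is built after its dependencies.

```python
import json
import tempfile
from pathlib import Path


def build(name, graph, cache_dir, executed):
    """Build `name` bottom-up: dependencies first, skipping anything already cached."""
    target = Path(cache_dir) / f"{name}.json"
    if target.exists():
        # A persisted output was found: reuse it instead of recomputing.
        return json.loads(target.read_text())
    deps, fn = graph[name]
    inputs = [build(dep, graph, cache_dir, executed) for dep in deps]
    result = fn(*inputs)
    target.write_text(json.dumps(result))  # persist the output for later builds
    executed.append(name)  # record which tasks actually ran
    return result


# Toy DAG (hypothetical layout): name -> (dependency names, function)
graph = {
    "fetch_data": ([], lambda: [1, 2, 3, 4, 5]),
    "process": (["fetch_data"], lambda data: sum(data)),
}

with tempfile.TemporaryDirectory() as cache_dir:
    first_run, second_run = [], []
    total = build("process", graph, cache_dir, first_run)
    total_again = build("process", graph, cache_dir, second_run)
    print(total, first_run, second_run)
    # prints: 15 ['fetch_data', 'process'] []
```

The second call runs nothing because the output of `process` is already persisted. This is the behavior the integration keeps while delegating the actual execution to Prefect.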

Prerequisites

Stardag with the prefect extra dependencies installed, using either uv:

uv add "stardag[prefect]"

or pip:

pip install "stardag[prefect]"

You'll also need a Prefect server or Prefect Cloud account.

Setup

Start a local Prefect server:

prefect server start

Then, in a separate terminal:

export PREFECT_API_URL="http://127.0.0.1:4200/api"

Alternatively, to use Prefect Cloud, sign up at prefect.io, then log in:

prefect cloud login

Basic Usage

Use the Prefect builder to execute your DAG with Prefect orchestration:

import asyncio

import stardag as sd
from prefect import flow
from stardag.integration.prefect import build_aio as prefect_build_aio


@sd.task
def fetch_data(source: str) -> list[int]:
    return [1, 2, 3, 4, 5]


@sd.task
def process(data: sd.Depends[list[int]]) -> int:
    return sum(data)


@flow
async def my_flow():
    task = process(data=fetch_data(source="api"))
    await prefect_build_aio(task)


if __name__ == "__main__":
    asyncio.run(my_flow())

The only thing needed to build your DAG using Prefect primitives is to replace stardag.build_aio with stardag.integration.prefect.build_aio.

You can call stardag.integration.prefect.build_aio anywhere in an existing Prefect flow and mix Prefect's imperative execution with Stardag's bottom-up, persistently cached execution to your liking.

Unlike stardag.build_aio, stardag.integration.prefect.build_aio returns dict[str, PrefectConcurrentFuture]: a mapping from task.id to a Prefect future. If you pass wait_for_completion=False, as in build_aio(..., wait_for_completion=False), the function returns as soon as the DAG has been traversed and all tasks have been submitted to the native Prefect TaskRunner. You can then keep submitting other Prefect tasks to the task runner, conditioned on the Stardag tasks having completed.

Running the stardag-examples Example

The examples package includes a ready-to-run Prefect example. First, clone the repo using one of the following options.

Over HTTPS:

git clone https://github.com/stardag-dev/stardag.git
cd stardag/lib/stardag-examples

Over SSH:

git clone git@github.com:stardag-dev/stardag.git
cd stardag/lib/stardag-examples

With the GitHub CLI:

gh repo clone stardag-dev/stardag
cd stardag/lib/stardag-examples

Then install the package (with the prefect and ml-pipeline extra dependencies) and run the example:

uv sync --extra prefect --extra ml-pipeline
uv run python -m stardag_examples.prefect.main

You'll see Prefect logs in your terminal. Navigate to the Prefect UI and click "latest run" to see your DAG:

[Screenshot: Prefect UI showing DAG execution]

Click the artifacts to get more details about the task:

[Screenshot: Task details]

Configuration

Environment Variables

Variable         Description
PREFECT_API_URL  Prefect API URL (local or cloud)
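As a quick sanity check before running a flow, a script can report where PREFECT_API_URL currently points. The helper below is a minimal sketch using only the standard library. The function name `describe_prefect_target` is hypothetical, and it assumes that Prefect Cloud API URLs use the host api.prefect.cloud. It makes no network requests.

```python
import os
from urllib.parse import urlparse


def describe_prefect_target(env=None):
    """Classify where PREFECT_API_URL points: 'unset', 'local', 'cloud' or 'remote'."""
    env = os.environ if env is None else env
    url = env.get("PREFECT_API_URL", "").strip()
    if not url:
        return "unset"
    host = urlparse(url).hostname or ""
    if host in {"127.0.0.1", "localhost", "::1"}:
        return "local"
    if host == "api.prefect.cloud":  # assumed Prefect Cloud API host
        return "cloud"
    return "remote"


if __name__ == "__main__":
    print(f"PREFECT_API_URL target: {describe_prefect_target()}")
```

With the local setup described earlier (PREFECT_API_URL="http://127.0.0.1:4200/api"), the helper returns "local".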

Using with Modal

For running Prefect-observed Stardag on Modal's serverless infrastructure, see Integrate with Modal and the Modal/Prefect Example.

See Also