API Reference

Auto-generated API documentation from source code.

Core Module

Stardag: Declarative and composable DAG framework for Python.

Stardag provides a clean Python API for representing persistently stored assets as a declarative Directed Acyclic Graph (DAG).

Basic usage:

import stardag as sd

@sd.task
def get_range(limit: int) -> list[int]:
    return list(range(limit))

@sd.task
def get_sum(integers: sd.Depends[list[int]]) -> int:
    return sum(integers)

task = get_sum(integers=get_range(limit=10))
sd.build(task)
print(task.target().load())  # 45

Core components:

  • task() - Decorator for creating tasks from functions
  • Task - Task with automatic serialization and filesystem targets
  • LoadableTask - Abstract base for tasks with load() -> T
  • TargetTask - Base class for tasks with typed target outputs
  • Depends - Dependency injection type annotation
  • build() - Execute task and its dependencies

See https://docs.stardag.com for full documentation.

TODO: Expand docstrings for all public API components.

Depends module-attribute

Depends = Annotated[_DependsT, _DependsOnMarker]
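Because the alias is built on typing.Annotated, the dependency marker travels with the type hint and can be recovered at runtime. The following is a minimal standard-library sketch of that mechanism, not Stardag's implementation; _Marker, DependsSketch, and find_dependency_params are hypothetical names introduced for illustration.

```python
from typing import Annotated, TypeVar, get_args, get_origin, get_type_hints

T = TypeVar("T")


class _Marker:
    """Hypothetical stand-in for a dependency marker."""


# Generic alias: DependsSketch[X] expands to Annotated[X, _Marker].
DependsSketch = Annotated[T, _Marker]


def find_dependency_params(func) -> list[str]:
    """Return the parameter names whose annotation carries _Marker."""
    hints = get_type_hints(func, include_extras=True)
    names = []
    for name, hint in hints.items():
        if name == "return":
            continue
        if get_origin(hint) is Annotated and _Marker in get_args(hint)[1:]:
            names.append(name)
    return names


def get_sum(integers: DependsSketch[list[int]], offset: int = 0) -> int:
    return sum(integers) + offset


print(find_dependency_params(get_sum))
```

A decorator could use a lookup like this to decide which parameters should be treated as upstream tasks rather than plain values.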

TaskLoads module-attribute

TaskLoads = Annotated[
    LoadableTask[LoadedT_co], Polymorphic()
]

TaskStruct module-attribute

TaskStruct = Union[
    "BaseTask",
    Sequence["TaskStruct"],
    Mapping[str, "TaskStruct"],
]
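The recursive definition means a TaskStruct can be a single task, a sequence of structs, or a string-keyed mapping of structs, nested to any depth. As a rough illustration of how such a structure might be traversed, here is a sketch using a hypothetical FakeTask class in place of BaseTask; the flatten helper is an assumption and not part of the documented API.

```python
from collections.abc import Mapping, Sequence
from typing import Iterator


class FakeTask:
    """Hypothetical stand-in for a task; only used to illustrate traversal."""

    def __init__(self, name: str) -> None:
        self.name = name


def flatten(struct) -> Iterator[FakeTask]:
    """Yield every task found in a nested list/mapping structure."""
    if isinstance(struct, FakeTask):
        yield struct
    elif isinstance(struct, Mapping):
        for value in struct.values():
            yield from flatten(value)
    elif isinstance(struct, Sequence) and not isinstance(struct, (str, bytes)):
        for item in struct:
            yield from flatten(item)
    else:
        raise TypeError(f"Unsupported element: {struct!r}")


nested = {"a": FakeTask("a"), "rest": [FakeTask("b"), {"c": FakeTask("c")}]}
print([t.name for t in flatten(nested)])
```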

target_factory_provider module-attribute

target_factory_provider = resource_provider(
    type_=TargetFactory, default_factory=TargetFactory
)

BaseTask

Bases: PolymorphicRoot

__init_subclass__

__init_subclass__(**kwargs)

Validate that subclasses implement either run() or run_aio().

Also wraps run() and run_aio() methods with precheck validation.
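The validation half of this hook can be pictured with a small standalone sketch. SketchBase below is a hypothetical class, not BaseTask, and the check shown (comparing the subclass attribute with the base attribute) is only one plausible way to detect an override; the precheck wrapping is not reproduced here.

```python
class SketchBase:
    """Hypothetical base class; not Stardag's BaseTask."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # A method counts as overridden when it is no longer the base function.
        overrides_run = cls.run is not SketchBase.run
        overrides_run_aio = cls.run_aio is not SketchBase.run_aio
        if not (overrides_run or overrides_run_aio):
            raise TypeError(f"{cls.__name__} must implement run() or run_aio()")

    def run(self):
        raise NotImplementedError

    async def run_aio(self):
        raise NotImplementedError


class SyncOnly(SketchBase):
    def run(self):
        return "ok"


try:
    class Neither(SketchBase):
        pass
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

Note that this simplified check would also reject intermediate abstract subclasses, which a real framework would likely need to exempt.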

complete abstractmethod

complete()

Declare if the task is complete.

complete_aio async

complete_aio()

Asynchronously declare if the task is complete.

run

run()

Execute the task logic (sync).

Override this method for synchronous tasks. If you only override run_aio(), this method will automatically run it via asyncio.run().

RETURNS DESCRIPTION
None | Generator[TaskStruct, None, None]

None for simple tasks, or a Generator yielding TaskStruct for tasks with dynamic dependencies (see Dynamic Dependencies Contract below).

RAISES DESCRIPTION
RuntimeError

If called from within an existing event loop when only run_aio() is implemented. In that case, call run_aio() directly instead.

NotImplementedError

If run_aio() is an async generator (dynamic deps). Async generators cannot be automatically converted to sync generators.

Dynamic Dependencies Contract: When a task yields dynamic dependencies via a generator, the BUILD SYSTEM guarantees that ALL yielded tasks are COMPLETE before the generator is resumed. The task can rely on this contract:

def run(self):
    # Do some initial work to get info about what additional dependencies
    # are needed
    initial_data = "..."

    # Yield deps we need to be built first
    task_a = TaskA(input=initial_data)
    task_b = TaskB(input=initial_data)
    yield [task_a, task_b]

    # CONTRACT: When we reach here, ALL deps are complete.
    # We can safely access their outputs.
    result_a = task_a.target().load()
    result_b = task_b.target().load()

    # Yield more deps if needed
    task_c = TaskC(input=result_a)
    yield task_c

    # Again, TaskC is complete when we reach here
    self.target().save(task_c.target().load() + result_b)

This contract is essential for correctness - tasks can depend on previously yielded tasks being complete before continuing execution.
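From the build system's side, honoring this contract amounts to finishing every yielded task before advancing the generator. The toy driver below sketches that idea with in-memory stand-ins; ToyTask, Parent, and build_toy are hypothetical and far simpler than the real build functions (no registry, no concurrency, no targets).

```python
from collections.abc import Generator


class ToyTask:
    """Hypothetical in-memory task; stores its output on the instance."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.output = None
        self.done = False

    def run(self):
        self.output = f"{self.name}-output"


class Parent(ToyTask):
    def run(self):
        dep_a, dep_b = ToyTask("a"), ToyTask("b")
        yield [dep_a, dep_b]
        # Contract: both dependencies are complete when execution resumes.
        assert dep_a.done and dep_b.done
        self.output = f"{dep_a.output}+{dep_b.output}"


def build_toy(task: ToyTask) -> None:
    """Run a task, completing every yielded dependency before resuming it."""
    if task.done:
        return
    result = task.run()
    if isinstance(result, Generator):
        for yielded in result:  # each new iteration resumes the generator
            deps = yielded if isinstance(yielded, list) else [yielded]
            for dep in deps:
                build_toy(dep)
    task.done = True


parent = Parent("parent")
build_toy(parent)
print(parent.output)
```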

run_aio async

run_aio()

Execute the task logic (async).

Override this method for asynchronous tasks. If you only override run(), this method will automatically delegate to it.

For dynamic dependencies, you can use 'yield' which makes this an async generator. Note that async generator methods have different type signatures that may require type: ignore comments.

RETURNS DESCRIPTION
None | Generator[TaskStruct, None, None]

None for simple tasks, or a Generator/AsyncGenerator for tasks with dynamic dependencies.

Dynamic Dependencies Contract

Same as run() - the build system guarantees that ALL yielded tasks are COMPLETE before the generator is resumed. See run() docstring for detailed documentation and examples.

artifacts

artifacts()

Return artifacts to be stored in the registry after task completion.

Override this method to expose rich outputs (reports, summaries, structured data) that will be viewable in the registry UI.

This method is called after the task completes successfully. It should be stateless - loading any required data from the task's target rather than relying on in-memory state.

RETURNS DESCRIPTION
Sequence[Artifact]

Sequence of artifacts (MarkdownArtifact, JSONArtifact, etc.)

artifacts_aio async

artifacts_aio()

Asynchronously return artifacts to be stored in the registry after task completion.

resolve classmethod

resolve(namespace, name, extra)

Override PolymorphicRoot.resolve to handle AliasTask deserialization.

from_registry classmethod

from_registry(id, registry=None)

Instantiate the task from the registry.

PARAMETER DESCRIPTION
id

The UUID (or string representation) of the task to load.

TYPE: UUID | str

registry

An optional registry instance to use for loading metadata. If not provided, the default registry from registry_provider will be used.

TYPE: Union[RegistryABC, None] DEFAULT: None

Returns: An AliasTask instance referencing the specified task.

LoadableTask

Bases: BaseTask, ABC, Generic[LoadedT_co]

A task that can load its output as a typed value.

This is the minimal interface required by stardag.TaskLoads: any BaseTask subclass that implements load() -> T is compatible with TaskLoads[T].

Both stardag.Task (via diamond inheritance) and bare subclasses of LoadableTask satisfy TaskLoads[T].

Subclasses must implement at least one of load() or load_aio(). The missing method will delegate to the other automatically (mirroring the run/run_aio pattern on BaseTask).

load

load()

Load the output of this task (sync).

If only load_aio() is implemented, this delegates via asyncio.run(). Raises RuntimeError if called from within an existing event loop.

load_aio async

load_aio()

Asynchronously load the output of this task.

If only load() is implemented, this delegates to it.
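The mutual delegation between load() and load_aio() can be sketched in plain Python. This is an illustrative pattern under the assumption that an override is detected by comparing against the base method; DualSketch, AsyncLoader, and SyncLoader are hypothetical names, and the real LoadableTask may detect overrides differently.

```python
import asyncio


class DualSketch:
    """Hypothetical base: implement load() or load_aio(); the other delegates."""

    def load(self):
        if type(self).load_aio is DualSketch.load_aio:
            raise NotImplementedError("implement load() or load_aio()")
        # asyncio.run() raises RuntimeError inside a running event loop.
        return asyncio.run(self.load_aio())

    async def load_aio(self):
        if type(self).load is DualSketch.load:
            raise NotImplementedError("implement load() or load_aio()")
        return self.load()


class AsyncLoader(DualSketch):
    async def load_aio(self):
        return 42


class SyncLoader(DualSketch):
    def load(self):
        return "value"


print(AsyncLoader().load())
print(asyncio.run(SyncLoader().load_aio()))
```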

TargetTask

Bases: BaseTask, Generic[TargetType]

Base class for tasks that produce a target output.

Extends BaseTask with a typed target() method and a default complete() implementation that checks whether the target exists.

Most users should subclass stardag.Task (which extends this class with automatic serialization and filesystem target management) rather than using TargetTask directly.

complete

complete()

Check if the task is complete.

complete_aio async

complete_aio()

Asynchronously check if the task is complete.

target abstractmethod

target()

The task output target.

Task

Bases: TargetTask[LoadableSaveableFileSystemTarget[LoadedT]], LoadableTask[LoadedT], ABC, Generic[LoadedT]

A base class for tasks with automatic serialization and filesystem targets.

The target of a Task is a LoadableSaveableFileSystemTarget that uses a serializer inferred from the generic type parameter LoadedT.

The target file path is automatically constructed based on the task's namespace, name, version, and unique ID and has the following structure:

[<relpath_base>/][<namespace>/]<name>/v<version>/[<relpath_extra>/]
<id>[:2]/<id>[2:4]/<id>[/<relpath_filename>].<relpath_extension>

You can override the following properties to customize the target path: _relpath_base, _relpath_extra, _relpath_filename, and _relpath_extension.

See stardag.target.serialize.get_serializer for details on how the serializer is inferred from the generic type parameter, and how to customize it.
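To make the layout concrete, the sketch below assembles a path from the documented components. It is an illustration only: sketch_relpath is a hypothetical helper, and it assumes that empty components (including an empty version) are simply dropped, which is a guess and not confirmed behavior.

```python
from pathlib import PurePosixPath


def sketch_relpath(
    name: str,
    task_id: str,
    extension: str,
    namespace: str = "",
    version: str = "",
    relpath_base: str = "",
    relpath_extra: str = "",
    relpath_filename: str = "",
) -> str:
    """Assemble a path following the documented layout (illustrative only)."""
    parts = [
        relpath_base,
        namespace,
        name,
        f"v{version}" if version else "",  # assumption: omitted when empty
        relpath_extra,
        task_id[:2],
        task_id[2:4],
        task_id,
        relpath_filename,
    ]
    path = PurePosixPath(*[p for p in parts if p])
    return f"{path}.{extension}"


task_id = "036f6e71-1b3c-54b8-aec1-182359f1e09a"
print(sketch_relpath("MyTask", task_id, "json"))
print(sketch_relpath("MyTask", task_id, "json", namespace="demo", version="1"))
```

The two-level id prefix (first two characters, then the next two) spreads files across subdirectories so that no single directory grows too large.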

Example:

import stardag as sd

class MyTask(sd.Task[dict[str, int]]):
    def run(self):
        self._save({"a": 1, "b": 2})

my_task = MyTask()

print(my_task.target())
# FileSerializable(../MyTask/03/6f/036f6e71-1b3c-54b8-aec1-182359f1e09a.json)

print(my_task.target().serializer)
# <stardag.target.serialize.JSONSerializer at 0x1064e4710>

serializer property

serializer

The serializer used for this task's target.

__map_generic_args_to_ancestor__ classmethod

__map_generic_args_to_ancestor__(ancestor_origin, args)

Map generic args from Task to how they appear on an ancestor class.

This enables type compatibility checking when using Task with polymorphic annotations like TaskLoads[T] and SubClass[TargetTask[LoadableTarget[T]]].

PARAMETER DESCRIPTION
ancestor_origin

The ancestor class to map args to

TYPE: type

args

The generic args of this class (e.g., (str,) for Task[str])

TYPE: tuple

RETURNS DESCRIPTION
tuple | None

The mapped args for the ancestor, or None if mapping is not applicable.

load

load()

Load the task target and run any LoadValidators.

load_aio async

load_aio()

Async load — delegates to the target's load_aio and validates.

TaskRef dataclass

TaskRef(name, version, id)

task

task(
    _func: Callable[_PWrapped, _FuncReturnT],
    *,
    name: str | None = None,
    version: str = "",
    relpath: RelpathSettings
    | _RelpathOverride
    | None = None,
    target_root_key: str | None = None,
) -> Type[_FunctionTask[_FuncReturnT, _PWrapped]]
task(
    *,
    name: str | None = None,
    version: str = "",
    relpath: RelpathSettings
    | _RelpathOverride
    | None = None,
    target_root_key: str | None = None,
) -> _TaskWrapper
task(
    _func=None,
    *,
    name=None,
    version="",
    relpath=None,
    target_root_key=None,
)

build_aio async

build_aio(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
    register_all=False,
    on_registry_failure="raise",
)

Build tasks concurrently using hybrid async/thread/process execution.

This is the main build function for production use. It:

  • Discovers all tasks in the DAG(s) and registers each one with the registry as soon as it's discovered (so the full DAG is visible in the UI immediately, not progressively as tasks become runnable)
  • Schedules tasks for execution when dependencies are met
  • Handles dynamic dependencies via generator suspension
  • Supports multiple root tasks (built concurrently)
  • Routes tasks to async/thread/process based on ExecutionModeSelector
  • Manages all registry interactions (register/start/complete/fail task)
  • Optionally uses global concurrency locks for distributed execution

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

task_executor

TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor). Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).

TYPE: TaskExecutorABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

registry

Registry for tracking builds (default: from registry_provider)

TYPE: RegistryABC | None DEFAULT: None

max_concurrent_discover

Maximum concurrent completion checks during DAG discovery. Higher values speed up discovery for large DAGs with remote targets.

TYPE: int DEFAULT: 50

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution to ensure exactly-once execution across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

build_sequential

build_sequential(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    dual_run_default="sync",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Sync API for building tasks sequentially.

This is intended primarily for debugging and testing.

Tasks are registered with the registry as they are discovered (in deterministic DFS order from the roots), so the full DAG appears in the UI immediately rather than progressively as tasks become runnable.

Task execution policy:

  • Sync-only tasks: run via run()
  • Async-only tasks: run via asyncio.run(run_aio()). (Does not work if called from within an existing event loop.)
  • Dual tasks: run via run() if dual_run_default=="sync" (default), else (dual_run_default=="async") via asyncio.run(run_aio()).
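The execution policy can be read as a small decision table. The function below restates it as code purely for illustration; choose_entry_point is a hypothetical helper and does not exist in Stardag.

```python
from typing import Literal


def choose_entry_point(
    has_run: bool,
    has_run_aio: bool,
    dual_run_default: Literal["sync", "async"] = "sync",
) -> str:
    """Return which call a sequential sync build would make for a task."""
    if has_run and has_run_aio:  # dual task
        return "run()" if dual_run_default == "sync" else "asyncio.run(run_aio())"
    if has_run:  # sync-only task
        return "run()"
    if has_run_aio:  # async-only task
        return "asyncio.run(run_aio())"
    raise ValueError("task implements neither run() nor run_aio()")


print(choose_entry_point(True, True))
print(choose_entry_point(True, True, dual_run_default="async"))
print(choose_entry_point(False, True))
```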

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

dual_run_default

For dual tasks, prefer sync or async execution

TYPE: Literal['sync', 'async'] DEFAULT: 'sync'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

build_sequential_aio async

build_sequential_aio(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    sync_run_default="blocking",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Async API for building tasks sequentially.

This is intended primarily for debugging and testing.

Tasks are registered with the registry as they are discovered (in deterministic DFS order from the roots), so the full DAG appears in the UI immediately rather than progressively as tasks become runnable.

Task execution policy:

  • Sync-only tasks: run blocking via run() in the main event loop if sync_run_default=="blocking" (default), else (sync_run_default=="thread") in a thread pool.
  • Async-only tasks: run via await run_aio().
  • Dual tasks: run via await run_aio().

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

sync_run_default

For sync-only tasks, block or use thread pool

TYPE: Literal['thread', 'blocking'] DEFAULT: 'blocking'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

namespace

namespace(namespace, scope)

Set the task namespace for the module and any submodules.

PARAMETER DESCRIPTION
namespace

The namespace to set for the module.

TYPE: str

scope

The module scope, typically passed as __name__.

TYPE: str

Usage:

import stardag as sd

sd.namespace("my_custom_namespace", __name__)

class MyNamespacedTask(sd.Task[int]):
    a: int

    def run(self):
        self._save(self.a)

assert MyNamespacedTask.get_namespace() == "my_custom_namespace"

auto_namespace

auto_namespace(scope)

Set the task namespace for the module to the module import path.

PARAMETER DESCRIPTION
scope

The module scope, typically passed as __name__.

TYPE: str

Usage:

import stardag as sd

sd.auto_namespace(__name__)

class MyAutoNamespacedTask(sd.Task[int]):
    a: int

    def run(self):
        self._save(self.a)

assert MyAutoNamespacedTask.get_namespace() == __name__

get_file_target

get_file_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a file target for the given relative path.

get_directory_target

get_directory_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Build Module

build

Build module for stardag.

This module provides functions and classes for building task DAGs.

Primary build functions:

  • build(): Concurrent build, recommended for real workloads from a sync context
  • build_aio(): Async concurrent build, recommended for real workloads from an async context or already running event loop
  • build_sequential(): Sync sequential build (for debugging)
  • build_sequential_aio(): Async sequential build (for debugging)

Task executor:

  • HybridConcurrentTaskExecutor: Routes tasks to async/thread/process pools

Interfaces:

  • TaskExecutorABC: Abstract base class for custom task executors
  • ExecutionModeSelector: Protocol for custom execution mode selection

Global concurrency locking:

  • GlobalConcurrencyLockManager: Protocol for distributed lock implementations
  • LockHandle: Protocol for lock handles (async context manager)
  • GlobalLockConfig: Configuration for global locking behavior

BuildSummary dataclass

BuildSummary(status, task_count, build_id=None, error=None)

Summary of a build execution.

raise_on_failure

raise_on_failure()

Raise BuildFailed if the build status is FAILURE.

Source code in stardag/build/_base.py
def raise_on_failure(self) -> None:
    """Raise :class:`BuildFailed` if the build status is ``FAILURE``."""
    if self.status == BuildExitStatus.FAILURE:
        raise BuildFailed(self)

__repr__

__repr__()

Return a human-readable summary of the build.

Source code in stardag/build/_base.py
def __repr__(self) -> str:
    """Return a human-readable summary of the build."""
    tc = self.task_count
    status_icon = "✓" if self.status == BuildExitStatus.SUCCESS else "✗"
    lines = [
        f"Build {self.status.value.upper()} {status_icon}",
    ]
    if self.build_id:
        lines.append(f"  Build ID: {self.build_id}")
    lines.extend(
        [
            f"  Discovered: {tc.discovered}",
            f"  Previously completed: {tc.previously_completed}",
            f"  Succeeded: {tc.succeeded}",
            f"  Failed: {tc.failed}",
        ]
    )
    if tc.cancelled > 0:
        lines.append(f"  Cancelled: {tc.cancelled}")
    if tc.skipped > 0:
        lines.append(f"  Skipped: {tc.skipped}")
    if tc.pending > 0:
        lines.append(f"  Pending: {tc.pending}")
    if self.error:
        lines.append(f"  Error: {self.error}")
    return "\n".join(lines)

BuildExitStatus

Bases: StrEnum

FailMode

Bases: StrEnum

How to handle task failures during build.

ATTRIBUTE DESCRIPTION
FAIL_FAST

Stop the build at the first task failure.

CONTINUE

Continue executing all tasks whose dependencies are met, even if some tasks have failed.
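The difference between the two modes is easiest to see on a list of independent jobs. The sketch below uses a hypothetical FailModeSketch enum (its string values are assumptions) and a toy run_all loop; it ignores dependencies, which the real build takes into account.

```python
from enum import Enum


class FailModeSketch(str, Enum):
    """Hypothetical mirror of the two documented modes."""

    FAIL_FAST = "fail_fast"
    CONTINUE = "continue"


def run_all(jobs, fail_mode):
    """Run independent callables; return (succeeded, failed) name lists."""
    succeeded, failed = [], []
    for name, job in jobs:
        try:
            job()
            succeeded.append(name)
        except Exception:
            failed.append(name)
            if fail_mode is FailModeSketch.FAIL_FAST:
                break  # stop at the first failure
    return succeeded, failed


def ok():
    return None


def boom():
    raise ValueError("boom")


jobs = [("first", ok), ("second", boom), ("third", ok)]
print(run_all(jobs, FailModeSketch.FAIL_FAST))
print(run_all(jobs, FailModeSketch.CONTINUE))
```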

HybridConcurrentTaskExecutor

HybridConcurrentTaskExecutor(
    execution_mode_selector=None,
    max_async_workers=10,
    max_thread_workers=10,
    max_process_workers=None,
)

Bases: TaskExecutorABC

Task executor with async, thread, and process pools.

Routes tasks to appropriate execution context based on ExecutionModeSelector. Handles generator suspension for dynamic dependencies.

Note: This executor does not handle registry calls - those are managed by the build() function. The executor only executes tasks and returns results.

For routing tasks to different executors (e.g., some to Modal, some local), use RoutedTaskExecutor to compose multiple executors.

Alternative: For fully async multiprocessing without thread pools, one could implement an AIOMultiprocessingTaskExecutor using libraries like aiomultiprocess.

PARAMETER DESCRIPTION
execution_mode_selector

Callable to select execution mode per task.

TYPE: ExecutionModeSelector | None DEFAULT: None

max_async_workers

Maximum concurrent async tasks (semaphore-based).

TYPE: int DEFAULT: 10

max_thread_workers

Maximum concurrent thread pool workers.

TYPE: int DEFAULT: 10

max_process_workers

Maximum concurrent process pool workers.

TYPE: int | None DEFAULT: None

Source code in stardag/build/_concurrent.py
def __init__(
    self,
    execution_mode_selector: ExecutionModeSelector | None = None,
    max_async_workers: int = 10,
    max_thread_workers: int = 10,
    max_process_workers: int | None = None,
) -> None:
    self.execution_mode_selector = (
        execution_mode_selector or DefaultExecutionModeSelector()
    )
    self.max_async_workers = max_async_workers
    self.max_thread_workers = max_thread_workers
    self.max_process_workers = max_process_workers

    # Pools - initialized in setup()
    self._async_semaphore: asyncio.Semaphore | None = None
    self._thread_pool: ThreadPoolExecutor | None = None
    self._process_pool: ProcessPoolExecutor | None = None

    # Track suspended generators (task_id -> sync or async generator)
    # For in-process execution where we can suspend and resume
    self._suspended_generators: dict[
        UUID,
        Union[
            Generator[TaskStruct, None, None],
            AsyncGenerator[TaskStruct, None],
        ],
    ] = {}

    # Track tasks pending re-execution (task_id -> True)
    # For cross-process/remote execution: when task yields incomplete deps,
    # it's re-executed from scratch after deps complete (idempotent re-execution)
    self._pending_reexecution: set[UUID] = set()
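As a rough picture of the routing idea, the standalone sketch below sends coroutine functions to the event loop (bounded by a semaphore) and plain functions to a thread pool. It is not the executor's actual logic: run_routed and the two job functions are hypothetical, routing here is decided by function type instead of an ExecutionModeSelector, and the process pool is left out.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor


async def async_job(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


def blocking_job(x: int) -> int:
    return x + 1


async def run_routed(items, max_async_workers: int = 2, max_thread_workers: int = 2):
    """Route coroutine functions to the loop and plain functions to threads."""
    semaphore = asyncio.Semaphore(max_async_workers)
    loop = asyncio.get_running_loop()

    async def run_one(func, arg):
        if inspect.iscoroutinefunction(func):
            async with semaphore:  # caps the number of concurrent async jobs
                return await func(arg)
        # Blocking work runs in the thread pool so the loop stays responsive.
        return await loop.run_in_executor(pool, func, arg)

    with ThreadPoolExecutor(max_workers=max_thread_workers) as pool:
        return await asyncio.gather(*(run_one(f, a) for f, a in items))


results = asyncio.run(run_routed([(async_job, 1), (blocking_job, 1), (async_job, 5)]))
print(results)
```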

setup async

setup()

Initialize worker pools.

Source code in stardag/build/_concurrent.py
async def setup(self) -> None:
    """Initialize worker pools."""
    import multiprocessing as mp

    self._async_semaphore = asyncio.Semaphore(self.max_async_workers)
    self._thread_pool = ThreadPoolExecutor(max_workers=self.max_thread_workers)
    if self.max_process_workers:
        # Use 'spawn' explicitly for cross-platform compatibility.
        # Python 3.14 changed the default from 'fork' to 'forkserver' on Linux,
        # which can cause issues with environment variable inheritance.
        # 'spawn' is the safest option and works consistently across platforms.
        self._process_pool = ProcessPoolExecutor(
            max_workers=self.max_process_workers,
            mp_context=mp.get_context("spawn"),
        )

teardown async

teardown()

Shutdown worker pools.

Source code in stardag/build/_concurrent.py
async def teardown(self) -> None:
    """Shutdown worker pools."""
    if self._thread_pool:
        self._thread_pool.shutdown(wait=True)
        self._thread_pool = None
    if self._process_pool:
        self._process_pool.shutdown(wait=True)
        self._process_pool = None
    self._async_semaphore = None
    self._suspended_generators.clear()
    self._pending_reexecution.clear()

submit async

submit(task)

Execute a task and return result.

Note: This method does not make any registry calls. The build function is responsible for calling start_task, complete_task, and fail_task.

Source code in stardag/build/_concurrent.py
async def submit(self, task: BaseTask) -> None | TaskStruct | TaskExecutionError:
    """Execute a task and return result.

    Note: This method does not make any registry calls. The build function
    is responsible for calling start_task, complete_task, and fail_task.
    """
    # Check if we're resuming a suspended generator (in-process dynamic deps)
    if task.id in self._suspended_generators:
        gen = self._suspended_generators[task.id]
        if hasattr(gen, "__anext__"):
            return await self._resume_generator_aio(task)
        return self._resume_generator(task)

    # Check if task is pending re-execution (cross-process dynamic deps)
    # Task yielded incomplete deps, deps are now built, re-execute task
    if task.id in self._pending_reexecution:
        self._pending_reexecution.discard(task.id)

    mode = self.execution_mode_selector(task)

    try:
        result = await self._execute_task(task, mode)
        return await self._handle_result(task, result)
    except Exception as e:
        return TaskExecutionError(
            exception=e,
            traceback="".join(tb_module.format_exception(e)),
        )

TaskExecutorABC

Bases: ABC

Abstract base for task executors.

Receives tasks and executes them according to some policy. The executor is responsible for:

  • Executing tasks in the appropriate context (async/thread/process)
  • Handling generator suspension for dynamic dependencies

The executor is NOT responsible for:

  • Dependency resolution - handled by build()
  • Registry calls (start_task, complete_task, etc.) - handled by build()

submit abstractmethod async

submit(task)

Submit a task for execution.

PARAMETER DESCRIPTION
task

The task to execute.

TYPE: BaseTask

RETURNS DESCRIPTION
None | TaskStruct | TaskExecutionError

  • None: Task completed successfully with no dynamic dependencies.
  • TaskStruct: Task "suspended" because it yielded dynamic dependencies. The returned TaskStruct contains the discovered dependencies.
  • TaskExecutionError: Task failed. Contains the exception and a pre-formatted traceback string captured at the point of failure.

Source code in stardag/build/_base.py
@abstractmethod
async def submit(self, task: BaseTask) -> None | TaskStruct | TaskExecutionError:
    """Submit a task for execution.

    Args:
        task: The task to execute.

    Returns:
        - None: Task completed successfully with no dynamic dependencies.
        - TaskStruct: Task "suspended" because it yielded dynamic dependencies.
            The returned TaskStruct contains the discovered dependencies.
        - TaskExecutionError: Task failed. Contains the exception and a
            pre-formatted traceback string captured at the point of failure.
    """
    ...
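A caller of submit() has to distinguish the three result kinds. The sketch below shows one way that branching might look, with failures captured as values instead of raised; ErrorSketch, submit_sketch, and describe are hypothetical stand-ins and the job callables replace real tasks.

```python
import traceback
from dataclasses import dataclass


@dataclass
class ErrorSketch:
    """Hypothetical stand-in for TaskExecutionError."""

    exception: Exception
    traceback: str


def submit_sketch(job):
    """Return None, a list of discovered dependencies, or an ErrorSketch."""
    try:
        return job()
    except Exception as exc:
        # Format the traceback now, while it is still attached to the exception.
        formatted = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        return ErrorSketch(exception=exc, traceback=formatted)


def describe(result) -> str:
    """Classify a submit result the way a build loop might branch on it."""
    if result is None:
        return "completed"
    if isinstance(result, ErrorSketch):
        return "failed"
    return "suspended"


def failing_job():
    raise ValueError("bad input")


print(describe(submit_sketch(lambda: None)))
print(describe(submit_sketch(lambda: ["dep_a", "dep_b"])))
print(describe(submit_sketch(failing_job)))
```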

setup abstractmethod async

setup()

Setup any resources needed for the task runner (pools, etc.).

Source code in stardag/build/_base.py
@abstractmethod
async def setup(self) -> None:
    """Setup any resources needed for the task runner (pools, etc.)."""
    ...

teardown abstractmethod async

teardown()

Teardown any resources used by the task executor.

Source code in stardag/build/_base.py
@abstractmethod
async def teardown(self) -> None:
    """Teardown any resources used by the task executor."""
    ...

cancel async

cancel(task)

Best-effort cancel an in-flight task.

Default: no-op. The build loop also calls asyncio.Task.cancel() on the future wrapping submit(), which propagates as asyncio.CancelledError into cooperative awaitables (async tasks, modal.Function.remote.aio). Override this for executors that need explicit teardown beyond asyncio cooperation (e.g. cancelling a tracked remote handle).

Effectiveness depends on the executor and how it implements teardown(). For example, HybridConcurrentTaskExecutor cannot reliably terminate thread- or process-pool work from Python, AND its teardown() calls shutdown(wait=True) — so the build will block until the underlying thread/subprocess finishes. Async-only tasks and Modal calls do propagate the cancellation cooperatively and unblock the build promptly.

Source code in stardag/build/_base.py
async def cancel(self, task: BaseTask) -> None:
    """Best-effort cancel an in-flight task.

    Default: no-op. The build loop also calls ``asyncio.Task.cancel()``
    on the future wrapping ``submit()``, which propagates as
    ``asyncio.CancelledError`` into cooperative awaitables (async
    tasks, ``modal.Function.remote.aio``). Override this for executors
    that need explicit teardown beyond asyncio cooperation (e.g.
    cancelling a tracked remote handle).

    Effectiveness depends on the executor and how it implements
    ``teardown()``. For example, ``HybridConcurrentTaskExecutor``
    cannot reliably terminate thread- or process-pool work from
    Python, AND its ``teardown()`` calls ``shutdown(wait=True)`` —
    so the build will block until the underlying thread/subprocess
    finishes. Async-only tasks and Modal calls do propagate the
    cancellation cooperatively and unblock the build promptly.
    """
    pass
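The cooperative cancellation mentioned above is standard asyncio behavior and can be demonstrated without any executor. In this sketch, cooperative_job and main are hypothetical names; the point is that asyncio.Task.cancel() surfaces as asyncio.CancelledError at the job's next await, where cleanup can run before the error is re-raised.

```python
import asyncio


async def cooperative_job(log: list) -> None:
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        log.append("cancelled")  # cleanup hook runs before propagating
        raise


async def main() -> list:
    log: list = []
    job = asyncio.create_task(cooperative_job(log))
    await asyncio.sleep(0)  # let the job start and reach its await
    job.cancel()
    try:
        await job
    except asyncio.CancelledError:
        log.append("observed")
    return log


cancel_log = asyncio.run(main())
print(cancel_log)
```

Work already running in a thread or process pool has no such await point, which is why cancellation there is only best-effort.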

build

build(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
    register_all=False,
    on_registry_failure="raise",
)

Build tasks concurrently (sync wrapper for build_aio).

This is the recommended entry point for building tasks from synchronous code. Wraps the async build_aio() function.

Note

This function cannot be called from within an already running event loop. If you're in an async context (e.g., inside an async function, or using frameworks like Playwright, FastAPI, etc.), use await build_aio() instead.

Source code in stardag/build/_concurrent.py
def build(
    tasks: Sequence[BaseTask] | BaseTask,
    task_executor: TaskExecutorABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    registry: RegistryABC | None = None,
    max_concurrent_discover: int = 50,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    resume_build_id: UUID | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Build tasks concurrently (sync wrapper for build_aio).

    This is the recommended entry point for building tasks from synchronous code.
    Wraps the async build_aio() function.

    Note:
        This function cannot be called from within an already running event loop.
        If you're in an async context (e.g., inside an async function, or using
        frameworks like Playwright, FastAPI, etc.), use `await build_aio()` instead.
    """
    try:
        return asyncio.run(
            build_aio(
                tasks,
                task_executor,
                fail_mode,
                registry,
                max_concurrent_discover,
                global_lock_manager,
                global_lock_config,
                resume_build_id,
                register_all,
                on_registry_failure,
            )
        )
    except RuntimeError as e:
        if "cannot be called from a running event loop" in str(e):
            raise RuntimeError(
                "build() cannot be used from within an already running event loop. "
                "Use 'await build_aio()' instead, or 'build_sequential()' if you "
                "need synchronous execution without an event loop."
            ) from e
        raise
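The error translation above relies on `asyncio.run()` refusing to start when a loop is already running. A minimal standard-library sketch of the same pattern follows; `inner`, `sync_entry`, and `async_caller` are hypothetical names and do not belong to stardag.

```python
import asyncio


async def inner() -> int:
    return 42


def sync_entry() -> int:
    """Hypothetical sync wrapper, shaped like the try/except above."""
    coro = inner()
    try:
        return asyncio.run(coro)
    except RuntimeError as e:
        coro.close()  # avoid a "coroutine was never awaited" warning
        if "cannot be called from a running event loop" in str(e):
            raise RuntimeError("use 'await inner()' inside async code") from e
        raise


async def async_caller() -> str:
    try:
        sync_entry()  # wrong: an event loop is already running here
    except RuntimeError as e:
        return str(e)
    return "no error"


outside = sync_entry()                # no running loop: works
inside = asyncio.run(async_caller())  # running loop: translated error
print(outside, "|", inside)
# 42 | use 'await inner()' inside async code
```

Called from plain synchronous code the wrapper returns normally; called from inside a coroutine it surfaces the friendlier message.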

build_aio async

build_aio(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
    register_all=False,
    on_registry_failure="raise",
)

Build tasks concurrently using hybrid async/thread/process execution.

This is the main build function for production use. It:

- Discovers all tasks in the DAG(s) and registers each one with the registry as soon as it's discovered (so the full DAG is visible in the UI immediately, not progressively as tasks become runnable)
- Schedules tasks for execution when dependencies are met
- Handles dynamic dependencies via generator suspension
- Supports multiple root tasks (built concurrently)
- Routes tasks to async/thread/process based on ExecutionModeSelector
- Manages all registry interactions (register/start/complete/fail task)
- Optionally uses global concurrency locks for distributed execution
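The discover-then-schedule flow described above can be illustrated with a toy model in plain asyncio. This is a simplified sketch, not stardag's implementation: the `DEPS` mapping, `discover`, `run_task`, and `toy_build` are all hypothetical, and registry calls, locks, and dynamic dependencies are left out.

```python
import asyncio

# Hypothetical toy DAG: task name -> names of the tasks it depends on.
DEPS: dict[str, list[str]] = {
    "report": ["stats", "plots"],
    "stats": ["clean"],
    "plots": ["clean"],
    "clean": [],
}


def discover(name: str, graph: dict[str, list[str]]) -> None:
    """Post-order walk: dependencies are recorded before their parent."""
    if name in graph:
        return  # shared dependency already discovered (diamond DAG)
    for dep in DEPS[name]:
        discover(dep, graph)
    graph[name] = DEPS[name]


async def run_task(name: str, order: list[str]) -> None:
    await asyncio.sleep(0.01)  # stand-in for real work
    order.append(name)


async def toy_build(root: str) -> list[str]:
    graph: dict[str, list[str]] = {}
    discover(root, graph)
    completed: set[str] = set()
    running: dict[asyncio.Task, str] = {}
    order: list[str] = []
    while len(completed) < len(graph):
        # Schedule every task whose dependencies have all completed.
        for name, deps in graph.items():
            if name in completed or name in running.values():
                continue
            if all(d in completed for d in deps):
                running[asyncio.create_task(run_task(name, order))] = name
        done, _ = await asyncio.wait(
            set(running), return_when=asyncio.FIRST_COMPLETED
        )
        for finished in done:
            completed.add(running.pop(finished))
    return order


order = asyncio.run(toy_build("report"))
print(order)
```

In this toy DAG `clean` always runs first and `report` last, while `stats` and `plots` run concurrently once `clean` has completed.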

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

task_executor

TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor). Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).

TYPE: TaskExecutorABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

registry

Registry for tracking builds (default: from registry_provider)

TYPE: RegistryABC | None DEFAULT: None

max_concurrent_discover

Maximum concurrent completion checks during DAG discovery. Higher values speed up discovery for large DAGs with remote targets.

TYPE: int DEFAULT: 50

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution to ensure exactly-once execution across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'
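To make the two `on_registry_failure` modes concrete, the sketch below models a chunked registration loop in which one chunk fails. It is an assumption-laden illustration: `handle_error`, `register_in_chunks`, and the simulated outage are hypothetical, and the real `handle_registry_error` helper may behave differently in detail.

```python
import logging
from typing import Literal

logger = logging.getLogger("registry-sketch")

OnFailure = Literal["raise", "warn"]  # hypothetical alias for this sketch


def handle_error(err: Exception, message: str, mode: OnFailure) -> None:
    """Re-raise in "raise" mode; log and continue in "warn" mode."""
    if mode == "raise":
        raise err
    logger.warning("%s: %s", message, err)


def register_in_chunks(
    ids: list[int], chunk_size: int, mode: OnFailure
) -> list[int]:
    """Register ids chunk by chunk; return the ids that were registered."""
    registered: list[int] = []
    for start in range(0, len(ids), chunk_size):
        chunk = ids[start : start + chunk_size]
        try:
            if 7 in chunk:  # simulated registry outage for one chunk
                raise ConnectionError("registry unavailable")
            registered.extend(chunk)
        except Exception as err:
            handle_error(err, f"failed to register {len(chunk)} tasks", mode)
            break  # "warn" mode: stop after the first failed chunk
    return registered


warned = register_in_chunks(list(range(10)), chunk_size=3, mode="warn")
print(warned)  # [0, 1, 2, 3, 4, 5]
```

With `mode="raise"` the same call propagates the `ConnectionError` instead of returning a partial result.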

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id
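The effect of `max_concurrent_discover` can be pictured as a semaphore around each completion check. The following standalone sketch assumes nothing about stardag beyond that idea; `check_all` and `fake_complete_check` are hypothetical names, and the sleep stands in for a remote existence check.

```python
import asyncio


async def check_all(n_checks: int, limit: int) -> int:
    """Run n_checks fake completion checks, at most `limit` at a time.

    Returns the peak number of checks that were in flight simultaneously.
    """
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_complete_check() -> bool:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for a remote existence check
            in_flight -= 1
        return True

    await asyncio.gather(*(fake_complete_check() for _ in range(n_checks)))
    return peak


peak = asyncio.run(check_all(n_checks=20, limit=5))
print(f"peak concurrent checks: {peak}")
# peak concurrent checks: 5
```

Raising the limit lets more checks overlap, which is why higher values help on large DAGs whose targets live on remote storage.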

Source code in stardag/build/_concurrent.py
async def build_aio(
    tasks: Sequence[BaseTask] | BaseTask,
    task_executor: TaskExecutorABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    registry: RegistryABC | None = None,
    max_concurrent_discover: int = 50,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    resume_build_id: UUID | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Build tasks concurrently using hybrid async/thread/process execution.

    This is the main build function for production use. It:
    - Discovers all tasks in the DAG(s) and registers each one with the
      registry as soon as it's discovered (so the full DAG is visible in the
      UI immediately, not progressively as tasks become runnable)
    - Schedules tasks for execution when dependencies are met
    - Handles dynamic dependencies via generator suspension
    - Supports multiple root tasks (built concurrently)
    - Routes tasks to async/thread/process based on ExecutionModeSelector
    - Manages all registry interactions (register/start/complete/fail task)
    - Optionally uses global concurrency locks for distributed execution

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        task_executor: TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor).
            Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).
        fail_mode: How to handle task failures
        registry: Registry for tracking builds (default: from registry_provider)
        max_concurrent_discover: Maximum concurrent completion checks during DAG discovery.
            Higher values speed up discovery for large DAGs with remote targets.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution to ensure exactly-once execution across processes.
        global_lock_config: Configuration for global locking behavior.
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        register_all: If True, discovery continues recursing into dependencies of
            already-complete tasks. This ensures all tasks in the DAG get registered
            in the registry (useful for complete DAG visualization). Default False
            for performance — skipping complete subgraphs avoids unnecessary I/O.
        on_registry_failure: How to handle registry call failures. "raise" (default)
            propagates the exception; "warn" logs a warning and continues.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    if isinstance(tasks, BaseTask):
        tasks = [tasks]
    else:
        tasks = list(tasks)
        for idx, task in enumerate(tasks):
            if not isinstance(task, BaseTask):
                raise ValueError(
                    f"Invalid task at index {idx}: {task} (must be BaseTask)"
                )

    # Determine registry: explicit > registry_provider
    if registry is None:
        registry = registry_provider.get()
    logger.info(f"Using registry: {type(registry).__name__}")

    if task_executor is None:
        task_executor = HybridConcurrentTaskExecutor()

    # Setup global lock selector
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)

    # Track locks held by this build for manual release
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    error: BaseException | None = None
    # Set when any task fails (or its lock acquisition fails). The main
    # loop processes the whole batch first, then breaks out if FAIL_FAST.
    # Lifting the raise out of process_result keeps sibling completions in
    # the same wait-batch from being silently dropped.
    fail_fast_triggered: bool = False

    # Task execution states
    task_states: dict[UUID, TaskExecutionState] = {}
    # Events for completion signaling
    completion_events: dict[UUID, asyncio.Event] = {}
    # Currently executing tasks
    executing: set[UUID] = set()

    # Tasks found to be already complete during discovery (mark complete in registry
    # after discovery finishes — registration itself happens via the bulk call).
    previously_completed_tasks: list[BaseTask] = []
    # Tasks accumulated during the current discover() walk, in post-order,
    # awaiting the bulk-registration call. Within each subtree this is
    # strict post-order; sibling subtrees may interleave (TaskGroup runs
    # them concurrently), but each task's static deps are always appended
    # before it — which is all the API needs to avoid phantom-creation.
    # Cleared by ``flush_pending_registrations`` after each bulk call.
    pending_registrations: list[BaseTask] = []
    # Per-task event signalling that this task's discover() has finished
    # appending it to pending_registrations. The fast-path
    # ``if task.id in task_states: return`` would otherwise let a sibling
    # discoverer race past — appending its own parent ahead of the still-
    # in-flight dep — re-introducing exactly the phantom window the
    # post-order walk is designed to eliminate (diamond DAGs, shared deps).
    discover_done: dict[UUID, asyncio.Event] = {}

    # Synchronization for concurrent discovery
    discover_lock = asyncio.Lock()
    discover_semaphore = asyncio.Semaphore(max_concurrent_discover)

    # Start or resume build *before* discovery so we have a build_id to
    # register tasks against. Registering during discovery makes the full
    # DAG visible in the UI immediately, not leaves-first as tasks run.
    if resume_build_id is not None:
        build_id = resume_build_id
        logger.info(f"Resuming build: {build_id}")
        # Emit a BUILD_RESUMED event so the registry flips a previously
        # terminal build back to RUNNING and the UI surfaces "running
        # (resumed)". On older registry servers this is a no-op (the
        # endpoint 404s and APIRegistry swallows it with a warning).
        try:
            await registry.build_resume_aio(build_id)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to mark build {build_id} as resumed",
                on_registry_failure,
            )
    else:
        build_id = await registry.build_start_aio(root_tasks=tasks)
        logger.info(f"Started build: {build_id}")

    async def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks.

        Discovery only populates local state and ``pending_registrations``;
        the actual ``task_register_bulk_aio`` call fires once via
        ``flush_pending_registrations()`` after the whole walk completes.
        Walks in **post-order** so deps appear in ``pending_registrations``
        before their parents — the bulk endpoint processes the array in
        order, so by the time a parent's ``dependency_task_ids`` are
        reconciled the dep rows already exist (no phantom creation in
        ``_reconcile_dependency_edges``).

        Uses concurrent recursion with TaskGroup for parallel discovery,
        with a lock protecting shared data structures and a semaphore
        limiting concurrent completion checks.

        Concurrency invariant: when a sibling coroutine encounters this
        task already-discovered (fast-path), it ``await``s the
        ``discover_done`` event before returning. Without that, a
        diamond-DAG sibling could append its own parent to
        ``pending_registrations`` before this coroutine appends the
        shared dep — re-introducing the phantom window we're trying to
        eliminate.
        """
        # Check if already discovered and reserve our spot (with lock)
        async with discover_lock:
            if task.id in task_states:
                done_event = discover_done[task.id]
                already_seen = True
            else:
                static_deps = flatten_task_struct(task.requires())
                task_states[task.id] = TaskExecutionState(
                    task=task, static_deps=static_deps
                )
                completion_events[task.id] = asyncio.Event()
                discover_done[task.id] = asyncio.Event()
                done_event = discover_done[task.id]
                task_count.discovered += 1
                already_seen = False

        if already_seen:
            # Wait for the original discoverer to finish appending this
            # task to pending_registrations before returning. Otherwise a
            # parent further up our chain would append ahead of the dep.
            await done_event.wait()
            return

        # ``static_deps`` is only assigned in the else-branch above, but
        # pyright can't infer the control flow; pull it back from the
        # state we just stored.
        static_deps = task_states[task.id].static_deps

        try:
            # Check completion outside lock (I/O bound, use semaphore to limit concurrency)
            async with discover_semaphore:
                is_complete = await task.complete_aio()

            if is_complete:
                async with discover_lock:
                    completion_cache.add(task.id)
                    task_states[task.id].completed = True
                    completion_events[task.id].set()
                    task_count.previously_completed += 1
                    previously_completed_tasks.append(task)
                if not register_all:
                    # Don't recurse into deps — they're already built.
                    # Append to pending_registrations (leaf in post-order).
                    pending_registrations.append(task)
                    return

            # Task not complete (or register_all) — recurse into deps
            # first (post-order). TaskGroup waits for all children to
            # finish before this body continues, so all child appends to
            # pending_registrations land before our own append below.
            async with asyncio.TaskGroup() as tg:
                for dep in static_deps:
                    tg.create_task(discover(dep))

            # Append self after children — preserves post-order within
            # subtree.
            pending_registrations.append(task)
        finally:
            # Always set so any sibling fast-path waiters can proceed,
            # even when this discover() raised. (TaskGroup will propagate
            # the failure to siblings via cancellation, but we don't want
            # waiters to deadlock on a never-set event before that
            # cancellation reaches them.)
            done_event.set()

    async def flush_pending_registrations() -> None:
        """Bulk-register every task accumulated since the last flush.

        Called after each discover-walk (initial walk and each dynamic-deps
        walk). Chunks ``batch`` into ``_BULK_REGISTER_CHUNK_SIZE``-sized
        slices to stay within the API's per-call cap and to keep each
        transaction bounded. On chunk failure: ``warn`` mode logs and
        stops processing further chunks — the per-task retry inside
        ``submit_with_lock`` picks up the slack as tasks become runnable.
        ``raise`` mode propagates.
        """
        if not pending_registrations:
            return
        # Snapshot then clear so a recursive discover call inside the same
        # event loop tick can't accidentally re-register these tasks.
        batch = list(pending_registrations)
        pending_registrations.clear()

        for chunk_start in range(0, len(batch), _BULK_REGISTER_CHUNK_SIZE):
            chunk = batch[chunk_start : chunk_start + _BULK_REGISTER_CHUNK_SIZE]
            try:
                await registry.task_register_bulk_aio(build_id, chunk)
            except Exception as reg_err:
                # Include up to 5 task IDs in the warning so debugging is
                # possible without dumping a 1000-id list into logs.
                ids_preview = ", ".join(str(t.id) for t in chunk[:5])
                if len(chunk) > 5:
                    ids_preview += f", +{len(chunk) - 5} more"
                total_chunks = (
                    len(batch) + _BULK_REGISTER_CHUNK_SIZE - 1
                ) // _BULK_REGISTER_CHUNK_SIZE
                this_chunk = (chunk_start // _BULK_REGISTER_CHUNK_SIZE) + 1
                handle_registry_error(
                    reg_err,
                    f"Failed to bulk-register chunk {this_chunk}/{total_chunks} "
                    f"({len(chunk)} tasks; ids: {ids_preview})",
                    on_registry_failure,
                )
                return
            for t in chunk:
                task_states[t.id].registered = True

    # Mark previously completed tasks as complete in the registry. They were
    # already registered by the bulk call in flush_pending_registrations();
    # we still need to fire task_complete_aio so they appear COMPLETED rather
    # than PENDING, and to self-heal tasks left in "Started" state from a
    # previous build that crashed (their target exists, so they are complete,
    # but the registry still shows them as running).
    async def mark_previously_completed(task: BaseTask) -> None:
        if not task_states[task.id].registered:
            # Registration failed in `warn` mode; skip task_complete since
            # the API row doesn't exist.
            return
        try:
            await registry.task_complete_aio(build_id, task)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to mark previously completed task {task.id} as complete",
                on_registry_failure,
            )

    # Map task_id -> asyncio.Task for in-flight executions
    pending_futures: dict[UUID, asyncio.Task] = {}

    async def process_result(
        task: BaseTask,
        result: LockAcquisitionResult
        | TaskExecutionError
        | BaseException
        | TaskStruct
        | None,
    ):
        """Process a single task result (including lock acquisition results).

        Never raises in FAIL_FAST mode — sets ``fail_fast_triggered`` so the
        main loop can finish processing the current ``done`` batch first
        (otherwise sibling completions in the same batch would be lost) and
        then escalate.
        """
        nonlocal error, fail_fast_triggered
        state = task_states[task.id]

        # Handle lock acquisition results (lock was not acquired)
        if isinstance(result, LockAcquisitionResult):
            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                # Task completed externally - wait for visibility then mark complete
                await wait_for_completion_with_retry(task)
                state.completed = True
                completion_cache.add(task.id)
                completion_events[task.id].set()
                task_count.previously_completed += 1
            else:
                # ERROR, HELD_BY_OTHER, or CONCURRENCY_LIMIT_REACHED after timeout
                msg = f"Lock {result.status.value}"
                if result.error_message:
                    msg += f": {result.error_message}"
                state.exception = Exception(msg)
                task_count.failed += 1
                error = state.exception
                if fail_mode == FailMode.FAIL_FAST:
                    fail_fast_triggered = True
            return

        # Handle normal task execution results
        if isinstance(result, TaskExecutionError):
            # Task failed - release lock (not completed) and notify registry
            await release_lock_for_task(task, completed=False)
            try:
                await registry.task_fail_aio(build_id, task, str(result))
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} failure",
                    on_registry_failure,
                )
            state.exception = result.exception
            task_count.failed += 1
            error = result.exception
            if fail_mode == FailMode.FAIL_FAST:
                fail_fast_triggered = True

        elif isinstance(result, BaseException):
            # Backward compat: custom executor returned a bare exception
            await release_lock_for_task(task, completed=False)
            try:
                await registry.task_fail_aio(build_id, task, str(result))
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} failure",
                    on_registry_failure,
                )
            state.exception = result
            task_count.failed += 1
            error = result
            if fail_mode == FailMode.FAIL_FAST:
                fail_fast_triggered = True

        elif result is None:
            # Task completed - release lock (completed) and notify registry
            await release_lock_for_task(task, completed=True)
            try:
                await registry.task_complete_aio(build_id, task)
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} completion",
                    on_registry_failure,
                )
            # Upload artifacts if any
            try:
                artifacts = await task.artifacts_aio()
                if artifacts:
                    await registry.task_upload_artifacts_aio(build_id, task, artifacts)
            except Exception as artifact_err:
                handle_registry_error(
                    artifact_err,
                    f"Failed to collect/upload artifacts for task {task.id}",
                    on_registry_failure,
                )
            state.completed = True
            completion_cache.add(task.id)
            completion_events[task.id].set()
            task_count.succeeded += 1

        else:
            # Dynamic deps returned (TaskStruct) - task is suspended
            # Note: Lock is still held - release on final completion/failure
            dynamic_deps = flatten_task_struct(result)

            # Notify registry that task is suspended waiting for dynamic deps
            try:
                await registry.task_suspend_aio(build_id, task)
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} suspension",
                    on_registry_failure,
                )

            # Discover any new dynamic deps FIRST (which post-order-collects
            # them and their requires() subtree into pending_registrations).
            # Then bulk-register the new batch BEFORE recording the edge —
            # this way the upstream row exists when the edge insert runs,
            # and _reconcile_dependency_edges doesn't have to phantom-
            # create it.
            for dep in dynamic_deps:
                if dep.id not in task_states:
                    await discover(dep)
            await flush_pending_registrations()

            # Now record yielded deps as edges so the DAG view shows them.
            # This is the ONLY place dynamic edges enter the registry —
            # static deps are recorded by task_register via task.requires().
            if dynamic_deps:
                try:
                    await registry.task_add_dependencies_aio(
                        build_id, task, dynamic_deps, is_dynamic=True
                    )
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to record dynamic deps for task {task.id}",
                        on_registry_failure,
                    )

            # Accumulate dynamic deps (don't overwrite)
            existing_dyn_ids = {d.id for d in state.dynamic_deps}
            for dep in dynamic_deps:
                if dep.id not in existing_dyn_ids:
                    state.dynamic_deps.append(dep)

    def find_ready_tasks() -> list[BaseTask]:
        """Find tasks that are ready to execute."""
        ready: list[BaseTask] = []
        for state in task_states.values():
            if state.completed or state.task.id in executing:
                continue
            if state.exception is not None:
                continue

            # Check all deps (static + dynamic) complete
            all_deps_complete = all(
                task_states[dep.id].completed for dep in state.all_deps
            )
            if all_deps_complete:
                ready.append(state.task)
                executing.add(state.task.id)
        return ready

    async def wait_for_completion_with_retry(task: BaseTask) -> bool:
        """Wait for task.complete_aio() to return True (handles eventual consistency).

        When the lock reports ALREADY_COMPLETED, the task output may not be
        immediately visible due to eventual consistency (e.g., S3). This function
        retries until the output exists or timeout is reached.
        """
        assert global_lock_config is not None
        timeout = global_lock_config.completion_retry_timeout_seconds
        interval = global_lock_config.completion_retry_interval_seconds
        start_time = asyncio.get_event_loop().time()

        while True:
            if await task.complete_aio():
                return True
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed >= timeout:
                logger.warning(
                    f"Task {task.id} reported as completed by lock service, "
                    f"but complete_aio() returned False after {timeout}s. "
                    "Treating as complete (eventual consistency)."
                )
                return True
            await asyncio.sleep(interval)

    async def release_lock_for_task(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            await global_lock_manager.release(task_id, task_completed=completed)
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    async def acquire_lock_with_completion_check(
        task: BaseTask,
        task_id: str,
        lock_manager: GlobalConcurrencyLockManager,
        config: GlobalLockConfig,
    ) -> LockAcquisitionResult:
        """Acquire lock with retry/backoff and external completion checking.

        During the retry loop, we also check if the task was completed externally
        (e.g., by another process). This handles the race condition where:
        1. Lock is held by another process
        2. That process completes the task and releases the lock
        3. Before we can re-acquire, we should notice the task is complete
        """
        timeout = config.lock_wait_timeout_seconds
        current_interval = config.lock_wait_initial_interval_seconds
        max_interval = config.lock_wait_max_interval_seconds
        backoff_factor = config.lock_wait_backoff_factor
        state = task_states[task.id]
        notified_waiting = False  # Track if we've already notified registry

        loop = asyncio.get_event_loop()
        start_time = loop.time()

        while True:
            # Try to acquire the lock
            result = await lock_manager.acquire(task_id)

            if result.status == LockAcquisitionStatus.ACQUIRED:
                # Clear waiting flag if we were waiting
                state.waiting_for_lock = False
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                state.waiting_for_lock = False
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                state.waiting_for_lock = False
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            # Mark task as waiting for lock and notify registry (once)
            if not notified_waiting:
                state.waiting_for_lock = True
                notified_waiting = True
                lock_owner = result.error_message  # May contain owner info
                try:
                    await registry.task_waiting_for_lock_aio(build_id, task, lock_owner)
                except Exception as e:
                    handle_registry_error(
                        e,
                        f"Failed to notify registry of lock wait for task {task.id}",
                        on_registry_failure,
                    )

            if timeout is None:
                return result

            elapsed = loop.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if await task.complete_aio():
                state.waiting_for_lock = False
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            await asyncio.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    async def submit_with_lock(
        task: BaseTask,
    ) -> LockAcquisitionResult | TaskExecutionError | TaskStruct | None:
        """Submit task for execution, acquiring lock first if enabled.

        This wraps lock acquisition + task execution as a single async unit,
        allowing the main loop to remain non-blocking while waiting for locks.

        Returns:
            - LockAcquisitionResult: If lock was not acquired (ALREADY_COMPLETED,
                ERROR, or timeout). The task was NOT executed.
            - TaskExecutionError | TaskStruct | None: Normal task result if lock was
                acquired (or locking wasn't needed) and task was executed.
        """
        state = task_states[task.id]
        use_lock = global_lock_manager is not None and lock_selector(task)

        if use_lock:
            assert global_lock_manager is not None  # For type checker
            task_id_str = str(task.id)

            # Always acquire lock (even if we think we hold it from a previous
            # dynamic deps yield). This handles:
            # 1. Fresh task execution - normal acquire
            # 2. Task resuming after dynamic deps - re-acquire is safe since
            #    we're the same owner, and handles the case where lock expired
            #    during the wait for deps
            lock_result = await acquire_lock_with_completion_check(
                task, task_id_str, global_lock_manager, global_lock_config
            )

            if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                # Lock not acquired - return the lock result for handling
                return lock_result

            # Lock acquired - track it for release later
            held_locks.add(task_id_str)

        # Now we have the lock (or locking wasn't needed)
        # Start the task in registry. The task was already registered during
        # discover; if registration failed in `warn` mode we retry once here so
        # the /start endpoint doesn't 404.
        if not state.started:
            if not state.registered:
                try:
                    await registry.task_register_aio(build_id, task)
                    state.registered = True
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to register task {task.id} before start",
                        on_registry_failure,
                    )
            # Skip /start if registration never succeeded — the endpoint
            # would 404 and that hard-fails the build even in `warn` mode.
            if state.registered:
                try:
                    await registry.task_start_aio(build_id, task)
                    state.started = True
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to start task {task.id}",
                        on_registry_failure,
                    )
        elif state.dynamic_deps:
            # Task was suspended waiting for dynamic deps, now resuming. Same
            # warn-mode protection: no point firing /resume if registration
            # never landed.
            if state.registered:
                try:
                    await registry.task_resume_aio(build_id, task)
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to resume task {task.id}",
                        on_registry_failure,
                    )

        # Execute the task via the executor
        return await task_executor.submit(task)

    async def cancel_pending_in_flight() -> None:
        """Cancel any still-running ``pending_futures`` and emit ``task_cancel``.

        Called when escalating FAIL_FAST after the first failure, so
        sibling tasks running concurrently (e.g. on Modal) terminate
        promptly instead of being abandoned. Idempotent — safe to call
        multiple times.

        Race-aware: the awaits inside ``process_result`` (registry calls,
        artifact upload) can let other futures finish in the same loop
        iteration. Those already-done futures are processed normally so
        their actual outcome is recorded — only the futures still
        running get cancel-marked.

        The asyncio cancel is what propagates into the executor (e.g.
        Modal's remote-call cancel); the explicit ``task_executor.cancel``
        hook is for executors that need extra teardown (default no-op).
        """
        if not pending_futures:
            return
        snapshot = dict(pending_futures)
        pending_futures.clear()

        # Split: futures that already completed while process_result was
        # awaiting (treat as their actual outcome) vs still-running ones
        # (cancel and mark TASK_CANCELLED).
        completed_tids: list[UUID] = []
        running_tids: list[UUID] = []
        for tid, fut in snapshot.items():
            if fut.done():
                completed_tids.append(tid)
            else:
                running_tids.append(tid)

        for tid in completed_tids:
            executing.discard(tid)
            done_task = task_states[tid].task
            try:
                done_result = snapshot[tid].result()
            except Exception as e:
                done_result = TaskExecutionError(
                    exception=e,
                    traceback="".join(tb_module.format_exception(e)),
                )
            await process_result(done_task, done_result)

        if not running_tids:
            return

        cancelled_tasks: list[BaseTask] = [
            task_states[tid].task for tid in running_tids
        ]
        for tid in running_tids:
            snapshot[tid].cancel()
        for cancel_task in cancelled_tasks:
            try:
                await task_executor.cancel(cancel_task)
            except Exception as e:
                logger.warning(f"Executor cancel failed for {cancel_task.id}: {e}")
        # Drain so cancelled futures don't dangle.
        await asyncio.gather(
            *(snapshot[tid] for tid in running_tids), return_exceptions=True
        )
        for tid in running_tids:
            executing.discard(tid)
        for cancel_task in cancelled_tasks:
            await release_lock_for_task(cancel_task, completed=False)
            state = task_states[cancel_task.id]
            # Only call /cancel when the task was successfully registered;
            # otherwise the endpoint 404s (warn-mode tolerates registration
            # failures). Mirrors the guard pattern around /start, /resume.
            if state.registered:
                try:
                    await registry.task_cancel_aio(build_id, cancel_task)
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to mark task {cancel_task.id} as cancelled",
                        on_registry_failure,
                    )
            if state.exception is None:
                state.exception = asyncio.CancelledError(
                    "Cancelled by build engine in FAIL_FAST mode"
                )
            task_count.cancelled += 1

    async def emit_skips_for_blocked_tasks() -> None:
        """Emit ``task_skip_aio`` for tasks blocked by failed/cancelled deps.

        Iterates to a fixed point so transitive descendants are also
        skipped. Idempotent: tasks already marked with ``state.exception``
        are filtered out, so re-calling is a no-op.
        """
        while True:
            skipped_this_pass = 0
            for state in task_states.values():
                if state.completed or state.exception is not None:
                    continue
                blocked = any(
                    task_states[dep.id].exception is not None for dep in state.all_deps
                )
                if not blocked:
                    continue
                if state.registered:
                    try:
                        await registry.task_skip_aio(build_id, state.task)
                    except Exception as reg_err:
                        handle_registry_error(
                            reg_err,
                            f"Failed to mark task {state.task.id} as skipped",
                            on_registry_failure,
                        )
                state.exception = _SkippedSentinel()
                task_count.skipped += 1
                skipped_this_pass += 1
            if skipped_this_pass == 0:
                return

    try:
        # Discover all tasks from roots concurrently. Inline-registration
        # makes the full DAG appear in the registry/UI immediately. If any
        # discover() raises (e.g. a task's requires() throws), the outer
        # except below emits build_fail_aio so the build doesn't get stuck
        # in RUNNING state.
        async with asyncio.TaskGroup() as tg:
            for root in tasks:
                tg.create_task(discover(root))

        # Bulk-register every discovered task in one HTTP call. Order is
        # post-order so the API resolves all dependency_task_ids to existing
        # rows without phantom-creating any.
        await flush_pending_registrations()

        # Mark previously-completed tasks as complete (concurrently). Has to
        # happen after registration so the API rows exist; previously_completed
        # is populated during discover() above.
        if previously_completed_tasks:
            async with asyncio.TaskGroup() as tg:
                for task in previously_completed_tasks:
                    tg.create_task(mark_previously_completed(task))

        await task_executor.setup()

        # Main build loop using as_completed pattern
        while True:
            # Check if all roots complete
            all_roots_complete = all(task_states[root.id].completed for root in tasks)
            if all_roots_complete:
                break

            # Find and submit ready tasks
            ready = find_ready_tasks()

            # Submit ready tasks (lock acquisition + execution as single async unit)
            for task in ready:
                async_task = asyncio.create_task(submit_with_lock(task))
                pending_futures[task.id] = async_task

            # If nothing is pending, check for deadlock or completion
            if not pending_futures:
                incomplete = [
                    s
                    for s in task_states.values()
                    if not s.completed and s.exception is None
                ]
                if incomplete:
                    # Check if all incomplete tasks are blocked by failed dependencies
                    def has_failed_dep(state: TaskExecutionState) -> bool:
                        for dep in state.all_deps:
                            dep_state = task_states[dep.id]
                            if dep_state.exception is not None:
                                return True
                        return False

                    truly_blocked = [s for s in incomplete if not has_failed_dep(s)]
                    if truly_blocked:
                        # Real deadlock - tasks blocked without failed deps
                        raise RuntimeError(
                            f"Deadlock: {len(truly_blocked)} tasks cannot proceed. "
                            f"Tasks: {[s.task.id for s in truly_blocked[:5]]}"
                        )
                    # All remaining tasks are blocked by failed deps - exit gracefully
                break

            # Wait for at least one task to complete
            done, _ = await asyncio.wait(
                pending_futures.values(), return_when=asyncio.FIRST_COMPLETED
            )

            # Process completed tasks
            for async_task in done:
                # Find which task this was
                task_id = None
                for tid, fut in pending_futures.items():
                    if fut is async_task:
                        task_id = tid
                        break
                assert task_id is not None

                # Remove from pending and executing
                del pending_futures[task_id]
                executing.discard(task_id)

                # Get result and process
                task = task_states[task_id].task
                try:
                    result = async_task.result()
                except Exception as e:
                    result = TaskExecutionError(
                        exception=e,
                        traceback="".join(tb_module.format_exception(e)),
                    )
                await process_result(task, result)

            # Stop scheduling once any task has flagged FAIL_FAST escalation.
            # Sibling completions in this same `done` batch were already
            # processed above so their task_complete_aio events landed.
            if fail_fast_triggered and fail_mode == FailMode.FAIL_FAST:
                break

            # Check for exit-early condition: all remaining tasks waiting for locks
            if global_lock_config.exit_early_when_all_locked:
                remaining_tasks = [
                    s
                    for s in task_states.values()
                    if not s.completed and s.exception is None
                ]
                if remaining_tasks:
                    all_waiting = all(s.waiting_for_lock for s in remaining_tasks)
                    if all_waiting:
                        reason = (
                            f"All {len(remaining_tasks)} remaining tasks "
                            "are running in other builds"
                        )
                        logger.info(f"Exiting early: {reason}")
                        try:
                            await registry.build_exit_early_aio(build_id, reason)
                        except Exception as reg_err:
                            handle_registry_error(
                                reg_err,
                                "Failed to notify registry of exit early",
                                on_registry_failure,
                            )
                        return BuildSummary(
                            status=BuildExitStatus.EXIT_EARLY,
                            task_count=task_count,
                            build_id=build_id,
                            error=None,
                        )

        # If FAIL_FAST escalated, terminate in-flight siblings before
        # finalising. asyncio.cancel propagates CancelledError into
        # cooperative awaitables (e.g. Modal's remote.aio), so remote
        # containers stop billing rather than being abandoned. Skips
        # are emitted for any task whose deps include a failed/cancelled
        # task — covers both modes (no-op when nothing is blocked).
        if fail_fast_triggered and fail_mode == FailMode.FAIL_FAST:
            await cancel_pending_in_flight()
        await emit_skips_for_blocked_tasks()

        # FAIL_FAST: re-enter the outer except so build_fail_aio fires.
        if error is not None and fail_mode == FailMode.FAIL_FAST:
            raise error

        await registry.build_complete_aio(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        # Cancel any in-flight (covers exceptions raised mid-loop) and
        # emit skips before recording the build failure. Both helpers
        # are idempotent so it's safe even when reached via the
        # FAIL_FAST escalation path that already called them.
        try:
            await cancel_pending_in_flight()
            await emit_skips_for_blocked_tasks()
        except Exception as cleanup_err:
            logger.warning(f"Error during build cleanup: {cleanup_err}")
        await registry.build_fail_aio(build_id, str(e))
        if fail_mode == FailMode.FAIL_FAST:
            raise
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )

    finally:
        await task_executor.teardown()
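
The lock-wait loop in `acquire_lock_with_completion_check` above sleeps for `current_interval`, then multiplies it by `backoff_factor` and caps it at `max_interval`. The following is a minimal standalone sketch of that capped exponential schedule only. The helper name `backoff_intervals` and the example values are illustrative assumptions, not part of stardag or of `GlobalLockConfig` defaults.

```python
def backoff_intervals(
    initial: float, factor: float, max_interval: float, attempts: int
) -> list[float]:
    """Return the sleep intervals used by the first `attempts` retries."""
    intervals: list[float] = []
    current = initial
    for _ in range(attempts):
        intervals.append(current)
        # Same update rule as the loop above: grow, then cap.
        current = min(current * factor, max_interval)
    return intervals


# Hypothetical settings: start at 1s, double each retry, never exceed 10s.
print(backoff_intervals(1.0, 2.0, 10.0, 6))
# [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

The cap keeps a long wait from growing into very sparse polling, while the timeout check in the real loop bounds the total time spent waiting.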

build_sequential

build_sequential(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    dual_run_default="sync",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Sync API for building tasks sequentially.

This is intended primarily for debugging and testing.

Tasks are registered with the registry as they are discovered (in deterministic DFS order from the roots), so the full DAG appears in the UI immediately rather than progressively as tasks become runnable.
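
A rough sketch of the discovery order described above, using plain strings instead of tasks. The function `discover_post_order`, the `requires` mapping, and the example graph are hypothetical stand-ins for illustration; the real implementation walks `task.requires()` and checks `task.complete()`.

```python
from collections.abc import Mapping


def discover_post_order(
    roots: list[str],
    requires: Mapping[str, list[str]],
    complete: set[str],
    register_all: bool = False,
) -> list[str]:
    """Return task names in post-order (dependencies before dependents)."""
    seen: set[str] = set()
    order: list[str] = []

    def visit(name: str) -> None:
        if name in seen:
            return
        seen.add(name)
        # Stop at complete tasks unless register_all asks for the full DAG.
        if name in complete and not register_all:
            order.append(name)
            return
        for dep in requires.get(name, []):
            visit(dep)
        order.append(name)

    for root in roots:
        visit(root)
    return order


graph = {"report": ["clean", "stats"], "stats": ["clean"], "clean": ["raw"], "raw": []}
print(discover_post_order(["report"], graph, complete={"clean"}))
# ['clean', 'stats', 'report']
print(discover_post_order(["report"], graph, complete={"clean"}, register_all=True))
# ['raw', 'clean', 'stats', 'report']
```

Because every dependency appears before the task that needs it, a registry receiving the list in this order never sees a reference to a task it has not been told about yet.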

Task execution policy:

  • Sync-only tasks: run via run()
  • Async-only tasks: run via asyncio.run(run_aio()). (Does not work if called from within an existing event loop.)
  • Dual tasks: run via run() if dual_run_default=="sync" (default), else (dual_run_default=="async") via asyncio.run(run_aio()).
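
The policy above can be illustrated with a small dispatcher. This is a sketch under stated assumptions: the toy classes `SyncOnly`, `AsyncOnly`, and `Dual`, and the attribute-based detection in `run_task`, are hypothetical. stardag determines which methods a task overrides in its own way, which is not shown in this section.

```python
import asyncio
from typing import Any, Literal


class SyncOnly:
    def run(self) -> str:
        return "sync"


class AsyncOnly:
    async def run_aio(self) -> str:
        return "async"


class Dual:
    def run(self) -> str:
        return "sync"

    async def run_aio(self) -> str:
        return "async"


def run_task(task: Any, dual_run_default: Literal["sync", "async"] = "sync") -> str:
    """Pick run() or run_aio() following the sync/async/dual policy."""
    has_sync = callable(getattr(task, "run", None))
    has_async = callable(getattr(task, "run_aio", None))
    if not (has_sync or has_async):
        raise TypeError("task must define run() or run_aio()")
    if has_sync and has_async:
        use_async = dual_run_default == "async"
    else:
        use_async = has_async
    if use_async:
        # asyncio.run() raises RuntimeError when an event loop is already running.
        return asyncio.run(task.run_aio())
    return task.run()


print(run_task(SyncOnly()))                      # sync
print(run_task(AsyncOnly()))                     # async
print(run_task(Dual()))                          # sync
print(run_task(Dual(), dual_run_default="async"))  # async
```

The `asyncio.run()` call is why async-only tasks cannot be built this way from inside a running event loop, such as a notebook cell or another coroutine.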

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

dual_run_default

For dual tasks, prefer sync or async execution

TYPE: Literal['sync', 'async'] DEFAULT: 'sync'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id
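
To make the two `on_registry_failure` modes concrete, here is a minimal sketch of a handler with the same call shape as the `handle_registry_error(err, message, on_registry_failure)` calls in the source. The body below is an assumption for illustration (hence the `_sketch` suffix), and `call_registry` and `failing_call` are hypothetical helpers, not stardag APIs.

```python
import logging
from collections.abc import Callable
from typing import Literal

logger = logging.getLogger("registry_sketch")

OnRegistryFailure = Literal["raise", "warn"]


def handle_registry_error_sketch(
    err: Exception, message: str, mode: OnRegistryFailure
) -> None:
    """Assumed behavior: re-raise in "raise" mode, log in "warn" mode."""
    if mode == "raise":
        raise err
    logger.warning("%s: %s", message, err)


def call_registry(
    action: Callable[[], None], message: str, mode: OnRegistryFailure
) -> bool:
    """Run one registry call and report whether it succeeded."""
    try:
        action()
    except Exception as err:
        handle_registry_error_sketch(err, message, mode)
        return False
    return True


def failing_call() -> None:
    raise ConnectionError("registry unreachable")


# "warn" mode: the failure is logged and the build can continue.
print(call_registry(failing_call, "Failed to register task", "warn"))  # False
```

In "warn" mode the caller has to remember that the call did not land, which is why the build code tracks registered tasks and skips follow-up calls such as start or complete for tasks whose registration never succeeded.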

Source code in stardag/build/_sequential.py
def build_sequential(
    tasks: Sequence[BaseTask] | BaseTask,
    registry: RegistryABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    dual_run_default: Literal["sync", "async"] = "sync",
    resume_build_id: UUID | None = None,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Sync API for building tasks sequentially.

    This is intended primarily for debugging and testing.

    Tasks are registered with the registry as they are discovered (in
    deterministic DFS order from the roots), so the full DAG appears in the
    UI immediately rather than progressively as tasks become runnable.

    Task execution policy:
    - Sync-only tasks: run via `run()`
    - Async-only tasks: run via `asyncio.run(run_aio())`. (Does not work if called
        from within an existing event loop.)
    - Dual tasks: run via `run()` if `dual_run_default=="sync"` (default), else
        (`dual_run_default=="async"`) via `asyncio.run(run_aio())`.

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        registry: Registry for tracking builds
        fail_mode: How to handle task failures
        dual_run_default: For dual tasks, prefer sync or async execution
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution for "exactly once" semantics across processes.
        global_lock_config: Configuration for global locking behavior.
        register_all: If True, discovery continues recursing into dependencies of
            already-complete tasks. This ensures all tasks in the DAG get registered
            in the registry (useful for complete DAG visualization). Default False
            for performance — skipping complete subgraphs avoids unnecessary I/O.
        on_registry_failure: How to handle registry call failures. "raise" (default)
            propagates the exception; "warn" logs a warning and continues.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    tasks_list = _validate_tasks(tasks)

    if registry is None:
        registry = registry_provider.get()
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    failed_cache: set[UUID] = set()
    error: Exception | None = None

    # Discover all tasks, stopping at already-complete tasks
    all_tasks: dict[UUID, BaseTask] = {}
    previously_completed_tasks: list[BaseTask] = []
    # Tasks already registered with the registry. Tracked so the per-task
    # retry path inside _run_task_sequential / lock-failure handling
    # doesn't double-register.
    registered_tasks: set[UUID] = set()
    # Tasks accumulated during the current discover() walk (post-order)
    # awaiting the bulk-register call. Cleared by
    # ``flush_pending_registrations()``.
    pending_registrations: list[BaseTask] = []

    # Start or resume build *before* discovery so we have a build_id to
    # register tasks against.
    if resume_build_id is not None:
        build_id = resume_build_id
        # Emit a BUILD_RESUMED event so the registry flips a previously
        # terminal build back to RUNNING. On older registry servers this
        # is a no-op (the endpoint 404s and APIRegistry swallows it).
        try:
            registry.build_resume(build_id)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to mark build {build_id} as resumed",
                on_registry_failure,
            )
    else:
        build_id = registry.build_start(root_tasks=tasks_list)

    def register_task_once(task: BaseTask) -> None:
        """Register a single task (used as a fallback retry path).

        The happy path uses ``flush_pending_registrations`` to bulk-
        register every task collected during a discover walk. This per-
        task fallback exists for the lock-failure branch and for
        ``_run_task_sequential`` when discover-time registration was
        skipped or failed in ``warn`` mode.
        """
        if task.id in registered_tasks:
            return
        try:
            registry.task_register(build_id, task)
            registered_tasks.add(task.id)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to register task {task.id}",
                on_registry_failure,
            )

    def flush_pending_registrations() -> None:
        """Bulk-register every task accumulated since the last flush.

        Called after each discover walk (initial walk and any runtime
        walk triggered by dynamic deps). Chunks the batch into
        ``_BULK_REGISTER_CHUNK_SIZE``-sized slices to stay within the
        API's per-call cap. On chunk failure: ``warn`` mode logs and
        stops processing further chunks; the per-task retry inside
        ``_run_task_sequential`` falls back to per-task register as
        tasks become runnable. ``raise`` mode propagates.
        """
        if not pending_registrations:
            return
        batch = list(pending_registrations)
        pending_registrations.clear()

        for chunk_start in range(0, len(batch), _BULK_REGISTER_CHUNK_SIZE):
            chunk = batch[chunk_start : chunk_start + _BULK_REGISTER_CHUNK_SIZE]
            try:
                registry.task_register_bulk(build_id, chunk)
            except Exception as reg_err:
                ids_preview = ", ".join(str(t.id) for t in chunk[:5])
                if len(chunk) > 5:
                    ids_preview += f", +{len(chunk) - 5} more"
                total_chunks = (
                    len(batch) + _BULK_REGISTER_CHUNK_SIZE - 1
                ) // _BULK_REGISTER_CHUNK_SIZE
                this_chunk = (chunk_start // _BULK_REGISTER_CHUNK_SIZE) + 1
                handle_registry_error(
                    reg_err,
                    f"Failed to bulk-register chunk {this_chunk}/{total_chunks} "
                    f"({len(chunk)} tasks; ids: {ids_preview})",
                    on_registry_failure,
                )
                return
            for t in chunk:
                registered_tasks.add(t.id)

    def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks.

        Discovery only collects into ``pending_registrations`` (post-order
        — deps first, parents last). The actual bulk-register call fires
        via ``flush_pending_registrations()`` after the walk.
        """
        if task.id in all_tasks:
            return
        all_tasks[task.id] = task
        task_count.discovered += 1

        # Check if this task is already complete
        if task.complete():
            completion_cache.add(task.id)
            task_count.previously_completed += 1
            previously_completed_tasks.append(task)
            if not register_all:
                # Don't recurse into deps - they're already built. Append
                # to pending_registrations (leaf in post-order).
                pending_registrations.append(task)
                return

        # Task not complete (or register_all) — recurse into deps first
        # (post-order), so when we register this task its deps already
        # exist in the API.
        for dep in flatten_task_struct(task.requires()):
            discover(dep)

        # All deps are registered. Append self after children — preserves
        # post-order within the subtree.
        pending_registrations.append(task)

    # Mark previously-completed tasks as complete in the registry. Registration
    # is handled by flush_pending_registrations() after each discover() walk;
    # we still need to fire task_complete so they appear COMPLETED rather than
    # PENDING, and to
    # self-heal tasks left in "Started" state from a previous build that
    # crashed (their target exists, so they are complete, but the registry
    # still shows them as running).
    completed_previously_completed_count = 0

    def mark_pending_previously_completed() -> None:
        """Send task_complete for any previously-completed tasks not yet marked.

        Drains ``previously_completed_tasks`` starting from the last marked
        index. Called both for the initial bulk pass and after any runtime
        ``discover()`` call that might newly surface a complete task (e.g. the
        static dep of a dynamically yielded task).
        """
        nonlocal completed_previously_completed_count
        while completed_previously_completed_count < len(previously_completed_tasks):
            pc_task = previously_completed_tasks[completed_previously_completed_count]
            completed_previously_completed_count += 1
            if pc_task.id not in registered_tasks:
                # Registration failed in `warn` mode; can't mark complete
                # against a row that was never written.
                continue
            try:
                registry.task_complete(build_id, pc_task)
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to mark previously completed task {pc_task.id} as complete",
                    on_registry_failure,
                )

    def runtime_discover(task: BaseTask) -> None:
        """``discover()`` wrapper used after the initial registration pass.

        Walks ``discover()`` (collecting newly-found tasks into
        ``pending_registrations``), bulk-registers them, then sends
        task_complete for any previously-completed tasks the walk
        surfaced.
        """
        discover(task)
        flush_pending_registrations()
        mark_pending_previously_completed()

    def acquire_lock_sync(task: BaseTask) -> LockAcquisitionResult:
        """Acquire lock synchronously with retry/backoff."""
        assert global_lock_manager is not None
        assert global_lock_config is not None
        task_id = str(task.id)
        timeout = global_lock_config.lock_wait_timeout_seconds
        current_interval = global_lock_config.lock_wait_initial_interval_seconds
        max_interval = global_lock_config.lock_wait_max_interval_seconds
        backoff_factor = global_lock_config.lock_wait_backoff_factor
        start_time = time.time()

        while True:
            result = asyncio.run(global_lock_manager.acquire(task_id))

            if result.status == LockAcquisitionStatus.ACQUIRED:
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            if timeout is None:
                return result

            elapsed = time.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if task.complete():
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            time.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    def release_lock_sync(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            asyncio.run(global_lock_manager.release(task_id, task_completed=completed))
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    try:
        # Discover all tasks. If discover() raises (e.g. requires() /
        # complete() throws), the outer except below emits build_fail so
        # the build doesn't get stuck in RUNNING state.
        for root in tasks_list:
            discover(root)

        # Bulk-register every discovered task (one HTTP call per chunk of
        # _BULK_REGISTER_CHUNK_SIZE). Order is post-order so the API resolves
        # dependency_task_ids without phantom-creating any rows.
        flush_pending_registrations()

        # Mark previously-completed tasks as complete now that registration
        # has landed.
        mark_pending_previously_completed()

        # Build in topological order
        while True:
            ready_task = _find_ready_task(all_tasks, completion_cache, failed_cache)

            if ready_task is None:
                _check_for_deadlock(all_tasks, completion_cache, failed_cache)
                # All remaining tasks are blocked by failed deps - exit gracefully
                break

            # Acquire lock if needed
            use_lock = global_lock_manager is not None and lock_selector(ready_task)
            if use_lock:
                lock_result = acquire_lock_sync(ready_task)

                if lock_result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                    # Task completed elsewhere - skip execution
                    completion_cache.add(ready_task.id)
                    task_count.previously_completed += 1
                    continue

                if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                    # Lock not acquired - treat as failure
                    task_count.failed += 1
                    failed_cache.add(ready_task.id)
                    error = RuntimeError(
                        f"Failed to acquire lock: {lock_result.error_message}"
                    )
                    # Ensure the task row exists before failing it (no-op if
                    # registration succeeded during discovery; retry if it
                    # failed in `warn` mode).
                    register_task_once(ready_task)
                    try:
                        registry.task_fail(build_id, ready_task, str(error))
                    except Exception as reg_err:
                        handle_registry_error(
                            reg_err,
                            "Failed to notify registry of lock failure",
                            on_registry_failure,
                        )
                    if fail_mode == FailMode.FAIL_FAST:
                        raise error
                    continue

                # Lock acquired - track it
                held_locks.add(str(ready_task.id))

            # Execute the task
            task_completed = False
            try:
                _run_task_sequential(
                    ready_task,
                    completion_cache,
                    all_tasks,
                    build_id,
                    registry,
                    dual_run_default,
                    runtime_discover,
                    register_task_once,
                    task_count,
                    on_registry_failure,
                )
                task_count.succeeded += 1
                task_completed = True
            except Exception as e:
                task_count.failed += 1
                failed_cache.add(ready_task.id)
                error = e
                try:
                    registry.task_fail(build_id, ready_task, str(e))
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to notify registry of task {ready_task.id} failure",
                        on_registry_failure,
                    )
                if fail_mode == FailMode.FAIL_FAST:
                    raise
            finally:
                if use_lock:
                    release_lock_sync(ready_task, completed=task_completed)

        registry.build_complete(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        registry.build_fail(build_id, str(e))
        if fail_mode == FailMode.FAIL_FAST:
            raise
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )
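
The retry loop in acquire_lock_sync above sleeps for current_interval and then multiplies it by the backoff factor, capped at the maximum interval. As a minimal sketch of that schedule only (the function name and the numeric settings below are illustrative assumptions, not stardag defaults):

```python
from itertools import islice
from typing import Iterator


def backoff_intervals(initial: float, factor: float, maximum: float) -> Iterator[float]:
    """Yield successive sleep intervals: geometric growth, capped after each step."""
    current = initial
    while True:
        yield current
        # Same update rule as the retry loop: grow, then clamp to the maximum.
        current = min(current * factor, maximum)


# Hypothetical settings, chosen only for illustration.
schedule = list(islice(backoff_intervals(initial=1.0, factor=2.0, maximum=10.0), 6))
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

Note that the first interval is used as given; the cap only applies from the second wait onward, which matches the order of operations in the listing.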

build_sequential_aio async

build_sequential_aio(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    sync_run_default="blocking",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Async API for building tasks sequentially.

This is intended primarily for debugging and testing.

Tasks are bulk-registered with the registry right after discovery (in deterministic post-order DFS from the roots) and before the first task runs, so the full DAG appears in the UI immediately rather than progressively as tasks become runnable.

Task execution policy:

- Sync-only tasks: runs blocking via run() in main event loop if sync_run_default=="blocking" (default), else (sync_run_default=="thread") in thread pool.
- Async-only tasks: run via await run_aio().
- Dual tasks: run via await run_aio().
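
The difference between the two sync_run_default values can be sketched without stardag. The helper below is hypothetical and uses asyncio.to_thread as a stand-in for "in thread pool"; the library's actual mechanism is not shown on this page and may differ.

```python
import asyncio
import threading
from typing import Callable, Literal


async def run_sync_task(
    run: Callable[[], None],
    sync_run_default: Literal["thread", "blocking"] = "blocking",
) -> None:
    """Run a sync-only callable from async code under either policy."""
    if sync_run_default == "blocking":
        # Runs inline on the event-loop thread; nothing else progresses meanwhile.
        run()
    else:
        # Offloads to the default thread pool so the event loop stays responsive.
        await asyncio.to_thread(run)


async def demo() -> dict[str, bool]:
    seen: dict[str, bool] = {}
    loop_thread = threading.current_thread()

    def record(mode: str) -> Callable[[], None]:
        def run() -> None:
            # Record whether the callable executed on the event-loop thread.
            seen[mode] = threading.current_thread() is loop_thread
        return run

    await run_sync_task(record("blocking"), "blocking")
    await run_sync_task(record("thread"), "thread")
    return seen


on_loop_thread = asyncio.run(demo())
print(on_loop_thread)  # {'blocking': True, 'thread': False}
```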

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

sync_run_default

For sync-only tasks, block or use thread pool

TYPE: Literal['thread', 'blocking'] DEFAULT: 'blocking'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id
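
To make the effect of register_all concrete, here is a small self-contained sketch of the post-order discovery walk. Node is a hypothetical stand-in for a task (it is not a stardag class), and the walk only mirrors the control flow described above.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a task: a name, a completion flag, and deps."""
    name: str
    complete: bool = False
    deps: list["Node"] = field(default_factory=list)


def discover(root: Node, register_all: bool = False) -> list[str]:
    """Return task names in post-order (deps before parents)."""
    seen: set[str] = set()
    pending: list[str] = []

    def walk(node: Node) -> None:
        if node.name in seen:
            return
        seen.add(node.name)
        if node.complete and not register_all:
            # Complete task: record it, but skip its upstream subgraph.
            pending.append(node.name)
            return
        for dep in node.deps:
            walk(dep)
        pending.append(node.name)

    walk(root)
    return pending


raw = Node("raw", complete=True)
clean = Node("clean", complete=True, deps=[raw])
report = Node("report", deps=[clean])

print(discover(report))                     # ['clean', 'report']
print(discover(report, register_all=True))  # ['raw', 'clean', 'report']
```

With the default, the walk stops at the complete task clean and never visits raw; with register_all=True the whole graph is collected, at the cost of one completion check per task.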

Source code in stardag/build/_sequential.py
async def build_sequential_aio(
    tasks: Sequence[BaseTask] | BaseTask,
    registry: RegistryABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    sync_run_default: Literal["thread", "blocking"] = "blocking",
    resume_build_id: UUID | None = None,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Async API for building tasks sequentially.

    This is intended primarily for debugging and testing.

    Tasks are bulk-registered with the registry right after discovery (in
    deterministic post-order DFS from the roots) and before the first task
    runs, so the full DAG appears in the UI immediately rather than
    progressively as tasks become runnable.

    Task execution policy:
    - Sync-only tasks: runs *blocking* via `run()` in main event loop if
        `sync_run_default=="blocking"` (default), else (`sync_run_default=="thread"`)
        in thread pool.
    - Async-only tasks: run via `await run_aio()`.
    - Dual tasks: run via `await run_aio()`.

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        registry: Registry for tracking builds
        fail_mode: How to handle task failures
        sync_run_default: For sync-only tasks, block or use thread pool
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution for "exactly once" semantics across processes.
        global_lock_config: Configuration for global locking behavior.
        register_all: If True, discovery continues recursing into dependencies of
            already-complete tasks. This ensures all tasks in the DAG get registered
            in the registry (useful for complete DAG visualization). Default False
            for performance — skipping complete subgraphs avoids unnecessary I/O.
        on_registry_failure: How to handle registry call failures. "raise" (default)
            propagates the exception; "warn" logs a warning and continues.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    tasks_list = _validate_tasks(tasks)

    if registry is None:
        registry = registry_provider.get()
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    failed_cache: set[UUID] = set()
    error: Exception | None = None

    # Discover all tasks, stopping at already-complete tasks
    all_tasks: dict[UUID, BaseTask] = {}
    previously_completed_tasks: list[BaseTask] = []
    # Tasks already registered with the registry. Tracked so per-task
    # fallback retry paths don't double-register.
    registered_tasks: set[UUID] = set()
    # Tasks accumulated during the current discover() walk (post-order),
    # awaiting bulk registration. Cleared by flush_pending_registrations_aio.
    pending_registrations: list[BaseTask] = []

    # Start or resume build *before* discovery so we have a build_id to
    # register tasks against.
    if resume_build_id is not None:
        build_id = resume_build_id
        # Emit a BUILD_RESUMED event so the registry flips a previously
        # terminal build back to RUNNING. On older registry servers this
        # is a no-op (the endpoint 404s and APIRegistry swallows it).
        try:
            await registry.build_resume_aio(build_id)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to mark build {build_id} as resumed",
                on_registry_failure,
            )
    else:
        build_id = await registry.build_start_aio(root_tasks=tasks_list)

    async def register_task_once_aio(task: BaseTask) -> None:
        """Register a single task (per-task fallback retry path).

        Used by ``_run_task_sequential_aio`` when discover-time bulk
        registration was skipped or failed in ``warn`` mode.
        """
        if task.id in registered_tasks:
            return
        try:
            await registry.task_register_aio(build_id, task)
            registered_tasks.add(task.id)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to register task {task.id}",
                on_registry_failure,
            )

    async def flush_pending_registrations_aio() -> None:
        """Bulk-register every task collected since the last flush.

        Chunks the batch into ``_BULK_REGISTER_CHUNK_SIZE``-sized slices
        to stay within the API's per-call cap. Stops on first chunk
        failure (per-task retry inside ``_run_task_sequential_aio``
        handles the rest in ``warn`` mode).
        """
        if not pending_registrations:
            return
        batch = list(pending_registrations)
        pending_registrations.clear()

        for chunk_start in range(0, len(batch), _BULK_REGISTER_CHUNK_SIZE):
            chunk = batch[chunk_start : chunk_start + _BULK_REGISTER_CHUNK_SIZE]
            try:
                await registry.task_register_bulk_aio(build_id, chunk)
            except Exception as reg_err:
                ids_preview = ", ".join(str(t.id) for t in chunk[:5])
                if len(chunk) > 5:
                    ids_preview += f", +{len(chunk) - 5} more"
                total_chunks = (
                    len(batch) + _BULK_REGISTER_CHUNK_SIZE - 1
                ) // _BULK_REGISTER_CHUNK_SIZE
                this_chunk = (chunk_start // _BULK_REGISTER_CHUNK_SIZE) + 1
                handle_registry_error(
                    reg_err,
                    f"Failed to bulk-register chunk {this_chunk}/{total_chunks} "
                    f"({len(chunk)} tasks; ids: {ids_preview})",
                    on_registry_failure,
                )
                return
            for t in chunk:
                registered_tasks.add(t.id)

    async def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks.

        Discovery only collects into ``pending_registrations`` in
        post-order (deps first, parents last). The bulk-register call
        fires via ``flush_pending_registrations_aio()`` after the walk.
        """
        if task.id in all_tasks:
            return
        all_tasks[task.id] = task
        task_count.discovered += 1

        # Check if this task is already complete
        if await task.complete_aio():
            completion_cache.add(task.id)
            task_count.previously_completed += 1
            previously_completed_tasks.append(task)
            if not register_all:
                # Don't recurse into deps - they're already built. Append
                # to pending_registrations (leaf in post-order).
                pending_registrations.append(task)
                return

        # Task not complete (or register_all) — recurse into deps first
        # (post-order), so by the time the bulk register call processes
        # this task its deps are already in the array (and thus in the DB).
        for dep in flatten_task_struct(task.requires()):
            await discover(dep)

        # Append self after children — preserves post-order within subtree.
        pending_registrations.append(task)

    # Mark previously-completed tasks as complete in the registry. discover()
    # only queues tasks; registration lands in flush_pending_registrations_aio().
    # We still need to fire task_complete_aio so they appear COMPLETED rather
    # than PENDING.
    completed_previously_completed_count = 0

    async def mark_pending_previously_completed_aio() -> None:
        """Send task_complete for any previously-completed tasks not yet marked.

        Drains ``previously_completed_tasks`` starting from the last marked
        index. Called for the initial bulk pass and after any runtime
        ``discover()`` call that might surface a new complete task (e.g. the
        static dep of a dynamically yielded task).
        """
        nonlocal completed_previously_completed_count
        while completed_previously_completed_count < len(previously_completed_tasks):
            pc_task = previously_completed_tasks[completed_previously_completed_count]
            completed_previously_completed_count += 1
            if pc_task.id not in registered_tasks:
                # Registration failed in `warn` mode; skip task_complete since
                # the API row doesn't exist.
                continue
            try:
                await registry.task_complete_aio(build_id, pc_task)
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to mark previously completed task {pc_task.id} as complete",
                    on_registry_failure,
                )

    async def runtime_discover_aio(task: BaseTask) -> None:
        """``discover()`` wrapper used after the initial registration pass.

        Walks ``discover()`` (collecting newly-found tasks into
        ``pending_registrations``), bulk-registers them, then sends
        task_complete for any previously-completed tasks the walk
        surfaced.
        """
        await discover(task)
        await flush_pending_registrations_aio()
        await mark_pending_previously_completed_aio()

    async def acquire_lock_aio(task: BaseTask) -> LockAcquisitionResult:
        """Acquire lock asynchronously with retry/backoff."""
        assert global_lock_manager is not None
        assert global_lock_config is not None
        task_id = str(task.id)
        timeout = global_lock_config.lock_wait_timeout_seconds
        current_interval = global_lock_config.lock_wait_initial_interval_seconds
        max_interval = global_lock_config.lock_wait_max_interval_seconds
        backoff_factor = global_lock_config.lock_wait_backoff_factor

        loop = asyncio.get_event_loop()
        start_time = loop.time()

        while True:
            result = await global_lock_manager.acquire(task_id)

            if result.status == LockAcquisitionStatus.ACQUIRED:
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            if timeout is None:
                return result

            elapsed = loop.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if await task.complete_aio():
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            await asyncio.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    async def release_lock_aio(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            await global_lock_manager.release(task_id, task_completed=completed)
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    try:
        # Discover all tasks. If discover() raises (e.g. requires() /
        # complete_aio() throws), the outer except below emits
        # build_fail_aio so the build doesn't get stuck in RUNNING state.
        for root in tasks_list:
            await discover(root)

        # Bulk-register every discovered task (one HTTP call per chunk of
        # _BULK_REGISTER_CHUNK_SIZE).
        await flush_pending_registrations_aio()

        # Mark previously-completed tasks as complete now that registration
        # has landed.
        await mark_pending_previously_completed_aio()

        # Build in topological order
        while True:
            ready_task = _find_ready_task(all_tasks, completion_cache, failed_cache)

            if ready_task is None:
                _check_for_deadlock(all_tasks, completion_cache, failed_cache)
                # All remaining tasks are blocked by failed deps - exit gracefully
                break

            # Acquire lock if needed
            use_lock = global_lock_manager is not None and lock_selector(ready_task)
            if use_lock:
                lock_result = await acquire_lock_aio(ready_task)

                if lock_result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                    # Task completed elsewhere - skip execution
                    completion_cache.add(ready_task.id)
                    task_count.previously_completed += 1
                    continue

                if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                    # Lock not acquired - treat as failure
                    task_count.failed += 1
                    failed_cache.add(ready_task.id)
                    error = RuntimeError(
                        f"Failed to acquire lock: {lock_result.error_message}"
                    )
                    # Ensure the task row exists before failing it (no-op if
                    # registration succeeded during discovery; retry if it
                    # failed in `warn` mode).
                    await register_task_once_aio(ready_task)
                    try:
                        await registry.task_fail_aio(build_id, ready_task, str(error))
                    except Exception as reg_err:
                        handle_registry_error(
                            reg_err,
                            "Failed to notify registry of lock failure",
                            on_registry_failure,
                        )
                    if fail_mode == FailMode.FAIL_FAST:
                        raise error
                    continue

                # Lock acquired - track it
                held_locks.add(str(ready_task.id))

            # Execute the task
            task_completed = False
            try:
                await _run_task_sequential_aio(
                    ready_task,
                    completion_cache,
                    all_tasks,
                    build_id,
                    registry,
                    sync_run_default,
                    runtime_discover_aio,
                    register_task_once_aio,
                    task_count,
                    on_registry_failure,
                )
                task_count.succeeded += 1
                task_completed = True
            except Exception as e:
                task_count.failed += 1
                failed_cache.add(ready_task.id)
                error = e
                try:
                    await registry.task_fail_aio(build_id, ready_task, str(e))
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to notify registry of task {ready_task.id} failure",
                        on_registry_failure,
                    )
                if fail_mode == FailMode.FAIL_FAST:
                    raise
            finally:
                if use_lock:
                    await release_lock_aio(ready_task, completed=task_completed)

        await registry.build_complete_aio(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        await registry.build_fail_aio(build_id, str(e))
        if fail_mode == FailMode.FAIL_FAST:
            raise
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )
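
The chunking in flush_pending_registrations_aio above can be illustrated in isolation. This is a rough synchronous sketch of the "warn" path only: CHUNK_SIZE, flush_in_chunks and fake_register are hypothetical names, and the real value of _BULK_REGISTER_CHUNK_SIZE is not shown on this page.

```python
from typing import Callable, Sequence

CHUNK_SIZE = 3  # hypothetical; not the library's actual chunk size


def flush_in_chunks(
    batch: Sequence[str],
    register_bulk: Callable[[Sequence[str]], None],
    chunk_size: int = CHUNK_SIZE,
) -> set[str]:
    """Register `batch` chunk by chunk; return the ids that were registered."""
    registered: set[str] = set()
    for start in range(0, len(batch), chunk_size):
        chunk = batch[start : start + chunk_size]
        try:
            register_bulk(chunk)
        except Exception as err:
            total = (len(batch) + chunk_size - 1) // chunk_size  # ceiling division
            print(f"chunk {start // chunk_size + 1}/{total} failed: {err}")
            break  # later chunks are left for a per-task retry path
        registered.update(chunk)
    return registered


calls: list[list[str]] = []


def fake_register(chunk: Sequence[str]) -> None:
    # Hypothetical registry call that rejects any chunk containing "t5".
    calls.append(list(chunk))
    if "t5" in chunk:
        raise RuntimeError("simulated registry outage")


ids = [f"t{i}" for i in range(1, 9)]  # t1..t8 -> chunks of 3, 3 and 2
done = flush_in_chunks(ids, fake_register)
print(sorted(done))  # ['t1', 't2', 't3']
```

Stopping at the first failed chunk means the third chunk is never sent; in the listing, those tasks stay out of registered_tasks so that register_task_once_aio can retry them individually.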

Target Module

target

target_factory_provider module-attribute

target_factory_provider = resource_provider(
    type_=TargetFactory, default_factory=TargetFactory
)

FileSystemTarget

Bases: Target, Protocol

Minimal base protocol for filesystem-backed targets.

Both FileTarget (file-oriented) and DirectoryTarget (directory-oriented) implement this protocol.

FileTarget

FileTarget(uri)

Bases: _FileTargetGeneric[bytes], Protocol

A file-oriented filesystem target with open/read/write capabilities.

Inherits all file I/O methods from _FileTargetGeneric: open(), proxy_path(), exists(), and their async variants. Concrete implementations: LocalFileTarget, RemoteFileTarget, InMemoryFileTarget.

Source code in stardag/target/_base.py
def __init__(self, uri: str) -> None:
    self.uri = uri

DirectoryTarget

DirectoryTarget(uri, prototype)

Bases: FileSystemTarget

A target representing a directory of file targets.

Manages a collection of sub-targets (files) under a common URI prefix, with a flag file to track completion. Sub-targets are created via get_sub_target() or the / operator.

Source code in stardag/target/_base.py
def __init__(
    self,
    uri: str,
    prototype: typing.Type[FileTarget] | typing.Callable[[str], FileTarget],
) -> None:
    self.uri = uri.removesuffix("/") + "/"
    self.prototype = prototype
    self._flag_target = prototype(self.uri[:-1] + "._DONE")
    self._sub_keys: set[str] = set()
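
The two string expressions in this constructor determine where the directory and its completion flag live. The standalone helpers below (hypothetical names, example URIs made up for illustration) reproduce just that arithmetic:

```python
def normalize_dir_uri(uri: str) -> str:
    """Strip one trailing slash if present, then append one."""
    return uri.removesuffix("/") + "/"


def flag_uri(dir_uri: str) -> str:
    """Flag-file URI: the directory URI without its trailing slash, plus '._DONE'."""
    return normalize_dir_uri(dir_uri)[:-1] + "._DONE"


print(normalize_dir_uri("s3://bucket/out"))   # s3://bucket/out/
print(normalize_dir_uri("s3://bucket/out/"))  # s3://bucket/out/
print(flag_uri("s3://bucket/out/"))           # s3://bucket/out._DONE
```

The flag file therefore sits next to the directory rather than inside it, so listing the directory's contents does not return the flag.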

exists_aio async

exists_aio()

Async check if directory is marked as done.

Source code in stardag/target/_base.py
async def exists_aio(self) -> bool:
    """Async check if directory is marked as done."""
    return await self._flag_target.exists_aio()

mark_done_aio async

mark_done_aio()

Async version of mark_done().

Source code in stardag/target/_base.py
async def mark_done_aio(self) -> None:
    """Async version of mark_done()."""
    async with self.sub_keys_target().proxy_path_aio("w") as path:
        async with aiofiles.open(path, "w") as f:
            await f.write("\n".join(sorted(self._sub_keys)))
    async with self._flag_target.proxy_path_aio("w") as path:
        async with aiofiles.open(path, "w") as f:
            await f.write("")  # empty file
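
The ordering above matters: the manifest of sub-keys is written first and the empty flag file last, so a present flag implies a complete manifest. A synchronous, local-filesystem sketch of the same idea follows; the manifest file name `_sub_keys.txt` is an assumption, since the real location comes from `sub_keys_target()`:

```python
import tempfile
from pathlib import Path
from typing import Set


def flag_path(root: Path) -> Path:
    return root.parent / (root.name + "._DONE")


def mark_done(root: Path, sub_keys: Set[str]) -> None:
    """Write the sorted sub-key manifest, then the empty flag file."""
    (root / "_sub_keys.txt").write_text("\n".join(sorted(sub_keys)))  # assumed name
    flag_path(root).write_text("")  # written last: signals completion


def is_done(root: Path) -> bool:
    return flag_path(root).exists()


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "out"
    out.mkdir()
    before = is_done(out)
    mark_done(out, {"b.csv", "a.csv"})
    after = is_done(out)
    manifest_text = (out / "_sub_keys.txt").read_text()
```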

LoadableSaveableFileSystemTarget

Bases: LoadableSaveableTarget[LoadedT], FileSystemTarget, Generic[LoadedT], Protocol

A filesystem target (file or directory) that supports load/save.

This is the return type of Task.target(). It provides:

  • load() -> LoadedT and save(obj: LoadedT) (from LoadableSaveableTarget)
  • uri: str and exists() -> bool (from FileSystemTarget)

LocalFileTarget

LocalFileTarget(uri)

Bases: FileTarget

TODO: Use Luigi-style atomic writes.

Source code in stardag/target/_base.py
def __init__(self, uri: str) -> None:
    # Expand ~ to user home directory
    self.uri = os.path.expanduser(uri)

exists_aio async

exists_aio()

Asynchronously check if the local file exists.

Source code in stardag/target/_base.py
async def exists_aio(self) -> bool:
    """Asynchronously check if the local file exists."""
    return await aiofiles.os.path.exists(self.path)

TargetFactory

TargetFactory(
    target_roots=None, prefix_to_target_prototype=None
)

Source code in stardag/target/_factory.py
def __init__(
    self,
    target_roots: dict[str, str] | None = None,
    prefix_to_target_prototype: PrefixToFileTargetPrototype | None = None,
) -> None:
    # If no target_roots provided, get from central config
    if target_roots is None:
        target_roots = config_provider.get().target.roots

    self.target_roots = {
        key: value.removesuffix("/") + "/" for key, value in target_roots.items()
    }
    self.prefix_to_target_prototype = (
        prefix_to_target_prototype or get_default_prefix_to_target_prototype()
    )

get_file_target

get_file_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a file target.

PARAMETER DESCRIPTION
relpath

The path to the target, relative to the configured root path for target_root_key.

TYPE: str

target_root_key

The key to the target root to use.

TYPE: str DEFAULT: DEFAULT_TARGET_ROOT_KEY

RETURNS DESCRIPTION
FileTarget

A file target.

Source code in stardag/target/_factory.py
def get_file_target(
    self,
    relpath: str,
    target_root_key: str = DEFAULT_TARGET_ROOT_KEY,
) -> FileTarget:
    """Get a file target.

    Args:
        relpath: The path to the target, relative to the configured root path for
            `target_root_key`.
        target_root_key: The key to the target root to use.

    Returns:
        A file target.
    """
    if self._is_full_path(relpath):
        path = relpath
    else:
        path = self.get_path(relpath, target_root_key)
    target_prototype = self._get_target_prototype(path)
    return target_prototype(path)
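
The private helpers `_is_full_path` and `_get_target_prototype` are not shown on this page. The sketch below is one plausible reading, under two loudly stated assumptions: a path counts as "full" when it starts with a known prefix, and the longest matching prefix selects the prototype. The prefix table and the string "targets" are hypothetical:

```python
from typing import Callable, Dict

# Hypothetical prefix table; the real defaults come from
# get_default_prefix_to_target_prototype() and may differ.
PROTOTYPES: Dict[str, Callable[[str], str]] = {
    "/": lambda uri: f"local:{uri}",
    "s3://": lambda uri: f"remote:{uri}",
}


def is_full_path(path: str) -> bool:
    """Assumption: a path is 'full' if it starts with a known prefix."""
    return any(path.startswith(prefix) for prefix in PROTOTYPES)


def make_target(relpath: str, root: str) -> str:
    path = relpath if is_full_path(relpath) else f"{root}{relpath}"
    # Assumption: the longest matching prefix wins.
    prefix = max((p for p in PROTOTYPES if path.startswith(p)), key=len)
    return PROTOTYPES[prefix](path)
```

The point of the shape is that relative paths are resolved against a root before dispatch, while full paths bypass the root entirely.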

get_directory_target

get_directory_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a directory target.

PARAMETER DESCRIPTION
relpath

The path to the target, relative to the configured root path for target_root_key.

TYPE: str

target_root_key

The key to the target root to use.

TYPE: str DEFAULT: DEFAULT_TARGET_ROOT_KEY

RETURNS DESCRIPTION
DirectoryTarget

A directory target.

Source code in stardag/target/_factory.py
def get_directory_target(
    self,
    relpath: str,
    target_root_key: str = DEFAULT_TARGET_ROOT_KEY,
) -> DirectoryTarget:
    """Get a directory target.

    Args:
        relpath: The path to the target, relative to the configured root path for
            `target_root_key`.
        target_root_key: The key to the target root to use.

    Returns:
        A directory target.
    """
    if self._is_full_path(relpath):
        path = relpath
    else:
        path = self.get_path(relpath, target_root_key)
    target_prototype = self._get_target_prototype(path)
    return DirectoryTarget(path, target_prototype)

get_path

get_path(relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY)

Get the full (absolute) path, or URI, to the target.

Source code in stardag/target/_factory.py
def get_path(
    self, relpath: str, target_root_key: str = DEFAULT_TARGET_ROOT_KEY
) -> str:
    """Get the full (/"absolute") path (/"URI") to the target."""
    target_root = self.target_roots.get(target_root_key)
    if target_root is None:
        example_json = json.dumps({target_root_key: "...", "default": "..."})
        raise ValueError(
            f"No target root is configured for key: '{target_root_key}'. "
            f"Available keys are: {list(self.target_roots.keys())}. Set the missing "
            "target root in your registry config or via environment variable: "
            f"`STARDAG_TARGET_ROOTS='{example_json}'`."
        )

    return f"{target_root}{relpath}"
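
Root normalization in the constructor and the lookup in `get_path` combine as follows. This is a standalone sketch with plain functions; the key name `"default"` is assumed to match `DEFAULT_TARGET_ROOT_KEY`, and the error text is abbreviated:

```python
import json
from typing import Dict


def normalize_roots(roots: Dict[str, str]) -> Dict[str, str]:
    # Same expression as in TargetFactory.__init__: ensure a trailing slash.
    return {key: value.removesuffix("/") + "/" for key, value in roots.items()}


def get_path(roots: Dict[str, str], relpath: str, key: str = "default") -> str:
    root = roots.get(key)
    if root is None:
        example = json.dumps({key: "...", "default": "..."})
        raise ValueError(
            f"No target root is configured for key: '{key}'. "
            f"Example: STARDAG_TARGET_ROOTS='{example}'"
        )
    return f"{root}{relpath}"


roots = normalize_roots({"default": "/data/root", "archive": "s3://bucket/archive/"})
print(get_path(roots, "a/b.json"))
print(get_path(roots, "x.parquet", "archive"))
```

Because every root ends with a slash after normalization, plain string concatenation is enough to join root and relative path.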

Registry Module

registry

Task registry module for stardag.

This module provides registry implementations for tracking task execution. The main classes are:

  • RegistryABC: Abstract base class defining the registry interface
  • APIRegistry: Registry that communicates with the stardag-api service
  • NoOpRegistry: A do-nothing registry (default when unconfigured)
  • registry_provider: Resource provider for getting the configured registry
  • RegistryGlobalConcurrencyLockManager: GlobalConcurrencyLockManager using Registry API
  • RegistryLockHandle: LockHandle implementation with automatic TTL renewal

registry_provider module-attribute

registry_provider = resource_provider(
    RegistryABC, init_registry
)

APIRegistry

APIRegistry(
    api_url=None,
    timeout=None,
    environment_id=None,
    api_key=None,
)

Bases: RegistryABC

Registry that stores task information via the stardag-api REST service.

This registry is stateless with respect to build_id - the build_id is passed explicitly to all methods that need it. This allows a single registry instance to be reused across multiple builds (via registry_provider).

Usage

build_id = await registry.build_start_aio(root_tasks=tasks)
await registry.task_register_aio(build_id, task)
await registry.task_start_aio(build_id, task)

# ... execute task ...

await registry.task_complete_aio(build_id, task)
await registry.build_complete_aio(build_id)

Authentication:

  • API key can be provided directly or via the STARDAG_API_KEY env var
  • JWT token from browser login (stored in registry credentials)

Configuration is loaded from the central config module (stardag.config).
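
To see the call order without a running stardag-api service, the lifecycle can be exercised against a recording stand-in. `RecordingRegistry` and `run_one` are hypothetical; only the method names mirror the registry interface documented here:

```python
import asyncio
import uuid
from typing import List


class RecordingRegistry:
    """Hypothetical stand-in that records calls instead of issuing HTTP requests."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def build_start_aio(self, root_tasks=None) -> uuid.UUID:
        self.events.append("build_start")
        return uuid.uuid4()

    async def task_register_aio(self, build_id, task) -> None:
        self.events.append(f"register:{task}")

    async def task_start_aio(self, build_id, task) -> None:
        self.events.append(f"start:{task}")

    async def task_complete_aio(self, build_id, task) -> None:
        self.events.append(f"complete:{task}")

    async def build_complete_aio(self, build_id) -> None:
        self.events.append("build_complete")


async def run_one(registry: RecordingRegistry, task: str) -> None:
    # The build_id is threaded through every call; the registry keeps no build state.
    build_id = await registry.build_start_aio(root_tasks=[task])
    await registry.task_register_aio(build_id, task)
    await registry.task_start_aio(build_id, task)
    # ... execute the task here ...
    await registry.task_complete_aio(build_id, task)
    await registry.build_complete_aio(build_id)


registry = RecordingRegistry()
asyncio.run(run_one(registry, "t1"))
```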

Source code in stardag/registry/_api_registry.py
def __init__(
    self,
    api_url: str | None = None,
    timeout: float | None = None,
    environment_id: str | None = None,
    api_key: str | None = None,
):
    # Load central config
    config = config_provider.get()
    reg = config.registry

    # Resolve API URL: explicit > config
    resolved_url = api_url or (reg.url if reg else None)
    if not resolved_url:
        raise ValueError(
            "APIRegistry requires a registry URL. "
            "Set STARDAG_API_URL or configure a profile with a registry."
        )
    self.api_url = resolved_url.rstrip("/")

    # Timeout: explicit > config
    self.timeout = (
        timeout
        if timeout is not None
        else (reg.timeout if reg else DEFAULT_API_TIMEOUT)
    )

    # Environment ID: explicit > config
    self.environment_id = environment_id or (reg.environment_id if reg else None)

    # Build auth object
    resolved_api_key = api_key or (
        reg.auth.api_key.get_secret_value() if reg and reg.auth.api_key else None
    )
    if resolved_api_key:
        self._auth: httpx.Auth = StardagAPIKeyAuth(resolved_api_key)
        logger.debug("APIRegistry initialized with API key authentication")
    elif reg and reg.auth.access_token:
        # registry_name is optional — StardagTokenAuth can derive a
        # credential key from the URL when no profile is configured.
        self._auth = StardagTokenAuth(
            access_token=reg.auth.access_token.get_secret_value(),
            workspace_id=reg.workspace_id,
            user_email=reg.auth.user_email,
            registry_url=reg.url,
            registry_name=config.context.registry_name,
        )
        if not self.environment_id:
            logger.warning(
                "APIRegistry: JWT auth requires environment_id. "
                "Run 'stardag config set environment <id>' to set it."
            )
        else:
            logger.debug(
                "APIRegistry initialized with browser login (JWT) authentication"
            )
    else:
        # No auth - pass None; httpx handles this gracefully
        self._auth = None  # type: ignore[assignment]
        logger.warning(
            "APIRegistry initialized without authentication. "
            "Run 'stardag auth login' or set STARDAG_API_KEY env var."
        )

    self._client = None
    self._async_client = None
    self._async_client_loop = (
        None  # Track which event loop the async client belongs to
    )
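
The constructor resolves each setting as "explicit argument, then config", but with two different tests: the URL uses `or`, while the timeout uses `is not None`. The sketch below isolates that difference; the helper names and the `DEFAULT_TIMEOUT` value are hypothetical, and the config object is collapsed to a single optional value:

```python
from typing import Optional

DEFAULT_TIMEOUT = 30.0  # hypothetical; the real DEFAULT_API_TIMEOUT is not shown here


def resolve_url(explicit: Optional[str], configured: Optional[str]) -> Optional[str]:
    # `or` treats an empty string as "not provided" and falls through to config.
    return explicit or configured


def resolve_timeout(explicit: Optional[float], configured: Optional[float]) -> float:
    # `is not None` keeps an explicit 0.0 instead of falling through.
    if explicit is not None:
        return explicit
    return configured if configured is not None else DEFAULT_TIMEOUT
```

The `is not None` form is the safer choice for numeric settings, where a falsy value such as `0.0` may be intentional.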

async_client property

async_client

Lazy-initialized async HTTP client with retry transport.

The client is recreated if the event loop changes, which can happen when running in frameworks like Prefect that create new event loops for task execution.
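
A loop-bound lazy property of this kind might look like the sketch below. It is not the stardag implementation: `LoopBoundClient` is hypothetical and a plain `object()` stands in for the HTTP client.

```python
import asyncio


class LoopBoundClient:
    """Caches one resource per event loop and rebuilds it when the loop changes."""

    def __init__(self) -> None:
        self._client = None
        self._client_loop = None
        self.created = 0

    @property
    def client(self):
        loop = asyncio.get_running_loop()
        if self._client is None or self._client_loop is not loop:
            self._client = object()  # a real implementation would build the client here
            self._client_loop = loop
            self.created += 1
        return self._client


holder = LoopBoundClient()


async def use_twice() -> bool:
    return holder.client is holder.client  # same loop, so the client is reused


same_within_loop = asyncio.run(use_twice())
asyncio.run(use_twice())  # a second asyncio.run() starts a fresh event loop
```

After the two `asyncio.run()` calls, `holder.created` is 2: one client per event loop.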

build_start

build_start(root_tasks=None, description=None)

Start a new build and return its ID.

Source code in stardag/registry/_api_registry.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Start a new build and return its ID."""
    build_data = {
        "commit_hash": get_git_commit_hash(),
        "root_task_ids": [str(task.id) for task in (root_tasks or [])],
        "description": description,
    }

    response = self._request(
        "POST",
        f"{self.api_url}/api/v1/builds",
        json=build_data,
        params=self._get_params(),
        operation="Start build",
    )
    data = response.json()
    build_id = UUID(data["id"])
    logger.info(f"Started build: {data['name']} (ID: {build_id})")
    return build_id

build_resume

build_resume(build_id)

Mark an existing build as resumed.

Emits a BUILD_RESUMED event server-side so a build that previously terminated (FAILED / COMPLETED / CANCELLED / EXIT_EARLY) flips back to RUNNING. The endpoint is new in the post-resume API; on older servers the request 404s with FastAPI's missing-route body, which we swallow with a warning so the SDK keeps working against an un-upgraded registry. Resource-level 404s (build does not exist) are re-raised.

Source code in stardag/registry/_api_registry.py
def build_resume(self, build_id: UUID) -> None:
    """Mark an existing build as resumed.

    Emits a BUILD_RESUMED event server-side so a build that previously
    terminated (FAILED / COMPLETED / CANCELLED / EXIT_EARLY) flips
    back to RUNNING. The endpoint is new in the post-resume API; on
    older servers the request 404s with FastAPI's missing-route body,
    which we swallow with a warning so the SDK keeps working against
    an un-upgraded registry. Resource-level 404s (build does not
    exist) are re-raised.
    """
    try:
        self._request(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/resume",
            params=self._get_event_params(),
            operation="Resume build",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /builds/%s/resume; "
            "the resumed build will keep its previous status in the "
            "registry until you upgrade the API. Build will still run "
            "to completion locally.",
            build_id,
        )
        return
    logger.info(f"Resumed build: {build_id}")

build_complete

build_complete(build_id)

Mark a build as completed.

Source code in stardag/registry/_api_registry.py
def build_complete(self, build_id: UUID) -> None:
    """Mark a build as completed."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/complete",
        params=self._get_event_params(),
        operation="Complete build",
    )
    logger.info(f"Completed build: {build_id}")

build_fail

build_fail(build_id, error_message=None)

Mark a build as failed.

Source code in stardag/registry/_api_registry.py
def build_fail(self, build_id: UUID, error_message: str | None = None) -> None:
    """Mark a build as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/fail",
        params=params,
        operation="Fail build",
    )
    logger.info(f"Marked build as failed: {build_id}")

build_cancel

build_cancel(build_id)

Cancel a build.

Source code in stardag/registry/_api_registry.py
def build_cancel(self, build_id: UUID) -> None:
    """Cancel a build."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/cancel",
        params=self._get_event_params(),
        operation="Cancel build",
    )
    logger.info(f"Cancelled build: {build_id}")

build_exit_early

build_exit_early(build_id, reason=None)

Mark a build as exited early.

Source code in stardag/registry/_api_registry.py
def build_exit_early(self, build_id: UUID, reason: str | None = None) -> None:
    """Mark a build as exited early."""
    params = self._get_event_params()
    if reason:
        params["reason"] = reason
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/exit-early",
        params=params,
        operation="Exit early",
    )
    logger.info(f"Build exited early: {build_id}")

task_register

task_register(build_id, task)

Register a task within a build.

Source code in stardag/registry/_api_registry.py
def task_register(self, build_id: UUID, task: "BaseTask") -> None:
    """Register a task within a build."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks",
        json=_get_task_data_for_registration(task),
        params=self._get_params(),
        operation=f"Register task {task.id}",
    )

task_register_bulk

task_register_bulk(build_id, tasks)

Bulk-register tasks via the /tasks/bulk endpoint.

Falls back to per-task task_register if the API doesn't support the endpoint (older deployments) — same backwards-compat pattern as task_add_dependencies.

Raises ValueError if the batch exceeds _MAX_BULK_REGISTER_TASKS (mirrors the server cap). The build engine chunks above this method; external callers of APIRegistry get an explicit client-side error rather than a 400 from the server.

Passes ?id_only=true so the server returns only the {id, task_id} mapping rather than echoing back full TaskResponse rows we'd discard anyway. Cuts response size by ~10× for batches with rich task_data.

Source code in stardag/registry/_api_registry.py
def task_register_bulk(self, build_id: UUID, tasks: Sequence["BaseTask"]) -> None:
    """Bulk-register tasks via the ``/tasks/bulk`` endpoint.

    Falls back to per-task ``task_register`` if the API doesn't
    support the endpoint (older deployments) — same backwards-compat
    pattern as ``task_add_dependencies``.

    Raises ``ValueError`` if the batch exceeds
    ``_MAX_BULK_REGISTER_TASKS`` (mirrors the server cap). The build
    engine chunks above this method; external callers of
    ``APIRegistry`` get an explicit client-side error rather than a
    400 from the server.

    Passes ``?id_only=true`` so the server returns only the
    ``{id, task_id}`` mapping rather than echoing back full
    ``TaskResponse`` rows we'd discard anyway. Cuts response size
    by ~10× for batches with rich task_data.
    """
    if not tasks:
        return
    if len(tasks) > _MAX_BULK_REGISTER_TASKS:
        raise ValueError(
            f"task_register_bulk supports at most {_MAX_BULK_REGISTER_TASKS} "
            f"tasks per call (got {len(tasks)}). Chunk the input on the "
            f"caller side."
        )
    try:
        self._request(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/tasks/bulk",
            json={"tasks": [_get_task_data_for_registration(t) for t in tasks]},
            params={**self._get_params(), "id_only": "true"},
            operation=f"Bulk-register {len(tasks)} tasks",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /tasks/bulk; "
            "falling back to per-task registration. "
            "Upgrade the Registry API for batched registration."
        )
        for t in tasks:
            self.task_register(build_id, t)
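
External callers that may exceed the cap need to chunk their input before calling the bulk method. A minimal chunking helper is sketched below; `chunked` and `MAX_BATCH` are hypothetical, and the real value of `_MAX_BULK_REGISTER_TASKS` is not shown on this page:

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items, preserving order."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield items[start : start + size]


MAX_BATCH = 3  # hypothetical cap, for illustration only
batches = [list(b) for b in chunked(list(range(8)), MAX_BATCH)]
```

Each batch could then be passed to `task_register_bulk(build_id, batch)` in turn.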

task_start

task_start(build_id, task)

Mark a task as started.

Caller must have already registered the task (via task_register or as a side effect of a parent's static-deps reconciliation). The /start endpoint will 404 otherwise.

Source code in stardag/registry/_api_registry.py
def task_start(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as started.

    Caller must have already registered the task (via ``task_register`` or
    as a side effect of a parent's static-deps reconciliation). The /start
    endpoint will 404 otherwise.
    """
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/start",
        params=self._get_event_params(),
        operation=f"Start task {task.id}",
    )

task_complete

task_complete(build_id, task)

Mark a task as completed.

Source code in stardag/registry/_api_registry.py
def task_complete(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as completed."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/complete",
        params=self._get_event_params(),
        operation=f"Complete task {task.id}",
    )

task_fail

task_fail(build_id, task, error_message=None)

Mark a task as failed.

Source code in stardag/registry/_api_registry.py
def task_fail(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Mark a task as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/fail",
        params=params,
        operation=f"Fail task {task.id}",
    )

task_suspend

task_suspend(build_id, task)

Mark a task as suspended (waiting for dynamic dependencies).

Source code in stardag/registry/_api_registry.py
def task_suspend(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as suspended (waiting for dynamic dependencies)."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/suspend",
        params=self._get_event_params(),
        operation=f"Suspend task {task.id}",
    )

task_add_dependencies

task_add_dependencies(
    build_id, task, upstream_tasks, is_dynamic=True
)

Record dependency edges for a task.

Backward-compat: an older Registry API that lacks the /dependencies endpoint returns FastAPI's default 404 with the generic "Not Found" detail. We swallow that specific response with a warning so builds don't break on version skew. All other 404s (e.g. our endpoint's explicit "Build not found" or "Task … not registered …" responses) re-raise normally.

Source code in stardag/registry/_api_registry.py
def task_add_dependencies(
    self,
    build_id: UUID,
    task: "BaseTask",
    upstream_tasks: Sequence["BaseTask"],
    is_dynamic: bool = True,
) -> None:
    """Record dependency edges for a task.

    Backward-compat: an older Registry API that lacks the
    ``/dependencies`` endpoint returns FastAPI's default 404 with the
    generic ``"Not Found"`` detail. We swallow that specific response
    with a warning so builds don't break on version skew. All other
    404s (e.g. our endpoint's explicit ``"Build not found"`` or
    ``"Task … not registered …"`` responses) re-raise normally.
    """
    if not upstream_tasks:
        return
    try:
        self._request(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/dependencies",
            json={
                "upstream_task_ids": [str(u.id) for u in upstream_tasks],
                "is_dynamic": is_dynamic,
            },
            params=self._get_params(),
            operation=f"Add dependencies for task {task.id}",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /dependencies; "
            "dynamic-dep edges for task %s will not be recorded. "
            "Upgrade the Registry API to see dynamic deps in the DAG view.",
            task.id,
        )
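
The distinction between a missing route and a missing resource rests on the response body. The private `_is_route_not_found` helper is not shown here, so the sketch below is an assumption about its shape: it compares the `detail` field against FastAPI's default `"Not Found"`. The `NotFoundError` class is a hypothetical stand-in.

```python
from typing import Callable


class NotFoundError(Exception):
    """Hypothetical stand-in carrying the parsed JSON body of a 404 response."""

    def __init__(self, body: dict) -> None:
        super().__init__(body.get("detail", ""))
        self.body = body


def is_route_not_found(err: NotFoundError) -> bool:
    # FastAPI answers an unknown route with {"detail": "Not Found"}.
    return err.body.get("detail") == "Not Found"


def add_dependencies(post: Callable[[], None]) -> str:
    try:
        post()
    except NotFoundError as e:
        if not is_route_not_found(e):
            raise  # resource-level 404 (e.g. "Build not found"): a real error
        return "skipped"  # older server without the endpoint: warn and continue
    return "recorded"
```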

task_resume

task_resume(build_id, task)

Mark a task as resumed (dynamic dependencies completed).

Source code in stardag/registry/_api_registry.py
def task_resume(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as resumed (dynamic dependencies completed)."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/resume",
        params=self._get_event_params(),
        operation=f"Resume task {task.id}",
    )

task_cancel

task_cancel(build_id, task)

Cancel a task.

Source code in stardag/registry/_api_registry.py
def task_cancel(self, build_id: UUID, task: "BaseTask") -> None:
    """Cancel a task."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/cancel",
        params=self._get_event_params(),
        operation=f"Cancel task {task.id}",
    )

task_skip

task_skip(build_id, task)

Skip a task whose dependency failed or was cancelled.

Backward-compat: an older Registry API that lacks the /skip endpoint returns FastAPI's default 404 with the generic "Not Found" detail. We swallow that specific response with a warning so a new SDK against an old API doesn't fail builds on every fail-fast / blocked-dep path. All other 404s (e.g. "Build not found") re-raise normally.

Source code in stardag/registry/_api_registry.py
def task_skip(self, build_id: UUID, task: "BaseTask") -> None:
    """Skip a task whose dependency failed or was cancelled.

    Backward-compat: an older Registry API that lacks the ``/skip``
    endpoint returns FastAPI's default 404 with the generic
    ``"Not Found"`` detail. We swallow that specific response with a
    warning so a new SDK against an old API doesn't fail builds on
    every fail-fast / blocked-dep path. All other 404s (e.g.
    ``"Build not found"``) re-raise normally.
    """
    try:
        self._request(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/skip",
            params=self._get_event_params(),
            operation=f"Skip task {task.id}",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /skip; task %s will "
            "remain PENDING in the registry. Upgrade the Registry API "
            "to see SKIPPED status for tasks blocked by failed deps.",
            task.id,
        )

task_waiting_for_lock

task_waiting_for_lock(build_id, task, lock_owner=None)

Record that a task is waiting for a global lock.

Source code in stardag/registry/_api_registry.py
def task_waiting_for_lock(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Record that a task is waiting for a global lock."""
    params = self._get_event_params()
    if lock_owner:
        params["lock_owner"] = lock_owner
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/waiting-for-lock",
        params=params,
        operation=f"Task {task.id} waiting for lock",
    )

task_upload_artifacts

task_upload_artifacts(build_id, task, artifacts)

Upload artifacts for a completed task.

Source code in stardag/registry/_api_registry.py
def task_upload_artifacts(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence[Artifact]
) -> None:
    """Upload artifacts for a completed task."""
    if not artifacts:
        return

    # Serialize artifacts to API format
    # For all artifact types, body is stored as a dict in body_json
    # - markdown: {"content": "<markdown string>"}
    # - json: the actual JSON data dict
    artifacts_data = []
    for artifact in artifacts:
        data = artifact.model_dump(mode="json")
        if artifact.type == "markdown":
            # Wrap markdown body string in {"content": ...} dict
            data["body"] = {"content": data["body"]}
        artifacts_data.append(data)

    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/artifacts",
        json=artifacts_data,
        params=self._get_params(),
        operation=f"Upload artifacts for task {task.id}",
    )
    logger.debug(f"Uploaded {len(artifacts)} artifacts for task {task.id}")
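
The body-wrapping rule can be shown with plain dicts in place of `Artifact` models. In this sketch `to_api_payload` and the `name` field are hypothetical; only the `type` and `body` handling follows the method above:

```python
from typing import List


def to_api_payload(artifacts: List[dict]) -> List[dict]:
    payload = []
    for artifact in artifacts:
        data = dict(artifact)  # shallow copy so the input is not mutated
        if data["type"] == "markdown":
            # Markdown bodies are strings; wrap them so every body is a dict.
            data["body"] = {"content": data["body"]}
        payload.append(data)
    return payload


payload = to_api_payload(
    [
        {"type": "markdown", "name": "report", "body": "# Title"},
        {"type": "json", "name": "metrics", "body": {"accuracy": 0.9}},
    ]
)
```

JSON artifacts pass through unchanged, so the server can store both kinds in a single dict-valued column.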

task_get_metadata

task_get_metadata(task_id)

Get metadata for a registered task.

PARAMETER DESCRIPTION
task_id

The UUID of the task to get metadata for.

TYPE: UUID

RETURNS DESCRIPTION
TaskMetadata

A TaskMetadata object containing task metadata.

Source code in stardag/registry/_api_registry.py
def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
    """Get metadata for a registered task.

    Args:
        task_id: The UUID of the task to get metadata for.

    Returns:
        A TaskMetadata object containing task metadata.
    """

    response = self._request(
        "GET",
        f"{self.api_url}/api/v1/tasks/{task_id}/metadata",
        params=self._get_params(),
        operation=f"Get metadata for task {task_id}",
    )
    data = response.json()

    return TaskMetadata.model_validate(data)

close

close()

Close the HTTP client.

Source code in stardag/registry/_api_registry.py
def close(self) -> None:
    """Close the HTTP client."""
    if self._client is not None:
        self._client.close()
        self._client = None

aclose async

aclose()

Close the async HTTP client.

Source code in stardag/registry/_api_registry.py
async def aclose(self) -> None:
    """Close the async HTTP client."""
    if self._async_client is not None:
        await self._async_client.aclose()
        self._async_client = None

build_start_aio async

build_start_aio(root_tasks=None, description=None)

Async version - start a new build and return its ID.

Source code in stardag/registry/_api_registry.py
async def build_start_aio(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Async version - start a new build and return its ID."""
    build_data = {
        "commit_hash": get_git_commit_hash(),
        "root_task_ids": [str(task.id) for task in (root_tasks or [])],
        "description": description,
    }

    response = await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds",
        json=build_data,
        params=self._get_params(),
        operation="Start build",
    )
    data = response.json()
    build_id = UUID(data["id"])
    logger.info(f"Started build: {data['name']} (ID: {build_id})")
    return build_id

build_resume_aio async

build_resume_aio(build_id)

Async version - mark an existing build as resumed.

See build_resume for the backward-compat 404 handling.

Source code in stardag/registry/_api_registry.py
async def build_resume_aio(self, build_id: UUID) -> None:
    """Async version - mark an existing build as resumed.

    See :meth:`build_resume` for the backward-compat 404 handling.
    """
    try:
        await self._arequest(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/resume",
            params=self._get_event_params(),
            operation="Resume build",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /builds/%s/resume; "
            "the resumed build will keep its previous status in the "
            "registry until you upgrade the API. Build will still run "
            "to completion locally.",
            build_id,
        )
        return
    logger.info(f"Resumed build: {build_id}")

build_complete_aio async

build_complete_aio(build_id)

Async version - mark a build as completed.

Source code in stardag/registry/_api_registry.py
async def build_complete_aio(self, build_id: UUID) -> None:
    """Async version - mark a build as completed."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/complete",
        params=self._get_event_params(),
        operation="Complete build",
    )
    logger.info(f"Completed build: {build_id}")

build_fail_aio async

build_fail_aio(build_id, error_message=None)

Async version - mark a build as failed.

Source code in stardag/registry/_api_registry.py
async def build_fail_aio(
    self, build_id: UUID, error_message: str | None = None
) -> None:
    """Async version - mark a build as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/fail",
        params=params,
        operation="Fail build",
    )
    logger.info(f"Marked build as failed: {build_id}")

build_cancel_aio async

build_cancel_aio(build_id)

Async version - cancel a build.

Source code in stardag/registry/_api_registry.py
async def build_cancel_aio(self, build_id: UUID) -> None:
    """Async version - cancel a build."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/cancel",
        params=self._get_event_params(),
        operation="Cancel build",
    )
    logger.info(f"Cancelled build: {build_id}")

build_exit_early_aio async

build_exit_early_aio(build_id, reason=None)

Async version - mark build as exited early.

Source code in stardag/registry/_api_registry.py
async def build_exit_early_aio(
    self, build_id: UUID, reason: str | None = None
) -> None:
    """Async version - mark build as exited early."""
    params = self._get_event_params()
    if reason:
        params["reason"] = reason
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/exit-early",
        params=params,
        operation="Exit early",
    )
    logger.info(f"Build exited early: {build_id}")

task_register_aio async

task_register_aio(build_id, task)

Async version - register a task within a build.

Source code in stardag/registry/_api_registry.py
async def task_register_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - register a task within a build."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks",
        json=_get_task_data_for_registration(task),
        params=self._get_params(),
        operation=f"Register task {task.id}",
    )

task_register_bulk_aio async

task_register_bulk_aio(build_id, tasks)

Async bulk-register via /tasks/bulk (one HTTP call instead of N).

Falls back to per-task task_register_aio if the API doesn't support the endpoint (older deployments).

Raises ValueError if the batch exceeds _MAX_BULK_REGISTER_TASKS (mirrors the server cap). The build engine chunks above this method; external callers get an explicit client-side error rather than a 400 from the server.

Passes ?id_only=true so the server returns only the {id, task_id} mapping rather than echoing full TaskResponse rows that the client discards. This cuts the response size by roughly 10× for batches with rich task_data.

Source code in stardag/registry/_api_registry.py
async def task_register_bulk_aio(
    self, build_id: UUID, tasks: Sequence["BaseTask"]
) -> None:
    """Async bulk-register via ``/tasks/bulk`` (one HTTP call instead of N).

    Falls back to per-task ``task_register_aio`` if the API doesn't
    support the endpoint (older deployments).

    Raises ``ValueError`` if the batch exceeds
    ``_MAX_BULK_REGISTER_TASKS`` (mirrors the server cap). The build
    engine chunks above this method; external callers get an
    explicit client-side error rather than a 400 from the server.

    Passes ``?id_only=true`` so the server returns only the
    ``{id, task_id}`` mapping rather than echoing full ``TaskResponse``
    rows that we discard. Cuts response size by ~10× for batches
    with rich task_data.
    """
    if not tasks:
        return
    if len(tasks) > _MAX_BULK_REGISTER_TASKS:
        raise ValueError(
            f"task_register_bulk_aio supports at most {_MAX_BULK_REGISTER_TASKS} "
            f"tasks per call (got {len(tasks)}). Chunk the input on the "
            f"caller side."
        )
    try:
        await self._arequest(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/tasks/bulk",
            json={"tasks": [_get_task_data_for_registration(t) for t in tasks]},
            params={**self._get_params(), "id_only": "true"},
            operation=f"Bulk-register {len(tasks)} tasks",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /tasks/bulk; "
            "falling back to per-task registration. "
            "Upgrade the Registry API for batched registration."
        )
        for t in tasks:
            await self.task_register_aio(build_id, t)
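
Because the method raises ValueError above the cap, callers outside the build engine need to split large inputs themselves. A minimal sketch of such chunking is shown below. The helper name chunked and the cap of 3 are illustrative assumptions; the actual value of _MAX_BULK_REGISTER_TASKS is not shown here.

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of items, each at most size long, in order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start : start + size]


ASSUMED_CAP = 3  # illustrative only; not the real server cap
task_ids = ["t1", "t2", "t3", "t4", "t5", "t6", "t7"]
batches = [list(batch) for batch in chunked(task_ids, ASSUMED_CAP)]
print(batches)
```

Each batch could then be passed to task_register_bulk_aio in turn. Slicing preserves the input order, which matters when dependencies are expected to be registered before their parents.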

task_start_aio async

task_start_aio(build_id, task)

Async version - mark a task as started.

The caller must have already registered the task (via task_register_aio or as a side effect of a parent's static-deps reconciliation). Otherwise the /start endpoint returns a 404.

Source code in stardag/registry/_api_registry.py
async def task_start_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as started.

    Caller must have already registered the task (via ``task_register_aio``
    or as a side effect of a parent's static-deps reconciliation). The
    /start endpoint will 404 otherwise.
    """
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/start",
        params=self._get_event_params(),
        operation=f"Start task {task.id}",
    )

task_complete_aio async

task_complete_aio(build_id, task)

Async version - mark a task as completed.

Source code in stardag/registry/_api_registry.py
async def task_complete_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as completed."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/complete",
        params=self._get_event_params(),
        operation=f"Complete task {task.id}",
    )

task_fail_aio async

task_fail_aio(build_id, task, error_message=None)

Async version - mark a task as failed.

Source code in stardag/registry/_api_registry.py
async def task_fail_aio(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Async version - mark a task as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/fail",
        params=params,
        operation=f"Fail task {task.id}",
    )

task_suspend_aio async

task_suspend_aio(build_id, task)

Async version - mark a task as suspended.

Source code in stardag/registry/_api_registry.py
async def task_suspend_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as suspended."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/suspend",
        params=self._get_event_params(),
        operation=f"Suspend task {task.id}",
    )

task_add_dependencies_aio async

task_add_dependencies_aio(
    build_id, task, upstream_tasks, is_dynamic=True
)

Async version - record dependency edges for a task.

Same backward-compat behavior as the sync version: only swallow the specific "missing route" 404 (FastAPI default "Not Found"); re-raise genuine resource-not-found 404s.

Source code in stardag/registry/_api_registry.py
async def task_add_dependencies_aio(
    self,
    build_id: UUID,
    task: "BaseTask",
    upstream_tasks: Sequence["BaseTask"],
    is_dynamic: bool = True,
) -> None:
    """Async version - record dependency edges for a task.

    Same backward-compat behavior as the sync version: only swallow
    the specific "missing route" 404 (FastAPI default ``"Not Found"``);
    re-raise genuine resource-not-found 404s.
    """
    if not upstream_tasks:
        return
    try:
        await self._arequest(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/dependencies",
            json={
                "upstream_task_ids": [str(u.id) for u in upstream_tasks],
                "is_dynamic": is_dynamic,
            },
            params=self._get_params(),
            operation=f"Add dependencies for task {task.id}",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /dependencies; "
            "dynamic-dep edges for task %s will not be recorded. "
            "Upgrade the Registry API to see dynamic deps in the DAG view.",
            task.id,
        )
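
The distinction drawn above relies on telling a missing route apart from a missing resource. FastAPI's default response for an unmatched route is a 404 with the JSON body {"detail": "Not Found"}, whereas application handlers usually return a more specific detail message. The sketch below shows one way such a check could look. It is not the actual _is_route_not_found helper, whose implementation is not shown here; the function name, its arguments, and the second detail message are assumptions.

```python
from typing import Any


def looks_like_missing_route(status_code: int, body: Any) -> bool:
    """Heuristic: True only for a 404 whose detail is FastAPI's default text."""
    if status_code != 404:
        return False
    if not isinstance(body, dict):
        return False
    return body.get("detail") == "Not Found"


# Unmatched route on an older server: safe to treat as "endpoint unsupported".
print(looks_like_missing_route(404, {"detail": "Not Found"}))
# Route exists but the task does not (hypothetical message): should be re-raised.
print(looks_like_missing_route(404, {"detail": "Task not found in build"}))
```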

task_resume_aio async

task_resume_aio(build_id, task)

Async version - mark a task as resumed.

Source code in stardag/registry/_api_registry.py
async def task_resume_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as resumed."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/resume",
        params=self._get_event_params(),
        operation=f"Resume task {task.id}",
    )

task_cancel_aio async

task_cancel_aio(build_id, task)

Async version - cancel a task.

Source code in stardag/registry/_api_registry.py
async def task_cancel_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - cancel a task."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/cancel",
        params=self._get_event_params(),
        operation=f"Cancel task {task.id}",
    )

task_skip_aio async

task_skip_aio(build_id, task)

Async version - skip a task whose dep failed or was cancelled.

See task_skip for the backward-compat 404 handling.

Source code in stardag/registry/_api_registry.py
async def task_skip_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - skip a task whose dep failed or was cancelled.

    See :meth:`task_skip` for the backward-compat 404 handling.
    """
    try:
        await self._arequest(
            "POST",
            f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/skip",
            params=self._get_event_params(),
            operation=f"Skip task {task.id}",
        )
    except NotFoundError as e:
        if not _is_route_not_found(e):
            raise
        logger.warning(
            "Registry API does not support POST /skip; task %s will "
            "remain PENDING in the registry. Upgrade the Registry API "
            "to see SKIPPED status for tasks blocked by failed deps.",
            task.id,
        )

task_waiting_for_lock_aio async

task_waiting_for_lock_aio(build_id, task, lock_owner=None)

Async version - record that a task is waiting for a global lock.

Source code in stardag/registry/_api_registry.py
async def task_waiting_for_lock_aio(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Async version - record that task is waiting for global lock."""
    params = self._get_event_params()
    if lock_owner:
        params["lock_owner"] = lock_owner
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/waiting-for-lock",
        params=params,
        operation=f"Task {task.id} waiting for lock",
    )

task_upload_artifacts_aio async

task_upload_artifacts_aio(build_id, task, artifacts)

Async version - upload artifacts for a completed task.

Source code in stardag/registry/_api_registry.py
async def task_upload_artifacts_aio(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence[Artifact]
) -> None:
    """Async version - upload artifacts for a completed task."""
    if not artifacts:
        return

    artifacts_data = []
    for artifact in artifacts:
        data = artifact.model_dump(mode="json")
        if artifact.type == "markdown":
            data["body"] = {"content": data["body"]}
        artifacts_data.append(data)

    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/artifacts",
        json=artifacts_data,
        params=self._get_params(),
        operation=f"Upload artifacts for task {task.id}",
    )
    logger.debug(f"Uploaded {len(artifacts)} artifacts for task {task.id}")
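
The loop above wraps the body of markdown artifacts in a {"content": ...} object before sending, while other artifact types pass through unchanged. A minimal sketch of that transformation on plain dicts follows. The dict shape and the non-markdown type name "json" are assumptions standing in for the output of artifact.model_dump(mode="json").

```python
from typing import Any


def to_wire_format(artifact: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the artifact dict shaped for upload."""
    data = dict(artifact)  # shallow copy; the input dict is left untouched
    if data.get("type") == "markdown":
        data["body"] = {"content": data["body"]}
    return data


artifacts = [
    {"type": "markdown", "body": "# Report"},
    {"type": "json", "body": {"rows": 3}},  # hypothetical non-markdown type
]
payload = [to_wire_format(a) for a in artifacts]
print(payload)
```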

task_get_metadata_aio async

task_get_metadata_aio(task_id)

Async version of task_get_metadata.

Source code in stardag/registry/_api_registry.py
async def task_get_metadata_aio(self, task_id: UUID) -> TaskMetadata:
    """Async version of task_get_metadata."""

    response = await self._arequest(
        "GET",
        f"{self.api_url}/api/v1/tasks/{task_id}/metadata",
        params=self._get_params(),
        operation=f"Get metadata for task {task_id}",
    )
    data = response.json()

    return TaskMetadata.model_validate(data)

RegistryABC

Abstract base class for task registries.

A registry tracks task execution within builds. Implementations must provide the two abstract methods, task_register and task_get_metadata. All other methods have default implementations (mostly no-ops) for backwards compatibility.

The registry is stateless with respect to build_id - the build_id is passed explicitly to all methods that need it. This allows a single registry instance to be reused across multiple builds.

Method naming convention:

  • Build methods: build_ prefix (e.g., build_start, build_complete)
  • Task methods: task_ prefix (e.g., task_register, task_start)
  • Async versions: _aio suffix (e.g., build_start_aio, task_register_aio)
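
The pattern described above (an abstract core, no-op defaults, and _aio variants that delegate to the sync methods) can be sketched without importing stardag. MiniRegistryABC and InMemoryRegistry below are hypothetical stand-ins that mirror only a few methods, and tasks are represented as plain strings; a real backend would subclass RegistryABC and receive BaseTask instances.

```python
import abc
import asyncio
from uuid import UUID, uuid4


class MiniRegistryABC(abc.ABC):
    """Stand-in mirroring the shape of RegistryABC (not the real class)."""

    def build_start(self) -> UUID:
        return UUID(int=0)  # placeholder ID, as in RegistryABC.build_start

    @abc.abstractmethod
    def task_register(self, build_id: UUID, task: str) -> None: ...

    def task_start(self, build_id: UUID, task: str) -> None:
        pass  # no-op default

    async def task_register_aio(self, build_id: UUID, task: str) -> None:
        self.task_register(build_id, task)  # async variant delegates to sync


class InMemoryRegistry(MiniRegistryABC):
    """Keeps registered tasks per build; one instance serves many builds."""

    def __init__(self) -> None:
        self.tasks: dict[UUID, list[str]] = {}

    def build_start(self) -> UUID:
        build_id = uuid4()
        self.tasks[build_id] = []
        return build_id

    def task_register(self, build_id: UUID, task: str) -> None:
        self.tasks[build_id].append(task)


registry = InMemoryRegistry()
first, second = registry.build_start(), registry.build_start()
registry.task_register(first, "get_range")
asyncio.run(registry.task_register_aio(second, "get_sum"))
print(registry.tasks[first], registry.tasks[second])
```

Passing build_id explicitly to every call is what lets the single registry instance serve both builds here.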

build_start

build_start(root_tasks=None, description=None)

Start a new build session.

Called at the beginning of a build. Returns a build ID.

PARAMETER DESCRIPTION
root_tasks

The root tasks being built

TYPE: list[BaseTask] | None DEFAULT: None

description

Optional description of the build

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
UUID

Build ID (UUID) for the new build session.

Source code in stardag/registry/_base.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Start a new build session.

    Called at the beginning of a build. Returns a build ID.

    Args:
        root_tasks: The root tasks being built
        description: Optional description of the build

    Returns:
        Build ID (UUID) for the new build session.
    """
    return UUID("00000000-0000-0000-0000-000000000000")

build_resume

build_resume(build_id)

Mark an existing build as resumed.

Called when sd.build(resume_build_id=...) reuses an existing build (potentially in a terminal state) instead of starting a new one. The registry should record a BUILD_RESUMED event so the build flips back to RUNNING and the UI can surface a "running (resumed)" affordance.

Default implementation is a no-op so older registry backends keep working unchanged.

PARAMETER DESCRIPTION
build_id

The build UUID being resumed.

TYPE: UUID

Source code in stardag/registry/_base.py
def build_resume(self, build_id: UUID) -> None:
    """Mark an existing build as resumed.

    Called when ``sd.build(resume_build_id=...)`` reuses an existing
    build (potentially in a terminal state) instead of starting a new
    one. The registry should record a BUILD_RESUMED event so the
    build flips back to RUNNING and the UI can surface a
    "running (resumed)" affordance.

    Default implementation is a no-op so older registry backends
    keep working unchanged.

    Args:
        build_id: The build UUID being resumed.
    """
    pass
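
One way to picture the effect of BUILD_RESUMED is to derive a build's status from its event log, where the latest event decides the status. The sketch below is an assumption about how a backend might do this. Apart from BUILD_RESUMED and the RUNNING status, the event and status names are hypothetical.

```python
# Hypothetical mapping from the latest event to the status shown for a build.
STATUS_BY_EVENT = {
    "BUILD_STARTED": "RUNNING",
    "BUILD_COMPLETED": "COMPLETED",
    "BUILD_FAILED": "FAILED",
    "BUILD_RESUMED": "RUNNING",
}


def build_status(events: list[str]) -> tuple[str, bool]:
    """Return (status, was_resumed) for an ordered, non-empty event log."""
    if not events:
        raise ValueError("a build has at least one event")
    status = STATUS_BY_EVENT[events[-1]]
    was_resumed = "BUILD_RESUMED" in events
    return status, was_resumed


log = ["BUILD_STARTED", "BUILD_FAILED"]
print(build_status(log))
log.append("BUILD_RESUMED")  # what build_resume would record
print(build_status(log))
```

The was_resumed flag is the kind of information a UI could use to show "running (resumed)" rather than plain "running".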

build_complete

build_complete(build_id)

Mark a build as completed successfully.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

Source code in stardag/registry/_base.py
def build_complete(self, build_id: UUID) -> None:
    """Mark a build as completed successfully.

    Args:
        build_id: The build UUID returned by build_start.
    """
    pass

build_fail

build_fail(build_id, error_message=None)

Mark a build as failed.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

error_message

Optional error message describing the failure.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def build_fail(self, build_id: UUID, error_message: str | None = None) -> None:
    """Mark a build as failed.

    Args:
        build_id: The build UUID returned by build_start.
        error_message: Optional error message describing the failure.
    """
    pass

build_cancel

build_cancel(build_id)

Cancel a build.

Called when a build is explicitly cancelled by the user.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

Source code in stardag/registry/_base.py
def build_cancel(self, build_id: UUID) -> None:
    """Cancel a build.

    Called when a build is explicitly cancelled by the user.

    Args:
        build_id: The build UUID returned by build_start.
    """
    pass

build_exit_early

build_exit_early(build_id, reason=None)

Mark a build as exited early.

Called when all remaining tasks are running in other builds and this build should stop waiting.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

reason

Optional reason for exiting early.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def build_exit_early(self, build_id: UUID, reason: str | None = None) -> None:
    """Mark a build as exited early.

    Called when all remaining tasks are running in other builds
    and this build should stop waiting.

    Args:
        build_id: The build UUID returned by build_start.
        reason: Optional reason for exiting early.
    """
    pass

task_register abstractmethod

task_register(build_id, task)

Register a task as pending/scheduled.

This is called when a task is about to be executed.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task to register.

TYPE: BaseTask

Source code in stardag/registry/_base.py
@abc.abstractmethod
def task_register(self, build_id: UUID, task: "BaseTask") -> None:
    """Register a task as pending/scheduled.

    This is called when a task is about to be executed.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task to register.
    """
    pass

task_register_bulk

task_register_bulk(build_id, tasks)

Register many tasks to a build in a single call.

Default implementation falls back to task_register per task — backends that can batch (e.g. the API registry's bulk endpoint) should override this to make one HTTP call instead of N.

Order of tasks is significant: the SDK's post-order discover walk emits deps before parents so that dependency_task_ids lookups inside the registry resolve to existing rows (no phantom creation). Backends that process the batch as one transaction should preserve array order.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

tasks

Tasks to register, in registration order.

TYPE: Sequence[BaseTask]

Source code in stardag/registry/_base.py
def task_register_bulk(self, build_id: UUID, tasks: Sequence["BaseTask"]) -> None:
    """Register many tasks to a build in a single call.

    Default implementation falls back to ``task_register`` per task —
    backends that can batch (e.g. the API registry's bulk endpoint)
    should override this to make one HTTP call instead of N.

    Order of ``tasks`` is significant: the SDK's post-order discover
    walk emits deps before parents so that ``dependency_task_ids``
    lookups inside the registry resolve to existing rows (no phantom
    creation). Backends that process the batch as one transaction
    should preserve array order.

    Args:
        build_id: The build UUID returned by build_start.
        tasks: Tasks to register, in registration order.
    """
    for task in tasks:
        self.task_register(build_id, task)
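
The ordering requirement can be illustrated with a post-order depth-first walk, which emits every dependency before the task that needs it. This is a sketch on a plain dict-based graph, not the SDK's actual discovery code; the graph, the task names, and the function name are assumptions.

```python
def registration_order(roots: list[str], deps: dict[str, list[str]]) -> list[str]:
    """Post-order walk: each task appears after all of its dependencies."""
    order: list[str] = []
    seen: set[str] = set()

    def visit(task: str) -> None:
        if task in seen:
            return  # shared dependencies are emitted only once
        seen.add(task)
        for dep in deps.get(task, []):
            visit(dep)
        order.append(task)  # emitted only after its deps

    for root in roots:
        visit(root)
    return order


# report depends on clean and stats; both depend on raw.
deps = {"report": ["clean", "stats"], "clean": ["raw"], "stats": ["raw"]}
order = registration_order(["report"], deps)
print(order)  # ['raw', 'clean', 'stats', 'report']
```

A backend that processes the batch in this order never sees a dependency ID it has not yet stored.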

task_start

task_start(build_id, task)

Mark a task as started/running.

Called immediately before a task begins execution. The caller is responsible for having already registered the task in the build — task_start only emits the started event.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is starting.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_start(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as started/running.

    Called immediately before a task begins execution. The caller is
    responsible for having already registered the task in the build —
    ``task_start`` only emits the started event.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is starting.
    """
    pass

task_complete

task_complete(build_id, task)

Mark a task as completed successfully.

Called after a task finishes execution without errors.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that completed.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_complete(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as completed successfully.

    Called after a task finishes execution without errors.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that completed.
    """
    pass

task_fail

task_fail(build_id, task, error_message=None)

Mark a task as failed.

Called when a task raises an exception during execution.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that failed.

TYPE: BaseTask

error_message

Optional error message describing the failure.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def task_fail(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Mark a task as failed.

    Called when a task raises an exception during execution.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that failed.
        error_message: Optional error message describing the failure.
    """
    pass

task_suspend

task_suspend(build_id, task)

Mark a task as suspended waiting for dynamic dependencies.

Called when a task yields dynamic deps that are not yet complete. The task will remain suspended until its dynamic deps are built.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is suspended.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_suspend(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as suspended waiting for dynamic dependencies.

    Called when a task yields dynamic deps that are not yet complete.
    The task will remain suspended until its dynamic deps are built.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is suspended.
    """
    pass

task_add_dependencies

task_add_dependencies(
    build_id, task, upstream_tasks, is_dynamic=True
)

Record dependency edges for a task.

Called by the build system when a task yields dynamic deps. These edges aren't known at task_register time (which covers only the static requires() chain), so this call is how they reach the registry and how the full DAG renders correctly in the UI.

Registries that can't write to a graph (the in-memory cases) may treat this as a no-op. HTTP-backed implementations should tolerate 404 from older API versions that don't support the endpoint.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The downstream task whose deps are being added.

TYPE: BaseTask

upstream_tasks

The yielded deps to record as edges.

TYPE: Sequence[BaseTask]

is_dynamic

Marks the edges as dynamic (True by default — static requires() are recorded during task_register).

TYPE: bool DEFAULT: True

Source code in stardag/registry/_base.py
def task_add_dependencies(
    self,
    build_id: UUID,
    task: "BaseTask",
    upstream_tasks: Sequence["BaseTask"],
    is_dynamic: bool = True,
) -> None:
    """Record dependency edges for a task.

    Called by the build system when a task yields dynamic deps — the
    edges aren't known at ``task_register`` time (static ``requires()``
    chain only), so this is how they reach the registry so that the
    full DAG renders correctly in the UI.

    Registries that can't write to a graph (the in-memory cases) may
    treat this as a no-op. HTTP-backed implementations should tolerate
    404 from older API versions that don't support the endpoint.

    Args:
        build_id: The build UUID returned by build_start.
        task: The downstream task whose deps are being added.
        upstream_tasks: The yielded deps to record as edges.
        is_dynamic: Marks the edges as dynamic (True by default —
            static ``requires()`` are recorded during task_register).
    """
    pass
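
Registries that do keep a graph could store each edge together with its is_dynamic flag. A minimal in-memory sketch follows, with tasks represented by plain string IDs. Edge and EdgeStore are hypothetical names, not stardag classes.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Edge:
    upstream: str
    downstream: str
    is_dynamic: bool


@dataclass
class EdgeStore:
    """Hypothetical in-memory store for dependency edges."""

    edges: set[Edge] = field(default_factory=set)

    def add_dependencies(
        self, task: str, upstream_tasks: list[str], is_dynamic: bool = True
    ) -> None:
        if not upstream_tasks:
            return  # nothing to record, same early exit as the API registry
        for upstream in upstream_tasks:
            self.edges.add(Edge(upstream, task, is_dynamic))


store = EdgeStore()
store.add_dependencies("train", ["features"], is_dynamic=False)  # static edge
store.add_dependencies("train", ["shard_0", "shard_1"])  # yielded at run time
store.add_dependencies("train", ["shard_0"])  # repeated call is idempotent
print(len(store.edges))
```

Using a set of frozen dataclasses makes repeated calls harmless, which is convenient if a task is re-executed and yields the same deps again.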

task_resume

task_resume(build_id, task)

Mark a task as resumed after dynamic dependencies completed.

Called when a task's dynamic dependencies are complete and the task is ready to continue execution (either by resuming a suspended generator or by re-executing the task).

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is resuming.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_resume(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as resumed after dynamic dependencies completed.

    Called when a task's dynamic dependencies are complete and
    the task is ready to continue execution (either by resuming
    a suspended generator or by re-executing the task).

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is resuming.
    """
    pass

task_cancel

task_cancel(build_id, task)

Cancel a task.

Called when a task is cancelled — by the user, or by the build engine when terminating in-flight siblings on a fail-fast failure.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task to cancel.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_cancel(self, build_id: UUID, task: "BaseTask") -> None:
    """Cancel a task.

    Called when a task is cancelled — by the user, or by the build
    engine when terminating in-flight siblings on a fail-fast failure.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task to cancel.
    """
    pass

task_skip

task_skip(build_id, task)

Mark a task as skipped.

Called when a task will not run because a dependency failed or was cancelled. Distinct from task_cancel: skipped tasks never started executing.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task to skip.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_skip(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as skipped.

    Called when a task will not run because a dependency failed or
    was cancelled. Distinct from ``task_cancel``: skipped tasks
    never started executing.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task to skip.
    """
    pass
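
A build engine has to work out which tasks to report through task_skip once a dependency fails. One way to do that, sketched here on a plain dict-based graph, is a breadth-first walk over the failed task's dependents. This is an illustration under assumed data structures, not stardag's scheduling code.

```python
from collections import deque


def tasks_to_skip(failed: str, deps: dict[str, list[str]]) -> set[str]:
    """Return every task that transitively depends on the failed task."""
    # Invert the "task -> its dependencies" map into "task -> its dependents".
    dependents: dict[str, list[str]] = {}
    for task, upstream in deps.items():
        for dep in upstream:
            dependents.setdefault(dep, []).append(task)

    skipped: set[str] = set()
    queue = deque([failed])
    while queue:
        current = queue.popleft()
        for child in dependents.get(current, []):
            if child not in skipped:
                skipped.add(child)
                queue.append(child)
    return skipped


deps = {"report": ["clean", "stats"], "clean": ["raw"], "stats": ["raw"], "raw": []}
print(sorted(tasks_to_skip("clean", deps)))  # ['report']
print(sorted(tasks_to_skip("raw", deps)))  # ['clean', 'report', 'stats']
```

The failed task itself is not in the result: it would be reported through task_fail, while only tasks that never started are skipped.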

task_waiting_for_lock

task_waiting_for_lock(build_id, task, lock_owner=None)

Record that a task is waiting for a global lock.

Called when a task cannot acquire its lock because another build is holding it.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task waiting for the lock.

TYPE: BaseTask

lock_owner

Optional identifier of who holds the lock.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def task_waiting_for_lock(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Record that a task is waiting for a global lock.

    Called when a task cannot acquire its lock because another
    build is holding it.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task waiting for the lock.
        lock_owner: Optional identifier of who holds the lock.
    """
    pass

task_upload_artifacts

task_upload_artifacts(build_id, task, artifacts)

Upload artifacts for a completed task.

Called after a task completes successfully if it has artifacts.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The completed task.

TYPE: BaseTask

artifacts

List of artifacts to upload.

TYPE: Sequence[Artifact]

Source code in stardag/registry/_base.py
def task_upload_artifacts(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence["Artifact"]
) -> None:
    """Upload artifacts for a completed task.

    Called after a task completes successfully if it has artifacts.

    Args:
        build_id: The build UUID returned by build_start.
        task: The completed task.
        artifacts: List of artifacts to upload.
    """
    pass

task_get_metadata abstractmethod

task_get_metadata(task_id)

Get metadata for a registered task.

PARAMETER DESCRIPTION
task_id

The ID of the task to get metadata for.

TYPE: UUID

RETURNS DESCRIPTION
TaskMetadata

A TaskMetadata object containing task metadata.

Source code in stardag/registry/_base.py
@abc.abstractmethod
def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
    """Get metadata for a registered task.

    Args:
        task_id: The ID of the task to get metadata for.

    Returns:
        A TaskMetadata object containing task metadata.
    """
    pass

build_start_aio async

build_start_aio(root_tasks=None, description=None)

Async version of build_start.

Source code in stardag/registry/_base.py
async def build_start_aio(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Async version of build_start."""
    return self.build_start(root_tasks, description)

build_resume_aio async

build_resume_aio(build_id)

Async version of build_resume.

Source code in stardag/registry/_base.py
async def build_resume_aio(self, build_id: UUID) -> None:
    """Async version of build_resume."""
    self.build_resume(build_id)

build_complete_aio async

build_complete_aio(build_id)

Async version of build_complete.

Source code in stardag/registry/_base.py
async def build_complete_aio(self, build_id: UUID) -> None:
    """Async version of build_complete."""
    self.build_complete(build_id)

build_fail_aio async

build_fail_aio(build_id, error_message=None)

Async version of build_fail.

Source code in stardag/registry/_base.py
async def build_fail_aio(
    self, build_id: UUID, error_message: str | None = None
) -> None:
    """Async version of build_fail."""
    self.build_fail(build_id, error_message)

build_cancel_aio async

build_cancel_aio(build_id)

Async version of build_cancel.

Source code in stardag/registry/_base.py
async def build_cancel_aio(self, build_id: UUID) -> None:
    """Async version of build_cancel."""
    self.build_cancel(build_id)

build_exit_early_aio async

build_exit_early_aio(build_id, reason=None)

Async version of build_exit_early.

Source code in stardag/registry/_base.py
async def build_exit_early_aio(
    self, build_id: UUID, reason: str | None = None
) -> None:
    """Async version of build_exit_early."""
    self.build_exit_early(build_id, reason)

task_register_aio async

task_register_aio(build_id, task)

Async version of task_register.

Source code in stardag/registry/_base.py
async def task_register_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_register."""
    self.task_register(build_id, task)

task_register_bulk_aio async

task_register_bulk_aio(build_id, tasks)

Async version of task_register_bulk.

Default implementation falls back to task_register_aio per task. Override for backends that can batch (the API registry does so with the /tasks/bulk endpoint).

Source code in stardag/registry/_base.py
async def task_register_bulk_aio(
    self, build_id: UUID, tasks: Sequence["BaseTask"]
) -> None:
    """Async version of task_register_bulk.

    Default implementation falls back to ``task_register_aio`` per
    task. Override for backends that can batch (the API registry
    does so with the ``/tasks/bulk`` endpoint).
    """
    for task in tasks:
        await self.task_register_aio(build_id, task)

task_start_aio async

task_start_aio(build_id, task)

Async version of task_start.

Source code in stardag/registry/_base.py
async def task_start_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_start."""
    self.task_start(build_id, task)

task_complete_aio async

task_complete_aio(build_id, task)

Async version of task_complete.

Source code in stardag/registry/_base.py
async def task_complete_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_complete."""
    self.task_complete(build_id, task)

task_fail_aio async

task_fail_aio(build_id, task, error_message=None)

Async version of task_fail.

Source code in stardag/registry/_base.py
async def task_fail_aio(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Async version of task_fail."""
    self.task_fail(build_id, task, error_message)

task_suspend_aio async

task_suspend_aio(build_id, task)

Async version of task_suspend.

Source code in stardag/registry/_base.py
async def task_suspend_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_suspend."""
    self.task_suspend(build_id, task)

task_add_dependencies_aio async

task_add_dependencies_aio(
    build_id, task, upstream_tasks, is_dynamic=True
)

Async version of task_add_dependencies.

Source code in stardag/registry/_base.py
async def task_add_dependencies_aio(
    self,
    build_id: UUID,
    task: "BaseTask",
    upstream_tasks: Sequence["BaseTask"],
    is_dynamic: bool = True,
) -> None:
    """Async version of task_add_dependencies."""
    self.task_add_dependencies(build_id, task, upstream_tasks, is_dynamic)

task_resume_aio async

task_resume_aio(build_id, task)

Async version of task_resume.

Source code in stardag/registry/_base.py
async def task_resume_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_resume."""
    self.task_resume(build_id, task)

task_cancel_aio async

task_cancel_aio(build_id, task)

Async version of task_cancel.

Source code in stardag/registry/_base.py
async def task_cancel_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_cancel."""
    self.task_cancel(build_id, task)

task_skip_aio async

task_skip_aio(build_id, task)

Async version of task_skip.

Source code in stardag/registry/_base.py
async def task_skip_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_skip."""
    self.task_skip(build_id, task)

task_waiting_for_lock_aio async

task_waiting_for_lock_aio(build_id, task, lock_owner=None)

Async version of task_waiting_for_lock.

Source code in stardag/registry/_base.py
async def task_waiting_for_lock_aio(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Async version of task_waiting_for_lock."""
    self.task_waiting_for_lock(build_id, task, lock_owner)

task_upload_artifacts_aio async

task_upload_artifacts_aio(build_id, task, artifacts)

Async version of task_upload_artifacts.

Source code in stardag/registry/_base.py
async def task_upload_artifacts_aio(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence["Artifact"]
) -> None:
    """Async version of task_upload_artifacts."""
    self.task_upload_artifacts(build_id, task, artifacts)

task_get_metadata_aio async

task_get_metadata_aio(task_id)

Async version of task_get_metadata.

Source code in stardag/registry/_base.py
async def task_get_metadata_aio(self, task_id: UUID) -> TaskMetadata:
    """Async version of task_get_metadata."""
    return self.task_get_metadata(task_id)
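
The default async methods above all follow the same pattern: each one calls its sync counterpart directly. A subclass that implements only the sync method therefore gets a working async method for free, although the sync call runs on the event loop thread and blocks it until it returns. A minimal sketch of that pattern follows; `SketchRegistry` and its `events` list are hypothetical stand-ins, not part of stardag, and the task is represented by a plain string.

```python
import asyncio
from uuid import UUID, uuid4


class SketchRegistry:
    """Hypothetical stand-in for a registry base class (not stardag's RegistryABC)."""

    def __init__(self) -> None:
        self.events: list[tuple[str, UUID, str]] = []

    def task_start(self, build_id: UUID, task: str) -> None:
        # Sync implementation: record the event.
        self.events.append(("start", build_id, task))

    async def task_start_aio(self, build_id: UUID, task: str) -> None:
        # Default async version: delegate to the sync method.
        # The sync call runs on the event loop thread and blocks it until done.
        self.task_start(build_id, task)


registry = SketchRegistry()
build_id = uuid4()
asyncio.run(registry.task_start_aio(build_id, "my-task"))
```

A registry that performs network I/O would presumably override the `_aio` methods with non-blocking implementations instead of relying on this default.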

NoOpRegistry

Bases: RegistryABC

A registry that does nothing.

Used as a default when no registry is configured.

build_start

build_start(root_tasks=None, description=None)

Return a placeholder build ID.

Source code in stardag/registry/_base.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Return a placeholder build ID."""
    return UUID("00000000-0000-0000-0000-000000000000")
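
The placeholder returned above is the all-zero (nil) UUID, so calling code can recognise a no-op build by comparing against it. A small sketch using only the standard library; the helper name `is_placeholder_build` is hypothetical and not part of stardag.

```python
from uuid import UUID

# The same value NoOpRegistry.build_start returns.
PLACEHOLDER_BUILD_ID = UUID("00000000-0000-0000-0000-000000000000")


def is_placeholder_build(build_id: UUID) -> bool:
    """Hypothetical helper: True if the build ID is the nil UUID."""
    return build_id == PLACEHOLDER_BUILD_ID


placeholder_detected = is_placeholder_build(UUID(int=0))
```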

Configuration

config

Centralized configuration for Stardag SDK.

This module provides a unified configuration system that consolidates:

- Target factory settings (target roots)
- Registry settings (URL, workspace, environment, auth, timeout)
- Config context (provenance: which profile/registry name was used)

Configuration is loaded from multiple sources with the following priority:

1. Environment variables (STARDAG_*)
2. Project config (.stardag/config.toml in working directory or parents)
3. User config (~/.stardag/config.toml)
4. Defaults
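
The priority order can be illustrated with a simplified layered merge: apply the lowest-priority source first and let each higher-priority source overwrite it. This is only a sketch of the idea. The real loader resolves profiles and registries rather than merging flat dictionaries, and the keys and default values below are assumptions chosen for illustration.

```python
# Hypothetical flat settings; the real config is structured differently.
DEFAULTS = {"api_url": None, "profile": None, "api_timeout": 30.0}


def resolve(env: dict, project: dict, user: dict) -> dict:
    """Merge layers, lowest priority first, so later layers win."""
    resolved = dict(DEFAULTS)
    for layer in (user, project, env):
        # Ignore unset (None) values so they do not mask lower layers.
        resolved.update({k: v for k, v in layer.items() if v is not None})
    return resolved


resolved = resolve(
    env={"profile": "ci"},
    project={"profile": "dev", "api_url": "https://project.example"},
    user={"api_url": "https://user.example"},
)
```

Here the environment supplies `profile`, the project config supplies `api_url`, and `api_timeout` falls through to the default.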

Usage

from stardag.config import get_config

config = get_config()
if config.registry:
    print(config.registry.url)
print(config.target.roots)

Environment Variables (highest priority):

- STARDAG_PROFILE - Profile name to use (looked up in config.toml)
- STARDAG_API_URL - Registry API URL override
- STARDAG_REGISTRY_URL - Deprecated alias for STARDAG_API_URL
- STARDAG_WORKSPACE_ID - Direct workspace ID override
- STARDAG_ENVIRONMENT_ID - Direct environment ID override
- STARDAG_API_KEY - API key for authentication
- STARDAG_TARGET_ROOTS - JSON dict of target roots (override)
- STARDAG_NO_REGISTRY - Set to 1/true to force offline/local mode
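
Because STARDAG_TARGET_ROOTS is a JSON dict, it is easiest to build the value with `json.dumps` rather than by hand. The sketch below sets the variables from Python before any configuration is loaded; the root names and paths are made up for illustration, and the `"default"` key in particular is an assumption about how roots are named.

```python
import json
import os

# Hypothetical root names and locations, for illustration only.
roots = {
    "default": "/tmp/stardag-example",
    "archive": "s3://example-bucket/stardag/",
}

# Must be set before the configuration is loaded.
os.environ["STARDAG_TARGET_ROOTS"] = json.dumps(roots)
os.environ["STARDAG_NO_REGISTRY"] = "1"  # force offline/local mode

# Round-trip check: the value parses back to the same mapping.
parsed = json.loads(os.environ["STARDAG_TARGET_ROOTS"])
```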

load_config

load_config(use_project_config=True)

Load configuration from all sources.

Priority (highest to lowest):

1. Environment variables (STARDAG_*)
2. Project config (.stardag/config.toml in repo)
3. User config (~/.stardag/config.toml)
4. Defaults

PARAMETER DESCRIPTION
use_project_config

Whether to load .stardag/config.toml from project.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
StardagConfig

Fully resolved StardagConfig.

Source code in stardag/config/loader.py
def load_config(
    use_project_config: bool = True,
) -> StardagConfig:
    """Load configuration from all sources.

    Priority (highest to lowest):
    1. Environment variables (STARDAG_*)
    2. Project config (.stardag/config.toml in repo)
    3. User config (~/.stardag/config.toml)
    4. Defaults

    Args:
        use_project_config: Whether to load .stardag/config.toml from project.

    Returns:
        Fully resolved StardagConfig.
    """
    # 1. Load env vars first (highest priority)
    env_settings = StardagSettings()

    # Short-circuit: STARDAG_NO_REGISTRY forces offline/local mode
    if env_settings.no_registry:
        env_target_roots = _parse_target_roots_from_env()
        target_roots = env_target_roots or {
            DEFAULT_TARGET_ROOT_KEY: DEFAULT_TARGET_ROOT
        }
        return StardagConfig(
            registry=None,
            target=TargetConfig(roots=target_roots),
        )

    # 2. Load user and project TOML configs
    user_toml = load_toml_file(get_user_config_path())
    project_toml = {}
    if use_project_config:
        project_path = find_project_config()
        if project_path:
            project_toml = load_toml_file(project_path)

    # Merge configs (project overrides user)
    toml_config = _merge_toml_configs(user_toml, project_toml)

    # 3. Resolve profile -> (registry, user, workspace, environment)
    profile_name: str | None = None
    registry_name: str | None = None
    registry_url: str | None = None
    user: str | None = None
    workspace_id: str | None = None
    environment_id: str | None = None

    # Resolve API URL: STARDAG_API_URL (canonical) or STARDAG_REGISTRY_URL (deprecated)
    explicit_url = env_settings.api_url
    if not explicit_url:
        legacy_url = os.environ.get("STARDAG_REGISTRY_URL")
        if legacy_url:
            import warnings

            warnings.warn(
                "STARDAG_REGISTRY_URL is deprecated, use STARDAG_API_URL instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            explicit_url = legacy_url

    # Check for direct env var overrides first
    if explicit_url:
        registry_url = explicit_url
        workspace_id = env_settings.workspace_id
        environment_id = env_settings.environment_id
        # Even with direct env var overrides, try to inherit user/registry_name
        # from the active profile so that token auth (OIDC refresh) still works.
        _profile_name = env_settings.profile or toml_config.default.get("profile")
        if _profile_name:
            _profile = toml_config.profile.get(_profile_name)
            if _profile:
                profile_name = _profile_name
                registry_name = _profile.registry
                user = _profile.user
    # Then check for profile-based config
    elif env_settings.profile:
        profile_name = env_settings.profile
    # Fall back to default profile from config
    elif toml_config.default.get("profile"):
        profile_name = toml_config.default["profile"]

    # If we have a profile, look it up
    if profile_name and not registry_url:
        profile = toml_config.profile.get(profile_name)
        if profile:
            registry_name = profile.registry
            user = profile.user  # Optional user for multi-user support
            workspace_value = profile.workspace  # Could be slug or ID
            environment_value = profile.environment  # Could be slug or ID

            # Look up registry URL from registry name
            registry_url_from_toml = toml_config.registry.get(registry_name)
            if registry_url_from_toml:
                registry_url = registry_url_from_toml
            else:
                logger.warning(
                    f"Profile '{profile_name}' references unknown registry '{registry_name}'"
                )

            # Resolve workspace slug to ID if needed
            if _looks_like_uuid(workspace_value):
                workspace_id = workspace_value
            else:
                # Try to resolve from cache
                cached_workspace_id = get_cached_workspace_id(
                    registry_name, workspace_value
                )
                if cached_workspace_id:
                    workspace_id = cached_workspace_id
                else:
                    # Store the slug - will need to be resolved at runtime
                    workspace_id = workspace_value
                    logger.debug(
                        f"Workspace '{workspace_value}' is a slug, not cached. "
                        "Run 'stardag auth refresh' to resolve."
                    )

            # Resolve environment slug to ID if needed
            if _looks_like_uuid(environment_value):
                environment_id = environment_value
            elif workspace_id and _looks_like_uuid(workspace_id):
                # Can only resolve environment if we have a resolved workspace ID
                cached_env_id = get_cached_environment_id(
                    registry_name, workspace_id, environment_value
                )
                if cached_env_id:
                    environment_id = cached_env_id
                else:
                    # Store the slug - will need to be resolved at runtime
                    environment_id = environment_value
                    logger.debug(
                        f"Environment '{environment_value}' is a slug, not cached. "
                        "Run 'stardag auth refresh' to resolve."
                    )
            else:
                # Workspace is not resolved, can't resolve environment either
                environment_id = environment_value
        else:
            logger.warning(f"Profile '{profile_name}' not found in config")

    # 4. Resolve target roots
    # Priority: env > cached > default
    target_roots: dict[str, str]
    env_target_roots = _parse_target_roots_from_env()
    if env_target_roots:
        target_roots = env_target_roots
    elif registry_url and workspace_id and environment_id:
        cached_roots = get_cached_target_roots(
            registry_url, workspace_id, environment_id
        )
        if cached_roots:
            target_roots = cached_roots
        else:
            target_roots = {DEFAULT_TARGET_ROOT_KEY: DEFAULT_TARGET_ROOT}
    else:
        target_roots = {DEFAULT_TARGET_ROOT_KEY: DEFAULT_TARGET_ROOT}

    # 5. Load access token from cache (if we have profile info)
    # If token is expired, try to refresh it automatically
    access_token: str | None = None
    if registry_name and workspace_id and user:
        token_cache_path = get_access_token_cache_path(
            registry_name, workspace_id, user
        )
        if token_cache_path.exists():
            token_data = load_json_file(token_cache_path)
            # Check if token is still valid
            import time

            expires_at = token_data.get("expires_at", 0)
            if expires_at > time.time():
                access_token = token_data.get("access_token")

        # If no valid token in cache, try to refresh it
        if not access_token:
            try:
                from stardag.registry._auth import (
                    ensure_access_token as _ensure_token,
                )

                access_token = _ensure_token(
                    registry_name, workspace_id, user, registry_url=registry_url
                )
            except Exception:
                # Silently fail - user can manually refresh with `stardag auth refresh`
                pass

    # 6. Get API key from env
    api_key_raw = os.environ.get("STARDAG_API_KEY")
    api_key: SecretStr | None = env_settings.api_key or (
        SecretStr(api_key_raw) if api_key_raw else None
    )

    # 7. Build canonical RegistryConfig (or None for offline mode)
    registry_cfg: RegistryConfig | None = None
    if registry_url:
        registry_cfg = RegistryConfig(
            url=registry_url,
            workspace_id=workspace_id or "",
            environment_id=environment_id or "",
            auth=RegistryAuth(
                api_key=api_key,
                user_email=user,
                access_token=SecretStr(access_token) if access_token else None,
            ),
            timeout=env_settings.api_timeout or DEFAULT_API_TIMEOUT,
        )

    return StardagConfig(
        registry=registry_cfg,
        target=TargetConfig(roots=target_roots),
        context=ConfigContext(
            profile=profile_name,
            registry_name=registry_name,
        ),
    )
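
The profile branch of `load_config` treats the workspace and environment values as either an ID or a slug, depending on what `_looks_like_uuid` returns. That helper's implementation is not shown on this page, so the following is only a plausible sketch of such a check built on the standard library; `looks_like_uuid` here is a hypothetical stand-in, and the real helper may be stricter.

```python
from uuid import UUID


def looks_like_uuid(value: str) -> bool:
    """Hypothetical stand-in for the private `_looks_like_uuid` helper."""
    try:
        # uuid.UUID also accepts 32 hex digits without hyphens, braces,
        # and a "urn:uuid:" prefix, so this check is fairly permissive.
        UUID(value)
    except (ValueError, AttributeError, TypeError):
        return False
    return True


id_result = looks_like_uuid("00000000-0000-0000-0000-000000000000")
slug_result = looks_like_uuid("my-workspace")
```

With a check like this, a value such as `my-workspace` takes the slug path and is looked up in the local cache, while a UUID string is used as the ID directly.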