API Reference

Auto-generated API documentation from source code.

Core Module

Stardag: Declarative and composable DAG framework for Python.

Stardag provides a clean Python API for representing persistently stored assets as a declarative Directed Acyclic Graph (DAG).

Basic usage:

import stardag as sd

@sd.task
def get_range(limit: int) -> list[int]:
    return list(range(limit))

@sd.task
def get_sum(integers: sd.Depends[list[int]]) -> int:
    return sum(integers)

task = get_sum(integers=get_range(limit=10))
sd.build(task)
print(task.target().load())  # 45

Core components:

  • task - Decorator for creating tasks from functions
  • Task - Task with automatic serialization and filesystem targets
  • LoadableTask - Abstract base for tasks with load() -> T
  • TargetTask - Base class for tasks with typed target outputs
  • Depends - Dependency injection type annotation
  • build - Execute a task and its dependencies

See https://docs.stardag.com for full documentation.

TODO: Expand docstrings for all public API components.

Depends module-attribute

Depends = Annotated[_DependsT, _DependsOnMarker]

TaskLoads module-attribute

TaskLoads = Annotated[
    LoadableTask[LoadedT_co], Polymorphic()
]

TaskStruct module-attribute

TaskStruct = Union[
    "BaseTask",
    Sequence["TaskStruct"],
    Mapping[str, "TaskStruct"],
]
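TaskStruct is recursive. It can be a single task, or any nesting of sequences and string-keyed mappings of tasks. A build system has to reduce such a structure to the individual tasks it contains. The sketch below does this with a hypothetical `flatten_task_struct` helper. It assumes that any object that is neither a Mapping nor a non-string Sequence is a task leaf.

```python
from collections.abc import Mapping, Sequence
from typing import Any


def flatten_task_struct(struct: Any) -> list[Any]:
    """Collect the leaf tasks of a nested TaskStruct, depth first (hypothetical helper)."""
    if isinstance(struct, Mapping):
        # Mapping[str, TaskStruct]: recurse into the values, keeping insertion order.
        return [task for value in struct.values() for task in flatten_task_struct(value)]
    if isinstance(struct, Sequence) and not isinstance(struct, (str, bytes)):
        # Sequence[TaskStruct]: recurse into each element.
        return [task for item in struct for task in flatten_task_struct(item)]
    # Anything else is treated as a single task.
    return [struct]


class FakeTask:
    """Minimal placeholder standing in for a BaseTask instance."""

    def __init__(self, name: str) -> None:
        self.name = name


a, b, c = FakeTask("a"), FakeTask("b"), FakeTask("c")
nested = {"inputs": [a, (b,)], "extra": c}
print([t.name for t in flatten_task_struct(nested)])  # ['a', 'b', 'c']
```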

target_factory_provider module-attribute

target_factory_provider = resource_provider(
    type_=TargetFactory, default_factory=TargetFactory
)

BaseTask

Bases: PolymorphicRoot

__init_subclass__

__init_subclass__(**kwargs)

Validate that subclasses implement either run() or run_aio().

Also wraps run() and run_aio() methods with precheck validation.
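Because the check runs in __init_subclass__, a task class that implements neither method fails when the class is defined, not later at build time. The following is a minimal sketch of that kind of check on a hypothetical ToyBaseTask. It omits the precheck wrapping mentioned above.

```python
class ToyBaseTask:
    """Hypothetical base class; not stardag's BaseTask."""

    def run(self) -> None:
        raise NotImplementedError

    async def run_aio(self) -> None:
        raise NotImplementedError

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # A subclass "implements" a method if it no longer resolves to the base version.
        overrides_run = cls.run is not ToyBaseTask.run
        overrides_run_aio = cls.run_aio is not ToyBaseTask.run_aio
        if not (overrides_run or overrides_run_aio):
            raise TypeError(f"{cls.__name__} must implement run() or run_aio()")


class Good(ToyBaseTask):
    def run(self) -> None:
        print("running")


try:
    class Bad(ToyBaseTask):  # implements neither method
        pass
except TypeError as exc:
    print(exc)  # Bad must implement run() or run_aio()
```

This simple version would also reject intermediate abstract subclasses. A real implementation has to allow for them in some way.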

complete abstractmethod

complete()

Declare if the task is complete.

complete_aio async

complete_aio()

Asynchronously declare if the task is complete.

run

run()

Execute the task logic (sync).

Override this method for synchronous tasks. If you only override run_aio(), this method will automatically run it via asyncio.run().

RETURNS DESCRIPTION
None | Generator[TaskStruct, None, None]

None for simple tasks, or a Generator yielding TaskStruct for tasks with dynamic dependencies (see Dynamic Dependencies Contract below).

RAISES DESCRIPTION
RuntimeError

If called from within an existing event loop when only run_aio() is implemented. In that case, call run_aio() directly instead.

NotImplementedError

If run_aio() is an async generator (dynamic deps). Async generators cannot be automatically converted to sync generators.

Dynamic Dependencies Contract: When a task yields dynamic dependencies via a generator, the BUILD SYSTEM guarantees that ALL yielded tasks are COMPLETE before the generator is resumed. The task can rely on this contract:

def run(self):
    # Do some initial work to get info about what additional dependencies
    # are needed
    initial_data = "..."

    # Yield deps we need to be built first
    task_a = TaskA(input=initial_data)
    task_b = TaskB(input=initial_data)
    yield [task_a, task_b]

    # CONTRACT: When we reach here, ALL deps are complete.
    # We can safely access their outputs.
    result_a = task_a.target().load()
    result_b = task_b.target().load()

    # Yield more deps if needed
    task_c = TaskC(input=result_a)
    yield task_c

    # Again, TaskC is complete when we reach here
    self.target().save(task_c.target().load() + result_b)

This contract is essential for correctness - tasks can depend on previously yielded tasks being complete before continuing execution.
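The sketch below makes the contract concrete by driving a generator-based run() with a deliberately simplified, sequential build loop. ToyTask, Const, SumOfDynamic and toy_build are hypothetical names used only for illustration. The real build system is concurrent and tracks much more state. The guarantee shown here is the one described above: every yielded task is built before the generator resumes.

```python
import inspect
from collections.abc import Generator


class ToyTask:
    """Hypothetical minimal task: complete once it has an output."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.output: int | None = None

    def complete(self) -> bool:
        return self.output is not None

    def run(self):
        raise NotImplementedError


class Const(ToyTask):
    """Static task that just stores a value."""

    def __init__(self, name: str, value: int) -> None:
        super().__init__(name)
        self.value = value

    def run(self) -> None:
        self.output = self.value


class SumOfDynamic(ToyTask):
    """Decides its dependencies while running, like the run() example above."""

    def run(self) -> Generator["ToyTask | list[ToyTask]", None, None]:
        a, b = Const("a", 2), Const("b", 3)
        yield [a, b]
        # Contract: a and b are complete here, so their outputs are safe to read.
        c = Const("c", a.output * 10)
        yield c
        self.output = c.output + b.output


def _as_list(struct) -> list[ToyTask]:
    return list(struct) if isinstance(struct, (list, tuple)) else [struct]


def toy_build(task: ToyTask, order: list[str]) -> None:
    """Build `task`, finishing every yielded dependency before resuming it."""
    if task.complete():
        return
    result = task.run()
    if inspect.isgenerator(result):
        try:
            batch = next(result)
            while True:
                for dep in _as_list(batch):
                    toy_build(dep, order)  # deps may themselves be dynamic
                batch = result.send(None)  # resume only once the batch is complete
        except StopIteration:
            pass
    order.append(task.name)


order: list[str] = []
root = SumOfDynamic("root")
toy_build(root, order)
print(order, root.output)  # ['a', 'b', 'c', 'root'] 23
```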

run_aio async

run_aio()

Execute the task logic (async).

Override this method for asynchronous tasks. If you only override run(), this method will automatically delegate to it.

For dynamic dependencies, you can use 'yield' which makes this an async generator. Note that async generator methods have different type signatures that may require type: ignore comments.

RETURNS DESCRIPTION
None | Generator[TaskStruct, None, None]

None for simple tasks, or a Generator/AsyncGenerator for tasks with dynamic dependencies.

Dynamic Dependencies Contract

Same as run() - the build system guarantees that ALL yielded tasks are COMPLETE before the generator is resumed. See run() docstring for detailed documentation and examples.

artifacts

artifacts()

Return artifacts to be stored in the registry after task completion.

Override this method to expose rich outputs (reports, summaries, structured data) that will be viewable in the registry UI.

This method is called after the task completes successfully. It should be stateless - loading any required data from the task's target rather than relying on in-memory state.

RETURNS DESCRIPTION
Sequence[Artifact]

Sequence of artifacts (MarkdownArtifact, JSONArtifact, etc.)

artifacts_aio async

artifacts_aio()

Asynchronously return artifacts to be stored in the registry after task completion.

resolve classmethod

resolve(namespace, name, extra)

Override PolymorphicRoot.resolve to handle AliasTask deserialization.

from_registry classmethod

from_registry(id, registry=None)

Instantiate the task from the registry.

PARAMETER DESCRIPTION
id

The UUID (or string representation) of the task to load.

TYPE: UUID | str

registry

An optional registry instance to use for loading metadata. If not provided, the default registry from registry_provider will be used.

TYPE: Union[RegistryABC, None] DEFAULT: None

Returns: An AliasTask instance referencing the specified task.

LoadableTask

Bases: BaseTask, ABC, Generic[LoadedT_co]

A task that can load its output as a typed value.

This is the minimal interface required by TaskLoads: any BaseTask subclass that implements load() -> T is compatible with TaskLoads[T].

Both Task (via diamond inheritance) and bare subclasses of LoadableTask satisfy TaskLoads[T].

Subclasses must implement at least one of load() or load_aio(). The missing method will delegate to the other automatically (mirroring the run/run_aio pattern on BaseTask).
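The delegation rule can be sketched in plain Python. DualLoad, SyncOnly and AsyncOnly below are hypothetical classes, not stardag's LoadableTask. The sketch shows only the pattern described above: the missing method falls back to the one that exists, and the sync side uses asyncio.run().

```python
import asyncio


class DualLoad:
    """Hypothetical base: subclasses override load() or load_aio() (or both)."""

    def load(self):
        if type(self).load_aio is DualLoad.load_aio:
            raise NotImplementedError("implement load() or load_aio()")
        # Only load_aio() is implemented: run it in a fresh event loop.
        # asyncio.run() raises RuntimeError inside an already running loop.
        return asyncio.run(self.load_aio())

    async def load_aio(self):
        if type(self).load is DualLoad.load:
            raise NotImplementedError("implement load() or load_aio()")
        # Only load() is implemented: call it directly.
        return self.load()


class SyncOnly(DualLoad):
    def load(self):
        return "sync value"


class AsyncOnly(DualLoad):
    async def load_aio(self):
        await asyncio.sleep(0)
        return "async value"


print(AsyncOnly().load())                  # async value
print(asyncio.run(SyncOnly().load_aio()))  # sync value
```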

load

load()

Load the output of this task (sync).

If only load_aio() is implemented, this delegates via asyncio.run(). Raises RuntimeError if called from within an existing event loop.

load_aio async

load_aio()

Asynchronously load the output of this task.

If only load() is implemented, this delegates to it.

TargetTask

Bases: BaseTask, Generic[TargetType]

Base class for tasks that produce a target output.

Extends BaseTask with a typed target() method and a default complete() implementation that checks whether the target exists.

Most users should subclass Task (which extends this class with automatic serialization and filesystem target management) rather than using TargetTask directly.
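The default completeness rule is short: a task is complete when its target exists. The sketch below illustrates this with hypothetical LocalFileTarget and ToyTargetTask classes backed by the local filesystem. stardag's real targets are more general, so treat this only as an illustration of the idea.

```python
import tempfile
from pathlib import Path


class LocalFileTarget:
    """Hypothetical target: a single file on local disk."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def exists(self) -> bool:
        return self.path.exists()


class ToyTargetTask:
    """Hypothetical task whose completeness is defined by its target."""

    def __init__(self, path: Path) -> None:
        self._path = path

    def target(self) -> LocalFileTarget:
        return LocalFileTarget(self._path)

    def complete(self) -> bool:
        # Mirrors the default described above: complete iff the target exists.
        return self.target().exists()

    def run(self) -> None:
        self._path.write_text("done")


with tempfile.TemporaryDirectory() as tmp:
    task = ToyTargetTask(Path(tmp) / "out.txt")
    print(task.complete())  # False
    task.run()
    print(task.complete())  # True
```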

complete

complete()

Check if the task is complete.

complete_aio async

complete_aio()

Asynchronously check if the task is complete.

target abstractmethod

target()

The task output target.

Task

Bases: TargetTask[LoadableSaveableFileSystemTarget[LoadedT]], LoadableTask[LoadedT], ABC, Generic[LoadedT]

A base class for tasks with automatic serialization and filesystem targets.

The target of a Task is a LoadableSaveableFileSystemTarget that uses a serializer inferred from the generic type parameter LoadedT.

The target file path is automatically constructed based on the task's namespace, name, version, and unique ID and has the following structure:

[<relpath_base>/][<namespace>/]<name>/v<version>/[<relpath_extra>/]
<id>[:2]/<id>[2:4]/<id>[/<relpath_filename>].<relpath_extension>

You can override the following properties to customize the target path: _relpath_base, _relpath_extra, _relpath_filename, and _relpath_extension.
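The template is a join of optional and required segments. build_relpath below is a hypothetical helper that only spells out the template; it is not stardag's implementation. It produces just the relative part, with no target root prefix. The MyTask example output has no v<version> segment, so the sketch assumes that an empty version omits that segment.

```python
def build_relpath(
    name: str,
    version: str,
    task_id: str,
    extension: str,
    namespace: str = "",
    relpath_base: str = "",
    relpath_extra: str = "",
    relpath_filename: str = "",
) -> str:
    """Assemble a relative target path following the documented template (hypothetical)."""
    parts: list[str] = []
    if relpath_base:
        parts.append(relpath_base)
    if namespace:
        parts.append(namespace)
    parts.append(name)
    if version:  # assumption: an empty version drops the v<version> segment
        parts.append(f"v{version}")
    if relpath_extra:
        parts.append(relpath_extra)
    # Two levels of sharding on the first four characters of the ID, then the ID.
    parts.extend([task_id[:2], task_id[2:4], task_id])
    if relpath_filename:
        parts.append(relpath_filename)
    return "/".join(parts) + f".{extension}"


example_id = "036f6e71-1b3c-54b8-aec1-182359f1e09a"
print(build_relpath("MyTask", "", example_id, "json"))
# MyTask/03/6f/036f6e71-1b3c-54b8-aec1-182359f1e09a.json
print(build_relpath("MyTask", "1", example_id, "json", namespace="examples", relpath_filename="data"))
# examples/MyTask/v1/03/6f/036f6e71-1b3c-54b8-aec1-182359f1e09a/data.json
```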

See stardag.target.serialize.get_serializer for details on how the serializer is inferred from the generic type parameter, and how to customize it.

Example:

import stardag as sd

class MyTask(sd.Task[dict[str, int]]):
    def run(self):
        self._save({"a": 1, "b": 2})

my_task = MyTask()

print(my_task.target())
# FileSerializable(../MyTask/03/6f/036f6e71-1b3c-54b8-aec1-182359f1e09a.json)

print(my_task.target().serializer)
# <stardag.target.serialize.JSONSerializer at 0x1064e4710>

serializer property

serializer

The serializer used for this task's target.

__map_generic_args_to_ancestor__ classmethod

__map_generic_args_to_ancestor__(ancestor_origin, args)

Map generic args from Task to how they appear on an ancestor class.

This enables type compatibility checking when using Task with polymorphic annotations like TaskLoads[T] and SubClass[TargetTask[LoadableTarget[T]]].

PARAMETER DESCRIPTION
ancestor_origin

The ancestor class to map args to

TYPE: type

args

The generic args of this class (e.g., (str,) for Task[str])

TYPE: tuple

RETURNS DESCRIPTION
tuple | None

The mapped args for the ancestor, or None if mapping is not applicable.

load

load()

Load the task target and run any LoadValidators.

load_aio async

load_aio()

Async load — delegates to the target's load_aio and validates.

TaskRef dataclass

TaskRef(name, version, id)

task

task(
    _func: Callable[_PWrapped, _FuncReturnT],
    *,
    name: str | None = None,
    version: str = "",
    relpath: RelpathSettings
    | _RelpathOverride
    | None = None,
    target_root_key: str | None = None,
) -> Type[_FunctionTask[_FuncReturnT, _PWrapped]]
task(
    *,
    name: str | None = None,
    version: str = "",
    relpath: RelpathSettings
    | _RelpathOverride
    | None = None,
    target_root_key: str | None = None,
) -> _TaskWrapper
task(
    _func=None,
    *,
    name=None,
    version="",
    relpath=None,
    target_root_key=None,
)

build_aio async

build_aio(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
    register_all=False,
    on_registry_failure="raise",
)

Build tasks concurrently using hybrid async/thread/process execution.

This is the main build function for production use. It:

  • Discovers all tasks in the DAG(s)
  • Schedules tasks for execution when dependencies are met
  • Handles dynamic dependencies via generator suspension
  • Supports multiple root tasks (built concurrently)
  • Routes tasks to async/thread/process based on ExecutionModeSelector
  • Manages all registry interactions (start/complete/fail task)
  • Optionally uses global concurrency locks for distributed execution

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

task_executor

TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor). Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).

TYPE: TaskExecutorABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

registry

Registry for tracking builds (default: from registry_provider)

TYPE: RegistryABC | None DEFAULT: None

max_concurrent_discover

Maximum concurrent completion checks during DAG discovery. Higher values speed up discovery for large DAGs with remote targets.

TYPE: int DEFAULT: 50

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution to ensure exactly-once execution across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id
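max_concurrent_discover limits how many completion checks are in flight at once during discovery. A common way to impose such a limit in asyncio is to wrap each check in a semaphore. The sketch below shows that pattern with hypothetical check_all_complete and make_fake_check helpers. It is not stardag's discovery code, which also walks the DAG.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def check_all_complete(
    checks: list[Callable[[], Awaitable[bool]]], max_concurrent: int = 50
) -> list[bool]:
    """Run completion checks concurrently, at most `max_concurrent` at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(check: Callable[[], Awaitable[bool]]) -> bool:
        async with semaphore:
            return await check()

    return await asyncio.gather(*(bounded(check) for check in checks))


def make_fake_check(stats: dict[str, int]) -> Callable[[], Awaitable[bool]]:
    """Hypothetical stand-in for a slow remote completion check."""

    async def check() -> bool:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)  # simulate network latency
        stats["in_flight"] -= 1
        return True

    return check


stats = {"in_flight": 0, "peak": 0}
results = asyncio.run(
    check_all_complete([make_fake_check(stats) for _ in range(20)], max_concurrent=5)
)
print(len(results), stats["peak"])  # 20 5
```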

build_sequential

build_sequential(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    dual_run_default="sync",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Sync API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run via run().
  • Async-only tasks: run via asyncio.run(run_aio()). (Does not work if called from within an existing event loop.)
  • Dual tasks: run via run() if dual_run_default=="sync" (default), else (dual_run_default=="async") via asyncio.run(run_aio()).

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

dual_run_default

For dual tasks, prefer sync or async execution

TYPE: Literal['sync', 'async'] DEFAULT: 'sync'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

build_sequential_aio async

build_sequential_aio(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    sync_run_default="blocking",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Async API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run blocking via run() in the main event loop if sync_run_default=="blocking" (default), else (sync_run_default=="thread") in a thread pool.
  • Async-only tasks: run via await run_aio().
  • Dual tasks: run via await run_aio().
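The two sync_run_default settings differ in where a sync-only run() executes. This is a minimal sketch using hypothetical SyncOnlyTask and AsyncTask classes and a hypothetical execute helper. The real dispatch also handles registry calls and dynamic dependencies.

```python
import asyncio
import threading


class SyncOnlyTask:
    """Hypothetical task with only a sync run(); records which thread ran it."""

    def __init__(self) -> None:
        self.thread_name: str | None = None

    def run(self) -> None:
        self.thread_name = threading.current_thread().name


class AsyncTask:
    """Hypothetical task with run_aio() (async-only or dual)."""

    def __init__(self) -> None:
        self.ran = False

    async def run_aio(self) -> None:
        self.ran = True


async def execute(task, sync_run_default: str = "blocking") -> None:
    run_aio = getattr(task, "run_aio", None)
    if run_aio is not None:
        # Async-only and dual tasks: await run_aio() on the event loop.
        await run_aio()
    elif sync_run_default == "blocking":
        # Blocks the event loop until run() returns.
        task.run()
    else:  # "thread"
        # Offloads run() to the loop's default thread pool.
        await asyncio.to_thread(task.run)


blocking, threaded = SyncOnlyTask(), SyncOnlyTask()
asyncio.run(execute(blocking))
asyncio.run(execute(threaded, sync_run_default="thread"))
print(blocking.thread_name == threading.current_thread().name)  # True
print(threaded.thread_name == threading.current_thread().name)  # False
```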

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

sync_run_default

For sync-only tasks, block or use thread pool

TYPE: Literal['thread', 'blocking'] DEFAULT: 'blocking'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

namespace

namespace(namespace, scope)

Set the task namespace for the module and any submodules.

PARAMETER DESCRIPTION
namespace

The namespace to set for the module.

TYPE: str

scope

The module scope, typically passed as __name__.

TYPE: str

Usage:

import stardag as sd

sd.namespace("my_custom_namespace", __name__)

class MyNamespacedTask(sd.Task[int]):
    a: int

    def run(self):
        self._save(self.a)

assert MyNamespacedTask.get_namespace() == "my_custom_namespace"

auto_namespace

auto_namespace(scope)

Set the task namespace for the module to the module import path.

PARAMETER DESCRIPTION
scope

The module scope, typically passed as __name__.

TYPE: str

Usage:

import stardag as sd

sd.auto_namespace(__name__)

class MyAutoNamespacedTask(sd.Task[int]):
    a: int

    def run(self):
        self._save(self.a)

assert MyAutoNamespacedTask.get_namespace() == __name__

get_file_target

get_file_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a file target for the given relative path.

get_directory_target

get_directory_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Build Module

build

Build module for stardag.

This module provides functions and classes for building task DAGs.

Primary build functions:

  • build(): Concurrent build, recommended for real workloads from a sync context
  • build_aio(): Async concurrent build, recommended for real workloads from an async context or an already running event loop
  • build_sequential(): Sync sequential build (for debugging)
  • build_sequential_aio(): Async sequential build (for debugging)

Task executor:

  • HybridConcurrentTaskExecutor: Routes tasks to async/thread/process pools

Interfaces:

  • TaskExecutorABC: Abstract base class for custom task executors
  • ExecutionModeSelector: Protocol for custom execution mode selection

Global concurrency locking:

  • GlobalConcurrencyLockManager: Protocol for distributed lock implementations
  • LockHandle: Protocol for lock handles (async context manager)
  • GlobalLockConfig: Configuration for global locking behavior

BuildSummary dataclass

BuildSummary(status, task_count, build_id=None, error=None)

Summary of a build execution.

raise_on_failure

raise_on_failure()

Raise BuildFailed if the build status is FAILURE.

Source code in stardag/build/_base.py
def raise_on_failure(self) -> None:
    """Raise :class:`BuildFailed` if the build status is ``FAILURE``."""
    if self.status == BuildExitStatus.FAILURE:
        raise BuildFailed(self)

__repr__

__repr__()

Return a human-readable summary of the build.

Source code in stardag/build/_base.py
def __repr__(self) -> str:
    """Return a human-readable summary of the build."""
    tc = self.task_count
    status_icon = "✓" if self.status == BuildExitStatus.SUCCESS else "✗"
    lines = [
        f"Build {self.status.value.upper()} {status_icon}",
    ]
    if self.build_id:
        lines.append(f"  Build ID: {self.build_id}")
    lines.extend(
        [
            f"  Discovered: {tc.discovered}",
            f"  Previously completed: {tc.previously_completed}",
            f"  Succeeded: {tc.succeeded}",
            f"  Failed: {tc.failed}",
        ]
    )
    if tc.pending > 0:
        lines.append(f"  Pending: {tc.pending}")
    if self.error:
        lines.append(f"  Error: {self.error}")
    return "\n".join(lines)

BuildExitStatus

Bases: StrEnum

FailMode

Bases: StrEnum

How to handle task failures during build.

ATTRIBUTE DESCRIPTION
FAIL_FAST

Stop the build at the first task failure.

CONTINUE

Continue executing all tasks whose dependencies are met, even if some tasks have failed.
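The difference between the two modes is which tasks still run after a failure. The sketch below uses a hypothetical sequential toy_run over a static DAG and a hypothetical ToyFailMode enum. The real build is concurrent, which this sketch does not model. In the sketch, tasks downstream of a failed task never run in either mode, because their dependencies are never met.

```python
from enum import Enum


class ToyFailMode(str, Enum):
    """Hypothetical mirror of FailMode for this sketch."""

    FAIL_FAST = "fail_fast"
    CONTINUE = "continue"


def toy_run(
    dag: dict[str, list[str]], failing: set[str], mode: ToyFailMode
) -> dict[str, str]:
    """Run tasks in dependency order and return each task's final state."""
    state = {name: "pending" for name in dag}
    progress = True
    while progress:
        progress = False
        for name, deps in dag.items():
            if state[name] != "pending":
                continue
            if not all(state[dep] == "succeeded" for dep in deps):
                continue  # dependencies not met (and never will be if one failed)
            progress = True
            if name in failing:
                state[name] = "failed"
                if mode is ToyFailMode.FAIL_FAST:
                    return state  # stop at the first failure
            else:
                state[name] = "succeeded"
    return state


dag = {"a": [], "b": [], "c": ["a"], "d": ["b"]}
print(toy_run(dag, {"a"}, ToyFailMode.FAIL_FAST))
# {'a': 'failed', 'b': 'pending', 'c': 'pending', 'd': 'pending'}
print(toy_run(dag, {"a"}, ToyFailMode.CONTINUE))
# {'a': 'failed', 'b': 'succeeded', 'c': 'pending', 'd': 'succeeded'}
```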

HybridConcurrentTaskExecutor

HybridConcurrentTaskExecutor(
    execution_mode_selector=None,
    max_async_workers=10,
    max_thread_workers=10,
    max_process_workers=None,
)

Bases: TaskExecutorABC

Task executor with async, thread, and process pools.

Routes tasks to appropriate execution context based on ExecutionModeSelector. Handles generator suspension for dynamic dependencies.

Note: This executor does not handle registry calls - those are managed by the build() function. The executor only executes tasks and returns results.

For routing tasks to different executors (e.g., some to Modal, some local), use RoutedTaskExecutor to compose multiple executors.

Alternative: For fully async multiprocessing without thread pools, one could implement an AIOMultiprocessingTaskExecutor using libraries like aiomultiprocess.
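The routing idea can be sketched with standard asyncio primitives. Coroutine functions are awaited on the event loop under a semaphore, analogous to max_async_workers. Plain functions go to a thread pool, analogous to max_thread_workers. run_routed is a hypothetical helper that chooses the mode by checking whether a callable is a coroutine function, as a stand-in for an ExecutionModeSelector. It omits the process pool and dynamic-dependency handling.

```python
import asyncio
import inspect
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from typing import Any


async def run_routed(
    jobs: list[Callable[[], Any]], max_async_workers: int = 10, max_thread_workers: int = 10
) -> list[Any]:
    """Run each job on the event loop (async) or in a thread pool (sync)."""
    semaphore = asyncio.Semaphore(max_async_workers)
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_thread_workers) as pool:

        async def run_one(job: Callable[[], Any]) -> Any:
            if inspect.iscoroutinefunction(job):
                # "async" mode: await on the event loop, bounded by the semaphore.
                async with semaphore:
                    return await job()
            # "thread" mode: blocking work runs off the event loop thread.
            return await loop.run_in_executor(pool, job)

        return await asyncio.gather(*(run_one(job) for job in jobs))


def blocking_job() -> str:
    return "thread"


async def async_job() -> str:
    await asyncio.sleep(0)
    return "async"


print(asyncio.run(run_routed([blocking_job, async_job])))  # ['thread', 'async']
```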

PARAMETER DESCRIPTION
execution_mode_selector

Callable to select execution mode per task.

TYPE: ExecutionModeSelector | None DEFAULT: None

max_async_workers

Maximum concurrent async tasks (semaphore-based).

TYPE: int DEFAULT: 10

max_thread_workers

Maximum concurrent thread pool workers.

TYPE: int DEFAULT: 10

max_process_workers

Maximum concurrent process pool workers.

TYPE: int | None DEFAULT: None

Source code in stardag/build/_concurrent.py
def __init__(
    self,
    execution_mode_selector: ExecutionModeSelector | None = None,
    max_async_workers: int = 10,
    max_thread_workers: int = 10,
    max_process_workers: int | None = None,
) -> None:
    self.execution_mode_selector = (
        execution_mode_selector or DefaultExecutionModeSelector()
    )
    self.max_async_workers = max_async_workers
    self.max_thread_workers = max_thread_workers
    self.max_process_workers = max_process_workers

    # Pools - initialized in setup()
    self._async_semaphore: asyncio.Semaphore | None = None
    self._thread_pool: ThreadPoolExecutor | None = None
    self._process_pool: ProcessPoolExecutor | None = None

    # Track suspended generators (task_id -> generator)
    # For in-process execution where we can suspend and resume
    self._suspended_generators: dict[UUID, Generator[TaskStruct, None, None]] = {}

    # Track tasks pending re-execution (set of task IDs)
    # For cross-process/remote execution: when task yields incomplete deps,
    # it's re-executed from scratch after deps complete (idempotent re-execution)
    self._pending_reexecution: set[UUID] = set()

setup async

setup()

Initialize worker pools.

Source code in stardag/build/_concurrent.py
async def setup(self) -> None:
    """Initialize worker pools."""
    import multiprocessing as mp

    self._async_semaphore = asyncio.Semaphore(self.max_async_workers)
    self._thread_pool = ThreadPoolExecutor(max_workers=self.max_thread_workers)
    if self.max_process_workers:
        # Use 'spawn' explicitly for cross-platform compatibility.
        # Python 3.14 changed the default from 'fork' to 'forkserver' on Linux,
        # which can cause issues with environment variable inheritance.
        # 'spawn' is the safest option and works consistently across platforms.
        self._process_pool = ProcessPoolExecutor(
            max_workers=self.max_process_workers,
            mp_context=mp.get_context("spawn"),
        )

teardown async

teardown()

Shutdown worker pools.

Source code in stardag/build/_concurrent.py
async def teardown(self) -> None:
    """Shutdown worker pools."""
    if self._thread_pool:
        self._thread_pool.shutdown(wait=True)
        self._thread_pool = None
    if self._process_pool:
        self._process_pool.shutdown(wait=True)
        self._process_pool = None
    self._async_semaphore = None
    self._suspended_generators.clear()
    self._pending_reexecution.clear()

submit async

submit(task)

Execute a task and return result.

Note: This method does not make any registry calls. The build function is responsible for calling start_task, complete_task, and fail_task.

Source code in stardag/build/_concurrent.py
async def submit(self, task: BaseTask) -> None | TaskStruct | TaskExecutionError:
    """Execute a task and return result.

    Note: This method does not make any registry calls. The build function
    is responsible for calling start_task, complete_task, and fail_task.
    """
    # Check if we're resuming a suspended generator (in-process dynamic deps)
    if task.id in self._suspended_generators:
        return self._resume_generator(task)

    # Check if task is pending re-execution (cross-process dynamic deps)
    # Task yielded incomplete deps, deps are now built, re-execute task
    if task.id in self._pending_reexecution:
        self._pending_reexecution.discard(task.id)

    mode = self.execution_mode_selector(task)

    try:
        result = await self._execute_task(task, mode)
        return self._handle_result(task, result)
    except Exception as e:
        return TaskExecutionError(
            exception=e,
            traceback="".join(tb_module.format_exception(e)),
        )

TaskExecutorABC

Bases: ABC

Abstract base for task executors.

Receives tasks and executes them according to some policy. The executor is responsible for:

  • Executing tasks in the appropriate context (async/thread/process)
  • Handling generator suspension for dynamic dependencies

The executor is NOT responsible for:

  • Dependency resolution - handled by build()
  • Registry calls (start_task, complete_task, etc.) - handled by build()

submit abstractmethod async

submit(task)

Submit a task for execution.

PARAMETER DESCRIPTION
task

The task to execute.

TYPE: BaseTask

RETURNS DESCRIPTION
None | TaskStruct | TaskExecutionError

  • None: Task completed successfully with no dynamic dependencies.
  • TaskStruct: Task "suspended" because it yielded dynamic dependencies. The returned TaskStruct contains the discovered dependencies.
  • TaskExecutionError: Task failed. Contains the exception and a pre-formatted traceback string captured at the point of failure.

Source code in stardag/build/_base.py
@abstractmethod
async def submit(self, task: BaseTask) -> None | TaskStruct | TaskExecutionError:
    """Submit a task for execution.

    Args:
        task: The task to execute.

    Returns:
        - None: Task completed successfully with no dynamic dependencies.
        - TaskStruct: Task "suspended" because it yielded dynamic dependencies.
            The returned TaskStruct contains the discovered dependencies.
        - TaskExecutionError: Task failed. Contains the exception and a
            pre-formatted traceback string captured at the point of failure.
    """
    ...

setup abstractmethod async

setup()

Set up any resources needed by the task executor (pools, etc.).

Source code in stardag/build/_base.py
@abstractmethod
async def setup(self) -> None:
    """Setup any resources needed for the task runner (pools, etc.)."""
    ...

teardown abstractmethod async

teardown()

Tear down any resources used by the task executor.

Source code in stardag/build/_base.py
@abstractmethod
async def teardown(self) -> None:
    """Teardown any resources used by the task executor."""
    ...

build

build(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
    register_all=False,
    on_registry_failure="raise",
)

Build tasks concurrently (sync wrapper for build_aio).

This is the recommended entry point for building tasks from synchronous code. Wraps the async build_aio() function.

Note

This function cannot be called from within an already running event loop. If you're in an async context (e.g., inside an async function, or using frameworks like Playwright, FastAPI, etc.), use await build_aio() instead.

Source code in stardag/build/_concurrent.py
def build(
    tasks: Sequence[BaseTask] | BaseTask,
    task_executor: TaskExecutorABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    registry: RegistryABC | None = None,
    max_concurrent_discover: int = 50,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    resume_build_id: UUID | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Build tasks concurrently (sync wrapper for build_aio).

    This is the recommended entry point for building tasks from synchronous code.
    Wraps the async build_aio() function.

    Note:
        This function cannot be called from within an already running event loop.
        If you're in an async context (e.g., inside an async function, or using
        frameworks like Playwright, FastAPI, etc.), use `await build_aio()` instead.
    """
    try:
        return asyncio.run(
            build_aio(
                tasks,
                task_executor,
                fail_mode,
                registry,
                max_concurrent_discover,
                global_lock_manager,
                global_lock_config,
                resume_build_id,
                register_all,
                on_registry_failure,
            )
        )
    except RuntimeError as e:
        if "cannot be called from a running event loop" in str(e):
            raise RuntimeError(
                "build() cannot be used from within an already running event loop. "
                "Use 'await build_aio()' instead, or 'build_sequential()' if you "
                "need synchronous execution without an event loop."
            ) from e
        raise

build_aio async

build_aio(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
    register_all=False,
    on_registry_failure="raise",
)

Build tasks concurrently using hybrid async/thread/process execution.

This is the main build function for production use. It:

  • Discovers all tasks in the DAG(s)
  • Schedules tasks for execution when dependencies are met
  • Handles dynamic dependencies via generator suspension
  • Supports multiple root tasks (built concurrently)
  • Routes tasks to async/thread/process based on ExecutionModeSelector
  • Manages all registry interactions (start/complete/fail task)
  • Optionally uses global concurrency locks for distributed execution

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

task_executor

TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor). Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).

TYPE: TaskExecutorABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

registry

Registry for tracking builds (default: from registry_provider)

TYPE: RegistryABC | None DEFAULT: None

max_concurrent_discover

Maximum concurrent completion checks during DAG discovery. Higher values speed up discovery for large DAGs with remote targets.

TYPE: int DEFAULT: 50

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution to ensure exactly-once execution across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with the exit status (SUCCESS, FAILURE, or EXIT_EARLY), task counts, the build_id, and the error that caused the failure, if any.
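The discovery phase governed by max_concurrent_discover and register_all can be modelled as a concurrent recursive walk: a semaphore caps simultaneous completion checks, and the walk stops at tasks that are already complete unless register_all is set. This is a sketch under assumptions: the hypothetical `is_complete` coroutine replaces `task.complete_aio()`, `asyncio.gather` is used instead of the `asyncio.TaskGroup` in the source, and the source's `asyncio.Lock` is omitted because no `await` separates the "seen" check from the insert here.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping, Sequence


async def discover_all(
    roots: Sequence[str],
    deps: Mapping[str, Sequence[str]],
    is_complete: Callable[[str], Awaitable[bool]],
    max_concurrent: int = 50,
    register_all: bool = False,
) -> tuple[set[str], set[str]]:
    """Return (discovered, already_complete) for the DAG below `roots`."""
    discovered: set[str] = set()
    already_complete: set[str] = set()
    # Bounds how many completion checks (typically remote I/O) run at once.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def visit(name: str) -> None:
        # No await between the check and the add, so two coroutines
        # can never both claim the same task.
        if name in discovered:
            return
        discovered.add(name)

        async with semaphore:
            complete = await is_complete(name)
        if complete:
            already_complete.add(name)
            if not register_all:
                return  # prune: dependencies of a complete task are not visited

        # Visit all dependencies concurrently.
        await asyncio.gather(*(visit(dep) for dep in deps.get(name, ())))

    await asyncio.gather(*(visit(root) for root in roots))
    return discovered, already_complete


deps = {"report": ["clean"], "clean": ["raw"], "raw": []}


async def is_complete(name: str) -> bool:
    return name == "clean"  # pretend only "clean" already has its output


found, cached = asyncio.run(discover_all(["report"], deps, is_complete))
print(sorted(found), sorted(cached))  # ['clean', 'report'] ['clean']
```

With the default register_all=False, "raw" is never checked because its only dependent is already complete; passing register_all=True visits it too, at the cost of one more completion check.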

Source code in stardag/build/_concurrent.py
async def build_aio(
    tasks: Sequence[BaseTask] | BaseTask,
    task_executor: TaskExecutorABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    registry: RegistryABC | None = None,
    max_concurrent_discover: int = 50,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    resume_build_id: UUID | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Build tasks concurrently using hybrid async/thread/process execution.

    This is the main build function for production use. It:
    - Discovers all tasks in the DAG(s)
    - Schedules tasks for execution when dependencies are met
    - Handles dynamic dependencies via generator suspension
    - Supports multiple root tasks (built concurrently)
    - Routes tasks to async/thread/process based on ExecutionModeSelector
    - Manages all registry interactions (start/complete/fail task)
    - Optionally uses global concurrency locks for distributed execution

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        task_executor: TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor).
            Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).
        fail_mode: How to handle task failures
        registry: Registry for tracking builds (default: from registry_provider)
        max_concurrent_discover: Maximum concurrent completion checks during DAG discovery.
            Higher values speed up discovery for large DAGs with remote targets.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution to ensure exactly-once execution across processes.
        global_lock_config: Configuration for global locking behavior.
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        register_all: If True, discovery continues recursing into dependencies of
            already-complete tasks. This ensures all tasks in the DAG get registered
            in the registry (useful for complete DAG visualization). Default False
            for performance — skipping complete subgraphs avoids unnecessary I/O.
        on_registry_failure: How to handle registry call failures. "raise" (default)
            propagates the exception; "warn" logs a warning and continues.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    if isinstance(tasks, BaseTask):
        tasks = [tasks]
    else:
        tasks = list(tasks)
        for idx, task in enumerate(tasks):
            if not isinstance(task, BaseTask):
                raise ValueError(
                    f"Invalid task at index {idx}: {task} (must be BaseTask)"
                )

    # Determine registry: explicit > registry_provider
    if registry is None:
        registry = registry_provider.get()
    logger.info(f"Using registry: {type(registry).__name__}")

    if task_executor is None:
        task_executor = HybridConcurrentTaskExecutor()

    # Setup global lock selector
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)

    # Track locks held by this build for manual release
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    error: BaseException | None = None

    # Task execution states
    task_states: dict[UUID, TaskExecutionState] = {}
    # Events for completion signaling
    completion_events: dict[UUID, asyncio.Event] = {}
    # Currently executing tasks
    executing: set[UUID] = set()

    # Tasks found to be already complete during discovery (to register after build starts)
    previously_completed_tasks: list[BaseTask] = []

    # Synchronization for concurrent discovery
    discover_lock = asyncio.Lock()
    discover_semaphore = asyncio.Semaphore(max_concurrent_discover)

    async def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks.

        This optimization avoids traversing into dependency subgraphs that
        are already complete, which can significantly reduce discovery time
        for large DAGs with cached results.

        Uses concurrent recursion with TaskGroup for parallel discovery,
        with a lock protecting shared data structures and a semaphore
        limiting concurrent completion checks.
        """
        # Check if already discovered and reserve our spot (with lock)
        async with discover_lock:
            if task.id in task_states:
                return
            static_deps = flatten_task_struct(task.requires())
            task_states[task.id] = TaskExecutionState(
                task=task, static_deps=static_deps
            )
            completion_events[task.id] = asyncio.Event()
            task_count.discovered += 1

        # Check completion outside lock (I/O bound, use semaphore to limit concurrency)
        async with discover_semaphore:
            is_complete = await task.complete_aio()

        if is_complete:
            async with discover_lock:
                completion_cache.add(task.id)
                task_states[task.id].completed = True
                completion_events[task.id].set()
                task_count.previously_completed += 1
                previously_completed_tasks.append(task)
            if not register_all:
                # Don't recurse into deps - they're already built
                return

        # Task not complete (or register_all) - recurse into dependencies
        async with asyncio.TaskGroup() as tg:
            for dep in static_deps:
                tg.create_task(discover(dep))

    # Discover all tasks from roots concurrently
    async with asyncio.TaskGroup() as tg:
        for root in tasks:
            tg.create_task(discover(root))

    # Start or resume build
    if resume_build_id is not None:
        build_id = resume_build_id
        logger.info(f"Resuming build: {build_id}")
    else:
        build_id = await registry.build_start_aio(root_tasks=tasks)
        logger.info(f"Started build: {build_id}")

    # Register previously completed tasks so they appear in the build's task list.
    # These get TASK_REFERENCED events since they already exist. We also call
    # task_complete_aio to self-heal tasks that were left in "Started" state from
    # a previous build that crashed or timed out — their target exists, so they
    # are complete, but the registry still shows them as running.
    for task in previously_completed_tasks:
        try:
            await registry.task_register_aio(build_id, task)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to register previously completed task {task.id}",
                on_registry_failure,
            )
            continue
        try:
            await registry.task_complete_aio(build_id, task)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to mark previously completed task {task.id} as complete",
                on_registry_failure,
            )

    await task_executor.setup()

    # Map task_id -> asyncio.Task for in-flight executions
    pending_futures: dict[UUID, asyncio.Task] = {}

    async def process_result(
        task: BaseTask,
        result: LockAcquisitionResult
        | TaskExecutionError
        | BaseException
        | TaskStruct
        | None,
    ):
        """Process a single task result (including lock acquisition results)."""
        nonlocal error
        state = task_states[task.id]

        # Handle lock acquisition results (lock was not acquired)
        if isinstance(result, LockAcquisitionResult):
            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                # Task completed externally - wait for visibility then mark complete
                await wait_for_completion_with_retry(task)
                state.completed = True
                completion_cache.add(task.id)
                completion_events[task.id].set()
                task_count.previously_completed += 1
            else:
                # ERROR, HELD_BY_OTHER, or CONCURRENCY_LIMIT_REACHED after timeout
                msg = f"Lock {result.status.value}"
                if result.error_message:
                    msg += f": {result.error_message}"
                state.exception = Exception(msg)
                task_count.failed += 1
                error = state.exception
                if fail_mode == FailMode.FAIL_FAST:
                    raise state.exception
            return

        # Handle normal task execution results
        if isinstance(result, TaskExecutionError):
            # Task failed - release lock (not completed) and notify registry
            await release_lock_for_task(task, completed=False)
            try:
                await registry.task_fail_aio(build_id, task, str(result))
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} failure",
                    on_registry_failure,
                )
            state.exception = result.exception
            task_count.failed += 1
            error = result.exception
            if fail_mode == FailMode.FAIL_FAST:
                raise result.exception

        elif isinstance(result, BaseException):
            # Backward compat: custom executor returned a bare exception
            await release_lock_for_task(task, completed=False)
            try:
                await registry.task_fail_aio(build_id, task, str(result))
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} failure",
                    on_registry_failure,
                )
            state.exception = result
            task_count.failed += 1
            error = result
            if fail_mode == FailMode.FAIL_FAST:
                raise result

        elif result is None:
            # Task completed - release lock (completed) and notify registry
            await release_lock_for_task(task, completed=True)
            try:
                await registry.task_complete_aio(build_id, task)
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} completion",
                    on_registry_failure,
                )
            # Upload artifacts if any
            try:
                artifacts = await task.artifacts_aio()
                if artifacts:
                    await registry.task_upload_artifacts_aio(build_id, task, artifacts)
            except Exception as artifact_err:
                handle_registry_error(
                    artifact_err,
                    f"Failed to collect/upload artifacts for task {task.id}",
                    on_registry_failure,
                )
            state.completed = True
            completion_cache.add(task.id)
            completion_events[task.id].set()
            task_count.succeeded += 1

        else:
            # Dynamic deps returned (TaskStruct) - task is suspended
            # Note: Lock is still held - release on final completion/failure
            dynamic_deps = flatten_task_struct(result)

            # Notify registry that task is suspended waiting for dynamic deps
            try:
                await registry.task_suspend_aio(build_id, task)
            except Exception as reg_err:
                handle_registry_error(
                    reg_err,
                    f"Failed to notify registry of task {task.id} suspension",
                    on_registry_failure,
                )

            # Discover any new dynamic deps (discover handles counting)
            for dep in dynamic_deps:
                if dep.id not in task_states:
                    await discover(dep)

            # Accumulate dynamic deps (don't overwrite)
            existing_dyn_ids = {d.id for d in state.dynamic_deps}
            for dep in dynamic_deps:
                if dep.id not in existing_dyn_ids:
                    state.dynamic_deps.append(dep)

    def find_ready_tasks() -> list[BaseTask]:
        """Find tasks that are ready to execute."""
        ready: list[BaseTask] = []
        for state in task_states.values():
            if state.completed or state.task.id in executing:
                continue
            if state.exception is not None:
                continue

            # Check all deps (static + dynamic) complete
            all_deps_complete = all(
                task_states[dep.id].completed for dep in state.all_deps
            )
            if all_deps_complete:
                ready.append(state.task)
                executing.add(state.task.id)
        return ready

    async def wait_for_completion_with_retry(task: BaseTask) -> bool:
        """Wait for task.complete_aio() to return True (handles eventual consistency).

        When the lock reports ALREADY_COMPLETED, the task output may not be
        immediately visible due to eventual consistency (e.g., S3). This function
        retries until the output exists or timeout is reached.
        """
        assert global_lock_config is not None
        timeout = global_lock_config.completion_retry_timeout_seconds
        interval = global_lock_config.completion_retry_interval_seconds
        start_time = asyncio.get_event_loop().time()

        while True:
            if await task.complete_aio():
                return True
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed >= timeout:
                logger.warning(
                    f"Task {task.id} reported as completed by lock service, "
                    f"but complete_aio() returned False after {timeout}s. "
                    "Treating as complete (eventual consistency)."
                )
                return True
            await asyncio.sleep(interval)

    async def release_lock_for_task(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            await global_lock_manager.release(task_id, task_completed=completed)
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    async def acquire_lock_with_completion_check(
        task: BaseTask,
        task_id: str,
        lock_manager: GlobalConcurrencyLockManager,
        config: GlobalLockConfig,
    ) -> LockAcquisitionResult:
        """Acquire lock with retry/backoff and external completion checking.

        During the retry loop, we also check if the task was completed externally
        (e.g., by another process). This handles the race condition where:
        1. Lock is held by another process
        2. That process completes the task and releases the lock
        3. Before we can re-acquire, we should notice the task is complete
        """
        timeout = config.lock_wait_timeout_seconds
        current_interval = config.lock_wait_initial_interval_seconds
        max_interval = config.lock_wait_max_interval_seconds
        backoff_factor = config.lock_wait_backoff_factor
        state = task_states[task.id]
        notified_waiting = False  # Track if we've already notified registry

        loop = asyncio.get_event_loop()
        start_time = loop.time()

        while True:
            # Try to acquire the lock
            result = await lock_manager.acquire(task_id)

            if result.status == LockAcquisitionStatus.ACQUIRED:
                # Clear waiting flag if we were waiting
                state.waiting_for_lock = False
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                state.waiting_for_lock = False
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                state.waiting_for_lock = False
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            # Mark task as waiting for lock and notify registry (once)
            if not notified_waiting:
                state.waiting_for_lock = True
                notified_waiting = True
                lock_owner = result.error_message  # May contain owner info
                try:
                    await registry.task_waiting_for_lock_aio(build_id, task, lock_owner)
                except Exception as e:
                    handle_registry_error(
                        e,
                        f"Failed to notify registry of lock wait for task {task.id}",
                        on_registry_failure,
                    )

            if timeout is None:
                return result

            elapsed = loop.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if await task.complete_aio():
                state.waiting_for_lock = False
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            await asyncio.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    async def submit_with_lock(
        task: BaseTask,
    ) -> LockAcquisitionResult | TaskExecutionError | TaskStruct | None:
        """Submit task for execution, acquiring lock first if enabled.

        This wraps lock acquisition + task execution as a single async unit,
        allowing the main loop to remain non-blocking while waiting for locks.

        Returns:
            - LockAcquisitionResult: If lock was not acquired (ALREADY_COMPLETED,
                ERROR, or timeout). The task was NOT executed.
            - TaskExecutionError | TaskStruct | None: Normal task result if lock was
                acquired (or locking wasn't needed) and task was executed.
        """
        state = task_states[task.id]
        use_lock = global_lock_manager is not None and lock_selector(task)

        if use_lock:
            assert global_lock_manager is not None  # For type checker
            task_id_str = str(task.id)

            # Always acquire lock (even if we think we hold it from a previous
            # dynamic deps yield). This handles:
            # 1. Fresh task execution - normal acquire
            # 2. Task resuming after dynamic deps - re-acquire is safe since
            #    we're the same owner, and handles the case where lock expired
            #    during the wait for deps
            lock_result = await acquire_lock_with_completion_check(
                task, task_id_str, global_lock_manager, global_lock_config
            )

            if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                # Lock not acquired - return the lock result for handling
                return lock_result

            # Lock acquired - track it for release later
            held_locks.add(task_id_str)

        # Now we have the lock (or locking wasn't needed)
        # Start the task in registry
        if not state.started:
            await registry.task_start_aio(build_id, task)
            state.started = True
        elif state.dynamic_deps:
            # Task was suspended waiting for dynamic deps, now resuming
            await registry.task_resume_aio(build_id, task)

        # Execute the task via the executor
        return await task_executor.submit(task)

    try:
        # Main build loop using as_completed pattern
        while True:
            # Check if all roots complete
            all_roots_complete = all(task_states[root.id].completed for root in tasks)
            if all_roots_complete:
                break

            # Find and submit ready tasks
            ready = find_ready_tasks()

            # Submit ready tasks (lock acquisition + execution as single async unit)
            for task in ready:
                async_task = asyncio.create_task(submit_with_lock(task))
                pending_futures[task.id] = async_task

            # If nothing is pending, check for deadlock or completion
            if not pending_futures:
                incomplete = [
                    s
                    for s in task_states.values()
                    if not s.completed and s.exception is None
                ]
                if incomplete:
                    # Check if all incomplete tasks are blocked by failed dependencies
                    def has_failed_dep(state: TaskExecutionState) -> bool:
                        for dep in state.all_deps:
                            dep_state = task_states[dep.id]
                            if dep_state.exception is not None:
                                return True
                        return False

                    truly_blocked = [s for s in incomplete if not has_failed_dep(s)]
                    if truly_blocked:
                        # Real deadlock - tasks blocked without failed deps
                        raise RuntimeError(
                            f"Deadlock: {len(truly_blocked)} tasks cannot proceed. "
                            f"Tasks: {[s.task.id for s in truly_blocked[:5]]}"
                        )
                    # All remaining tasks are blocked by failed deps - exit gracefully
                break

            # Wait for at least one task to complete
            done, _ = await asyncio.wait(
                pending_futures.values(), return_when=asyncio.FIRST_COMPLETED
            )

            # Process completed tasks
            for async_task in done:
                # Find which task this was
                task_id = None
                for tid, fut in pending_futures.items():
                    if fut is async_task:
                        task_id = tid
                        break
                assert task_id is not None

                # Remove from pending and executing
                del pending_futures[task_id]
                executing.discard(task_id)

                # Get result and process
                task = task_states[task_id].task
                try:
                    result = async_task.result()
                except Exception as e:
                    result = TaskExecutionError(
                        exception=e,
                        traceback="".join(tb_module.format_exception(e)),
                    )
                await process_result(task, result)

            # Check for exit-early condition: all remaining tasks waiting for locks
            if global_lock_config.exit_early_when_all_locked:
                remaining_tasks = [
                    s
                    for s in task_states.values()
                    if not s.completed and s.exception is None
                ]
                if remaining_tasks:
                    all_waiting = all(s.waiting_for_lock for s in remaining_tasks)
                    if all_waiting:
                        reason = (
                            f"All {len(remaining_tasks)} remaining tasks "
                            "are running in other builds"
                        )
                        logger.info(f"Exiting early: {reason}")
                        try:
                            await registry.build_exit_early_aio(build_id, reason)
                        except Exception as reg_err:
                            handle_registry_error(
                                reg_err,
                                "Failed to notify registry of exit early",
                                on_registry_failure,
                            )
                        return BuildSummary(
                            status=BuildExitStatus.EXIT_EARLY,
                            task_count=task_count,
                            build_id=build_id,
                            error=None,
                        )

        await registry.build_complete_aio(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        await registry.build_fail_aio(build_id, str(e))
        if fail_mode == FailMode.FAIL_FAST:
            raise
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )

    finally:
        await task_executor.teardown()
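The retry loop in acquire_lock_with_completion_check above is exponential backoff with a ceiling, plus a check between attempts for whether another process has already finished the task. A stripped-down sketch, assuming hypothetical `try_acquire` and `is_complete` coroutines in place of the lock manager and `task.complete_aio()`, and plain strings in place of LockAcquisitionStatus:

```python
import asyncio
from collections.abc import Awaitable, Callable


async def acquire_with_backoff(
    try_acquire: Callable[[], Awaitable[bool]],
    is_complete: Callable[[], Awaitable[bool]],
    timeout: float,
    initial_interval: float = 0.1,
    max_interval: float = 2.0,
    backoff_factor: float = 2.0,
) -> str:
    """Return "acquired", "already_completed" or "timeout"."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    interval = initial_interval
    while True:
        if await try_acquire():
            return "acquired"
        if loop.time() - start >= timeout:
            return "timeout"
        # Another worker may have finished the task while we were waiting.
        if await is_complete():
            return "already_completed"
        await asyncio.sleep(interval)
        # Grow the wait geometrically, but never beyond max_interval.
        interval = min(interval * backoff_factor, max_interval)


async def demo() -> tuple[str, str, str]:
    attempts = 0

    async def frees_on_third_try() -> bool:
        nonlocal attempts
        attempts += 1
        return attempts >= 3

    async def held_forever() -> bool:
        return False

    async def done() -> bool:
        return True

    async def not_done() -> bool:
        return False

    a = await acquire_with_backoff(frees_on_third_try, not_done, 5, 0.01)
    b = await acquire_with_backoff(held_forever, done, 5, 0.01)
    c = await acquire_with_backoff(held_forever, not_done, 0.05, 0.01)
    return a, b, c


print(asyncio.run(demo()))  # ('acquired', 'already_completed', 'timeout')
```

Checking completion between attempts means a waiting build can stop as soon as another build finishes the task, instead of acquiring a lock for work that is already done.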

build_sequential

build_sequential(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    dual_run_default="sync",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Sync API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run via run()
  • Async-only tasks: run via asyncio.run(run_aio()). (Does not work if called from within an existing event loop.)
  • Dual tasks: run via run() if dual_run_default=="sync" (default), else (dual_run_default=="async") via asyncio.run(run_aio()).
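The execution policy above can be sketched as a small dispatch function. This is an illustration, not stardag's code: stardag decides the task kind from which methods a task class overrides, whereas this sketch assumes plain duck typing (a task is sync if it has `run()`, async if it has `run_aio()`, dual if both) so that it runs standalone.

```python
import asyncio
from typing import Any, Literal


def run_task(task: Any, dual_run_default: Literal["sync", "async"] = "sync") -> str:
    """Run `task` following the sync/async/dual policy; return the path taken."""
    has_sync = callable(getattr(task, "run", None))
    has_async = callable(getattr(task, "run_aio", None))

    # Sync-only tasks, and dual tasks when sync is preferred.
    if has_sync and (not has_async or dual_run_default == "sync"):
        task.run()
        return "sync"
    # Async-only tasks, and dual tasks when async is preferred.
    if has_async:
        # asyncio.run() raises RuntimeError if an event loop is already running.
        asyncio.run(task.run_aio())
        return "async"
    raise TypeError(f"{task!r} defines neither run() nor run_aio()")


class SyncTask:
    def run(self) -> None:
        return None


class AsyncTask:
    async def run_aio(self) -> None:
        return None


class DualTask(SyncTask, AsyncTask):
    pass


print(run_task(SyncTask()))                            # sync
print(run_task(AsyncTask()))                           # async
print(run_task(DualTask()))                            # sync
print(run_task(DualTask(), dual_run_default="async"))  # async
```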

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

dual_run_default

For dual tasks (tasks that implement both run() and run_aio()), whether to prefer sync or async execution.

TYPE: Literal['sync', 'async'] DEFAULT: 'sync'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id
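The on_registry_failure option decides whether a failed registry call aborts the build. The build_aio source above routes such failures through handle_registry_error, whose body is not shown on this page; the following is a hypothetical equivalent built only from the documented behaviour ("raise" propagates the exception, "warn" logs a warning and continues) and the standard logging module.

```python
import logging
from collections.abc import Callable
from typing import Literal

logger = logging.getLogger("registry_sketch")

# Mirrors the two documented values of on_registry_failure.
OnRegistryFailure = Literal["raise", "warn"]


def handle_registry_error_sketch(
    err: Exception, message: str, on_failure: OnRegistryFailure
) -> None:
    """Hypothetical stand-in: re-raise for "raise", log and continue for "warn"."""
    if on_failure == "raise":
        raise err
    logger.warning("%s: %s", message, err)


def notify_registry(call: Callable[[], None], on_failure: OnRegistryFailure) -> bool:
    """Invoke a registry hook; return True if it succeeded."""
    try:
        call()
        return True
    except Exception as err:
        handle_registry_error_sketch(err, "Failed to notify registry", on_failure)
        return False


def unreachable_registry() -> None:
    raise ConnectionError("registry unreachable")


print(notify_registry(unreachable_registry, "warn"))  # False (warning logged)
try:
    notify_registry(unreachable_registry, "raise")
except ConnectionError as exc:
    print("build aborted:", exc)  # build aborted: registry unreachable
```

"warn" trades observability for robustness: tasks still run and their targets are still written, but the registry may miss some events for that build.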

Source code in stardag/build/_sequential.py
def build_sequential(
    tasks: Sequence[BaseTask] | BaseTask,
    registry: RegistryABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    dual_run_default: Literal["sync", "async"] = "sync",
    resume_build_id: UUID | None = None,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Sync API for building tasks sequentially.

    This is intended primarily for debugging and testing.

    Task execution policy:
    - Sync-only tasks: run via `run()`
    - Async-only tasks: run via `asyncio.run(run_aio())`. (Does not work if called
        from within an existing event loop.)
    - Dual tasks: run via `run()` if `dual_run_default=="sync"` (default), else
        (`dual_run_default=="async"`) via `asyncio.run(run_aio())`.

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        registry: Registry for tracking builds
        fail_mode: How to handle task failures
        dual_run_default: For dual tasks, prefer sync or async execution
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution for "exactly once" semantics across processes.
        global_lock_config: Configuration for global locking behavior.
        register_all: If True, discovery continues recursing into dependencies of
            already-complete tasks. This ensures all tasks in the DAG get registered
            in the registry (useful for complete DAG visualization). Default False
            for performance — skipping complete subgraphs avoids unnecessary I/O.
        on_registry_failure: How to handle registry call failures. "raise" (default)
            propagates the exception; "warn" logs a warning and continues.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    tasks_list = _validate_tasks(tasks)

    if registry is None:
        registry = registry_provider.get()
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    failed_cache: set[UUID] = set()
    error: Exception | None = None

    # Discover all tasks, stopping at already-complete tasks
    all_tasks: dict[UUID, BaseTask] = {}
    previously_completed_tasks: list[BaseTask] = []

    def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks."""
        if task.id in all_tasks:
            return
        all_tasks[task.id] = task
        task_count.discovered += 1

        # Check if this task is already complete
        if task.complete():
            completion_cache.add(task.id)
            task_count.previously_completed += 1
            previously_completed_tasks.append(task)
            if not register_all:
                # Don't recurse into deps - they're already built
                return

        # Task not complete (or register_all) - recurse into dependencies
        for dep in flatten_task_struct(task.requires()):
            discover(dep)

    for root in tasks_list:
        discover(root)

    # Start or resume build
    if resume_build_id is not None:
        build_id = resume_build_id
    else:
        build_id = registry.build_start(root_tasks=tasks_list)

    # Register previously completed tasks so they appear in the build's task list.
    # We also call task_complete to mark them as done — otherwise they remain in
    # PENDING state in the registry (e.g. WrapperTasks that are complete because
    # their deps are complete, but were never explicitly run).
    for task in previously_completed_tasks:
        try:
            registry.task_register(build_id, task)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to register previously completed task {task.id}",
                on_registry_failure,
            )
            continue
        try:
            registry.task_complete(build_id, task)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to mark previously completed task {task.id} as complete",
                on_registry_failure,
            )

    def acquire_lock_sync(task: BaseTask) -> LockAcquisitionResult:
        """Acquire lock synchronously with retry/backoff."""
        assert global_lock_manager is not None
        assert global_lock_config is not None
        task_id = str(task.id)
        timeout = global_lock_config.lock_wait_timeout_seconds
        current_interval = global_lock_config.lock_wait_initial_interval_seconds
        max_interval = global_lock_config.lock_wait_max_interval_seconds
        backoff_factor = global_lock_config.lock_wait_backoff_factor
        start_time = time.time()

        while True:
            result = asyncio.run(global_lock_manager.acquire(task_id))

            if result.status == LockAcquisitionStatus.ACQUIRED:
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            if timeout is None:
                return result

            elapsed = time.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if task.complete():
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            time.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    def release_lock_sync(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            asyncio.run(global_lock_manager.release(task_id, task_completed=completed))
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    try:
        # Build in topological order
        while True:
            ready_task = _find_ready_task(all_tasks, completion_cache, failed_cache)

            if ready_task is None:
                _check_for_deadlock(all_tasks, completion_cache, failed_cache)
                # All remaining tasks are blocked by failed deps - exit gracefully
                break

            # Acquire lock if needed
            use_lock = global_lock_manager is not None and lock_selector(ready_task)
            if use_lock:
                lock_result = acquire_lock_sync(ready_task)

                if lock_result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                    # Task completed elsewhere - skip execution
                    completion_cache.add(ready_task.id)
                    task_count.previously_completed += 1
                    continue

                if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                    # Lock not acquired - treat as failure
                    task_count.failed += 1
                    failed_cache.add(ready_task.id)
                    error = RuntimeError(
                        f"Failed to acquire lock: {lock_result.error_message}"
                    )
                    try:
                        # Register first so the registry has a task row to fail
                        registry.task_register(build_id, ready_task)
                        registry.task_fail(build_id, ready_task, str(error))
                    except Exception as reg_err:
                        handle_registry_error(
                            reg_err,
                            "Failed to notify registry of lock failure",
                            on_registry_failure,
                        )
                    if fail_mode == FailMode.FAIL_FAST:
                        raise error
                    continue

                # Lock acquired - track it
                held_locks.add(str(ready_task.id))

            # Execute the task
            task_completed = False
            try:
                _run_task_sequential(
                    ready_task,
                    completion_cache,
                    all_tasks,
                    build_id,
                    registry,
                    dual_run_default,
                    discover,
                    task_count,
                    on_registry_failure,
                )
                task_count.succeeded += 1
                task_completed = True
            except Exception as e:
                task_count.failed += 1
                failed_cache.add(ready_task.id)
                error = e
                try:
                    registry.task_fail(build_id, ready_task, str(e))
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to notify registry of task {ready_task.id} failure",
                        on_registry_failure,
                    )
                if fail_mode == FailMode.FAIL_FAST:
                    raise
            finally:
                if use_lock:
                    release_lock_sync(ready_task, completed=task_completed)

        registry.build_complete(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        registry.build_fail(build_id, str(e))
        if fail_mode == FailMode.FAIL_FAST:
            raise
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )
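The retry loop in acquire_lock_sync sleeps for an exponentially growing interval, capped at lock_wait_max_interval_seconds, until lock_wait_timeout_seconds has elapsed; with no timeout it returns the first non-acquired result without retrying. The sketch below reproduces only that sleep schedule, assuming every acquire attempt fails instantly and the task is never completed externally. lock_wait_schedule is a hypothetical helper, not part of stardag.

```python
from typing import List, Optional


def lock_wait_schedule(
    timeout: Optional[float],
    initial_interval: float,
    backoff_factor: float,
    max_interval: float,
) -> List[float]:
    """Sleep durations between failed lock attempts, as in acquire_lock_sync."""
    sleeps: List[float] = []
    if timeout is None:
        return sleeps  # no timeout: the first non-acquired result is returned as-is
    elapsed = 0.0
    current = initial_interval
    while elapsed < timeout:  # the real loop checks elapsed >= timeout after each attempt
        sleeps.append(current)
        elapsed += current
        current = min(current * backoff_factor, max_interval)
    return sleeps


print(lock_wait_schedule(10, 1.0, 2.0, 4.0))  # [1.0, 2.0, 4.0, 4.0]
```

With these values the lock is attempted five times (at roughly 0, 1, 3, 7 and 11 seconds) before the task is recorded as failed; the last sleep can overshoot the timeout because the check happens only between attempts.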

build_sequential_aio async

build_sequential_aio(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    sync_run_default="blocking",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
    register_all=False,
    on_registry_failure="raise",
)

Async API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run via run(), blocking the main event loop if sync_run_default=="blocking" (default), else (sync_run_default=="thread") in a thread pool.
  • Async-only tasks: run via await run_aio().
  • Dual tasks: run via await run_aio().
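The difference between the two sync_run_default settings is which thread executes run(). The sketch below is a hypothetical model, not stardag code: run_with_async_policy and SyncOnly are illustrative names, and asyncio.to_thread (Python 3.9+) stands in for whatever thread pool the library actually uses.

```python
import asyncio
import threading
from typing import Any, Literal


async def run_with_async_policy(
    task: Any, sync_run_default: Literal["thread", "blocking"] = "blocking"
) -> Any:
    """Hypothetical helper mirroring build_sequential_aio's execution policy."""
    if hasattr(task, "run_aio"):  # async-only and dual tasks
        return await task.run_aio()
    if sync_run_default == "thread":
        # Offload run() to a worker thread so the event loop stays responsive.
        return await asyncio.to_thread(task.run)
    return task.run()  # "blocking": the event loop waits until run() returns


class SyncOnly:
    def run(self) -> int:
        return threading.get_ident()  # id of the thread that executed run()


caller = threading.get_ident()
blocking = asyncio.run(run_with_async_policy(SyncOnly()))
threaded = asyncio.run(run_with_async_policy(SyncOnly(), sync_run_default="thread"))
print(blocking == caller, threaded == caller)  # True False
```

In a sequential build only one task runs at a time either way, so "thread" mainly matters when other coroutines share the loop (for example, lock TTL renewal or other work scheduled by the caller).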

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

registry

Registry for tracking builds. If None, the registry returned by registry_provider.get() is used.

TYPE: RegistryABC | None DEFAULT: None

fail_mode

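How to handle task failures. With FAIL_FAST (the default), the first failure is re-raised. Otherwise the failed task is recorded, tasks that do not depend on it keep running, and the returned BuildSummary has status FAILURE and carries the error.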

TYPE: FailMode DEFAULT: FAIL_FAST

sync_run_default

For sync-only tasks, whether to call run() directly in the event loop ("blocking") or in a thread pool ("thread")

TYPE: Literal['thread', 'blocking'] DEFAULT: 'blocking'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior. If None, a default GlobalLockConfig() is used.

TYPE: GlobalLockConfig | None DEFAULT: None

register_all

If True, discovery continues recursing into dependencies of already-complete tasks. This ensures all tasks in the DAG get registered in the registry (useful for complete DAG visualization). Default False for performance — skipping complete subgraphs avoids unnecessary I/O.

TYPE: bool DEFAULT: False

on_registry_failure

How to handle registry call failures. "raise" (default) propagates the exception; "warn" logs a warning and continues.

TYPE: OnRegistryFailure DEFAULT: 'raise'

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, build_id, and error (the last exception encountered, or None if the build succeeded)
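The body of handle_registry_error is not shown in this reference. The sketch below only models the documented contract of on_registry_failure ("raise" propagates, "warn" logs a warning and continues); handle_registry_error_sketch and notify are hypothetical names.

```python
import logging
from typing import Callable, Literal

logger = logging.getLogger("registry_sketch")


def handle_registry_error_sketch(
    err: Exception, message: str, on_registry_failure: Literal["raise", "warn"]
) -> None:
    """Hypothetical stand-in for the handle_registry_error helper."""
    if on_registry_failure == "raise":
        raise err
    logger.warning("%s: %s", message, err)  # "warn": log and let the build continue


def notify(call: Callable[[], None], message: str, mode: Literal["raise", "warn"]) -> bool:
    """Run a registry call; return True on success, False if a failure was tolerated."""
    try:
        call()
        return True
    except Exception as err:
        handle_registry_error_sketch(err, message, mode)
        return False


def registry_down() -> None:
    raise ConnectionError("registry unreachable")


print(notify(registry_down, "Failed to register task", "warn"))  # False, plus a logged warning
```

In the source shown here, the build-level calls (build_start, build_complete, build_fail and their _aio variants) are invoked directly rather than through handle_registry_error, so "warn" does not suppress failures in those calls.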

Source code in stardag/build/_sequential.py
async def build_sequential_aio(
    tasks: Sequence[BaseTask] | BaseTask,
    registry: RegistryABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    sync_run_default: Literal["thread", "blocking"] = "blocking",
    resume_build_id: UUID | None = None,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    register_all: bool = False,
    on_registry_failure: OnRegistryFailure = "raise",
) -> BuildSummary:
    """Async API for building tasks sequentially.

    This is intended primarily for debugging and testing.

    Task execution policy:
    - Sync-only tasks: runs *blocking* via `run()` in main event loop if
        `sync_run_default=="blocking"` (default), else (`sync_run_default=="thread"`)
        in thread pool.
    - Async-only tasks: run via `await run_aio()`.
    - Dual tasks: run via `await run_aio()`.

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        registry: Registry for tracking builds
        fail_mode: How to handle task failures
        sync_run_default: For sync-only tasks, block or use thread pool
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution for "exactly once" semantics across processes.
        global_lock_config: Configuration for global locking behavior.
        register_all: If True, discovery continues recursing into dependencies of
            already-complete tasks. This ensures all tasks in the DAG get registered
            in the registry (useful for complete DAG visualization). Default False
            for performance — skipping complete subgraphs avoids unnecessary I/O.
        on_registry_failure: How to handle registry call failures. "raise" (default)
            propagates the exception; "warn" logs a warning and continues.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    tasks_list = _validate_tasks(tasks)

    if registry is None:
        registry = registry_provider.get()
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    failed_cache: set[UUID] = set()
    error: Exception | None = None

    # Discover all tasks, stopping at already-complete tasks
    all_tasks: dict[UUID, BaseTask] = {}
    previously_completed_tasks: list[BaseTask] = []

    async def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks."""
        if task.id in all_tasks:
            return
        all_tasks[task.id] = task
        task_count.discovered += 1

        # Check if this task is already complete
        if await task.complete_aio():
            completion_cache.add(task.id)
            task_count.previously_completed += 1
            previously_completed_tasks.append(task)
            if not register_all:
                # Don't recurse into deps - they're already built
                return

        # Task not complete (or register_all) - recurse into dependencies
        for dep in flatten_task_struct(task.requires()):
            await discover(dep)

    for root in tasks_list:
        await discover(root)

    # Start or resume build
    if resume_build_id is not None:
        build_id = resume_build_id
    else:
        build_id = await registry.build_start_aio(root_tasks=tasks_list)

    # Register previously completed tasks so they appear in the build's task list.
    # We also call task_complete_aio to mark them as done — otherwise they remain in
    # PENDING state in the registry (e.g. WrapperTasks that are complete because
    # their deps are complete, but were never explicitly run).
    for task in previously_completed_tasks:
        try:
            await registry.task_register_aio(build_id, task)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to register previously completed task {task.id}",
                on_registry_failure,
            )
            continue
        try:
            await registry.task_complete_aio(build_id, task)
        except Exception as reg_err:
            handle_registry_error(
                reg_err,
                f"Failed to mark previously completed task {task.id} as complete",
                on_registry_failure,
            )

    async def acquire_lock_aio(task: BaseTask) -> LockAcquisitionResult:
        """Acquire lock asynchronously with retry/backoff."""
        assert global_lock_manager is not None
        assert global_lock_config is not None
        task_id = str(task.id)
        timeout = global_lock_config.lock_wait_timeout_seconds
        current_interval = global_lock_config.lock_wait_initial_interval_seconds
        max_interval = global_lock_config.lock_wait_max_interval_seconds
        backoff_factor = global_lock_config.lock_wait_backoff_factor

        loop = asyncio.get_event_loop()
        start_time = loop.time()

        while True:
            result = await global_lock_manager.acquire(task_id)

            if result.status == LockAcquisitionStatus.ACQUIRED:
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            if timeout is None:
                return result

            elapsed = loop.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if await task.complete_aio():
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            await asyncio.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    async def release_lock_aio(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            await global_lock_manager.release(task_id, task_completed=completed)
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    try:
        # Build in topological order
        while True:
            ready_task = _find_ready_task(all_tasks, completion_cache, failed_cache)

            if ready_task is None:
                _check_for_deadlock(all_tasks, completion_cache, failed_cache)
                # All remaining tasks are blocked by failed deps - exit gracefully
                break

            # Acquire lock if needed
            use_lock = global_lock_manager is not None and lock_selector(ready_task)
            if use_lock:
                lock_result = await acquire_lock_aio(ready_task)

                if lock_result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                    # Task completed elsewhere - skip execution
                    completion_cache.add(ready_task.id)
                    task_count.previously_completed += 1
                    continue

                if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                    # Lock not acquired - treat as failure
                    task_count.failed += 1
                    failed_cache.add(ready_task.id)
                    error = RuntimeError(
                        f"Failed to acquire lock: {lock_result.error_message}"
                    )
                    try:
                        # Register first so the registry has a task row to fail
                        await registry.task_register_aio(build_id, ready_task)
                        await registry.task_fail_aio(build_id, ready_task, str(error))
                    except Exception as reg_err:
                        handle_registry_error(
                            reg_err,
                            "Failed to notify registry of lock failure",
                            on_registry_failure,
                        )
                    if fail_mode == FailMode.FAIL_FAST:
                        raise error
                    continue

                # Lock acquired - track it
                held_locks.add(str(ready_task.id))

            # Execute the task
            task_completed = False
            try:
                await _run_task_sequential_aio(
                    ready_task,
                    completion_cache,
                    all_tasks,
                    build_id,
                    registry,
                    sync_run_default,
                    discover,
                    task_count,
                    on_registry_failure,
                )
                task_count.succeeded += 1
                task_completed = True
            except Exception as e:
                task_count.failed += 1
                failed_cache.add(ready_task.id)
                error = e
                try:
                    await registry.task_fail_aio(build_id, ready_task, str(e))
                except Exception as reg_err:
                    handle_registry_error(
                        reg_err,
                        f"Failed to notify registry of task {ready_task.id} failure",
                        on_registry_failure,
                    )
                if fail_mode == FailMode.FAIL_FAST:
                    raise
            finally:
                if use_lock:
                    await release_lock_aio(ready_task, completed=task_completed)

        await registry.build_complete_aio(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        await registry.build_fail_aio(build_id, str(e))
        if fail_mode == FailMode.FAIL_FAST:
            raise
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )
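Both builders pick work with _find_ready_task and stop when it returns None. Its body is not shown here, so the sketch below assumes the natural rule (a task is ready when it is neither completed nor failed and all of its dependencies have completed) and models the non-fail-fast path, where a failure is recorded and the loop moves on. find_ready_task and build_order are hypothetical names.

```python
from typing import Dict, List, Optional, Set, Tuple


def find_ready_task(
    deps: Dict[str, List[str]], completed: Set[str], failed: Set[str]
) -> Optional[str]:
    """Hypothetical: the first pending task whose dependencies have all completed."""
    for task_id, task_deps in deps.items():
        if task_id in completed or task_id in failed:
            continue
        if all(dep in completed for dep in task_deps):
            return task_id
    return None


def build_order(
    deps: Dict[str, List[str]], failing: Set[str]
) -> Tuple[List[str], Set[str], Set[str]]:
    """Run ready tasks one at a time, recording failures instead of stopping."""
    completed: Set[str] = set()
    failed: Set[str] = set()
    order: List[str] = []
    while (task_id := find_ready_task(deps, completed, failed)) is not None:
        if task_id in failing:
            failed.add(task_id)
        else:
            completed.add(task_id)
            order.append(task_id)
    # Without cycles, every leftover task has a failed ancestor.
    blocked = set(deps) - completed - failed
    return order, failed, blocked


deps = {"raw": [], "a": ["raw"], "b": ["raw"], "report": ["a", "b"]}
print(build_order(deps, failing=set()))  # (['raw', 'a', 'b', 'report'], set(), set())
print(build_order(deps, failing={"a"}))  # (['raw', 'b'], {'a'}, {'report'})
```

Leftover tasks that are not downstream of a failure would indicate a cycle; that is the situation _check_for_deadlock is there to catch before the loop exits.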

Target Module

target

target_factory_provider module-attribute

target_factory_provider = resource_provider(
    type_=TargetFactory, default_factory=TargetFactory
)

FileSystemTarget

Bases: Target, Protocol

Minimal base protocol for filesystem-backed targets.

Both FileTarget (file-oriented) and DirectoryTarget (directory-oriented) implement this protocol.

FileTarget

FileTarget(uri)

Bases: _FileTargetGeneric[bytes], Protocol

A file-oriented filesystem target with open/read/write capabilities.

Inherits all file I/O methods from _FileTargetGeneric: open(), proxy_path(), exists(), and their async variants. Concrete implementations: LocalFileTarget, RemoteFileTarget, InMemoryFileTarget.

Source code in stardag/target/_base.py
def __init__(self, uri: str) -> None:
    self.uri = uri

DirectoryTarget

DirectoryTarget(uri, prototype)

Bases: FileSystemTarget

A target representing a directory of file targets.

Manages a collection of sub-targets (files) under a common URI prefix, with a flag file to track completion. Sub-targets are created via get_sub_target() or the / operator.

Source code in stardag/target/_base.py
def __init__(
    self,
    uri: str,
    prototype: typing.Type[FileTarget] | typing.Callable[[str], FileTarget],
) -> None:
    self.uri = uri.removesuffix("/") + "/"
    self.prototype = prototype
    self._flag_target = prototype(self.uri[:-1] + "._DONE")
    self._sub_keys: set[str] = set()
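The constructor normalizes the URI and places the completion flag next to the directory rather than inside it. The string handling can be reproduced on its own; directory_uris is a hypothetical helper and the bucket URI is illustrative.

```python
from typing import Tuple


def directory_uris(uri: str) -> Tuple[str, str]:
    """Reproduce DirectoryTarget.__init__'s URI handling (hypothetical helper)."""
    normalized = uri.removesuffix("/") + "/"  # strip one trailing "/" if present, then append one
    flag_uri = normalized[:-1] + "._DONE"  # sibling flag file, not inside the directory
    return normalized, flag_uri


print(directory_uris("s3://bucket/runs/features"))
# ('s3://bucket/runs/features/', 's3://bucket/runs/features._DONE')
```

Because the flag is a sibling, listing the directory's own contents never shows it, and the directory counts as existing only once the flag file exists.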

exists_aio async

exists_aio()

Async check if directory is marked as done.

Source code in stardag/target/_base.py
async def exists_aio(self) -> bool:
    """Async check if directory is marked as done."""
    return await self._flag_target.exists_aio()

mark_done_aio async

mark_done_aio()

Async version of mark_done().

Source code in stardag/target/_base.py
async def mark_done_aio(self) -> None:
    """Async version of mark_done()."""
    async with self.sub_keys_target().proxy_path_aio("w") as path:
        async with aiofiles.open(path, "w") as f:
            await f.write("\n".join(sorted(self._sub_keys)))
    async with self._flag_target.proxy_path_aio("w") as path:
        async with aiofiles.open(path, "w") as f:
            await f.write("")  # empty file
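The ordering in mark_done_aio matters: the sub-key listing is written first and the empty flag file last, so once exists_aio() reports True the listing is already in place. The local-only sketch below models that protocol. The class name and the ._SUB_KEYS location of the listing are assumptions (sub_keys_target() is not shown here), and asyncio.to_thread replaces aiofiles and proxy_path_aio so the example needs only the standard library.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import Set, Tuple


class LocalDirectorySketch:
    """Hypothetical local-filesystem model of DirectoryTarget's completion protocol."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)
        self.flag = self.path.with_name(self.path.name + "._DONE")
        # Assumed location; the real sub_keys_target() may live elsewhere.
        self.keys_file = self.path.with_name(self.path.name + "._SUB_KEYS")
        self.sub_keys: Set[str] = set()

    def write(self, key: str, data: str) -> None:
        self.path.mkdir(parents=True, exist_ok=True)
        (self.path / key).write_text(data)
        self.sub_keys.add(key)

    async def mark_done_aio(self) -> None:
        listing = "\n".join(sorted(self.sub_keys))
        await asyncio.to_thread(self.keys_file.write_text, listing)  # 1) listing
        await asyncio.to_thread(self.flag.write_text, "")  # 2) empty flag, last

    async def exists_aio(self) -> bool:
        return await asyncio.to_thread(self.flag.exists)


async def demo(root: str) -> Tuple[bool, bool, str]:
    target = LocalDirectorySketch(f"{root}/features")
    target.write("b.txt", "2")
    target.write("a.txt", "1")
    before = await target.exists_aio()
    await target.mark_done_aio()
    return before, await target.exists_aio(), target.keys_file.read_text()


with tempfile.TemporaryDirectory() as tmp:
    result = asyncio.run(demo(tmp))
print(result)  # (False, True, 'a.txt\nb.txt')
```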

LoadableSaveableFileSystemTarget

Bases: LoadableSaveableTarget[LoadedT], FileSystemTarget, Generic[LoadedT], Protocol

A filesystem target (file or directory) that supports load/save.

This is the return type of Task.target(). It provides:

  • load() -> LoadedT and save(obj: LoadedT) (from LoadableSaveableTarget)
  • uri: str and exists() -> bool (from FileSystemTarget)
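Because this is a Protocol, any object with these members conforms structurally, without subclassing. The sketch below illustrates that with hypothetical names (LoadableSaveableFSTargetLike, JsonFileTarget) rather than importing stardag; the real protocol is composed from LoadableSaveableTarget and FileSystemTarget.

```python
import json
import os
import tempfile
from typing import Generic, List, Protocol, TypeVar, runtime_checkable

LoadedT = TypeVar("LoadedT")


@runtime_checkable
class LoadableSaveableFSTargetLike(Protocol[LoadedT]):
    """Hypothetical structural stand-in for LoadableSaveableFileSystemTarget."""

    uri: str

    def exists(self) -> bool: ...

    def load(self) -> LoadedT: ...

    def save(self, obj: LoadedT) -> None: ...


class JsonFileTarget(Generic[LoadedT]):
    """Hypothetical JSON-backed local target; it never subclasses the protocol."""

    def __init__(self, uri: str) -> None:
        self.uri = uri

    def exists(self) -> bool:
        return os.path.exists(self.uri)

    def load(self) -> LoadedT:
        with open(self.uri) as f:
            return json.load(f)

    def save(self, obj: LoadedT) -> None:
        with open(self.uri, "w") as f:
            json.dump(obj, f)


with tempfile.TemporaryDirectory() as tmp:
    target: JsonFileTarget[List[int]] = JsonFileTarget(os.path.join(tmp, "out.json"))
    conforms = isinstance(target, LoadableSaveableFSTargetLike)
    existed_before = target.exists()
    target.save([1, 2, 3])
    loaded = target.load()
print(conforms, existed_before, loaded)  # True False [1, 2, 3]
```

The runtime isinstance check only verifies that the members exist; type agreement between load() and save() is left to a static type checker.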

LocalFileTarget

LocalFileTarget(uri)

Bases: FileTarget

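A file target on the local filesystem; a leading ~ in uri is expanded to the user's home directory.

TODO: use Luigi-style atomic writes.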

Source code in stardag/target/_base.py
def __init__(self, uri: str) -> None:
    # Expand ~ to user home directory
    self.uri = os.path.expanduser(uri)
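The TODO refers to the pattern Luigi uses for local targets: write to a temporary file and move it into place only after the write succeeds, so a crashed task never leaves a half-written file that looks like a finished output. A minimal standard-library sketch of that technique follows; atomic_write is a hypothetical helper, not stardag code.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import IO, Iterator


@contextmanager
def atomic_write(path: str, mode: str = "w") -> Iterator[IO]:
    """Hypothetical helper: write to a temp file, then move it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    # Same directory as the destination, so os.replace is a same-filesystem rename.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, mode) as f:
            yield f
        os.replace(tmp_path, path)  # readers see the old file or the new one, never half
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)  # discard the partial write
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "result.txt")
    with atomic_write(target) as f:
        f.write("done")
    try:
        with atomic_write(target) as f:
            f.write("partial")
            raise RuntimeError("crash mid-write")
    except RuntimeError:
        pass
    with open(target) as f:
        content = f.read()
    leftovers = sorted(os.listdir(tmp))
print(content, leftovers)  # done ['result.txt']
```

One caveat of this approach: tempfile.mkstemp creates the file readable and writable only by its owner, and the renamed file keeps those permissions unless they are changed explicitly.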

exists_aio async

exists_aio()

Asynchronously check if the local file exists.

Source code in stardag/target/_base.py
async def exists_aio(self) -> bool:
    """Asynchronously check if the local file exists."""
    return await aiofiles.os.path.exists(self.path)

TargetFactory

TargetFactory(
    target_roots=None, prefix_to_target_prototype=None
)

Source code in stardag/target/_factory.py
def __init__(
    self,
    target_roots: dict[str, str] | None = None,
    prefix_to_target_prototype: PrefixToFileTargetPrototype | None = None,
) -> None:
    # If no target_roots provided, get from central config
    if target_roots is None:
        target_roots = config_provider.get().target.roots

    self.target_roots = {
        key: value.removesuffix("/") + "/" for key, value in target_roots.items()
    }
    self.prefix_to_target_prototype = (
        prefix_to_target_prototype or get_default_prefix_to_target_prototype()
    )

get_file_target

get_file_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a file target.

PARAMETER DESCRIPTION
relpath

The path to the target, relative to the configured root path for target_root_key. If relpath is already a full path, it is used as-is and target_root_key is ignored.

TYPE: str

target_root_key

The key to the target root to use.

TYPE: str DEFAULT: DEFAULT_TARGET_ROOT_KEY

RETURNS DESCRIPTION
FileTarget

A file target.

Source code in stardag/target/_factory.py
def get_file_target(
    self,
    relpath: str,
    target_root_key: str = DEFAULT_TARGET_ROOT_KEY,
) -> FileTarget:
    """Get a file target.

    Args:
        relpath: The path to the target, relative to the configured root path for
            `target_root_key`.
        target_root_key: The key to the target root to use.

    Returns:
        A file target.
    """
    if self._is_full_path(relpath):
        path = relpath
    else:
        path = self.get_path(relpath, target_root_key)
    target_prototype = self._get_target_prototype(path)
    return target_prototype(path)
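The concrete target class is chosen by _get_target_prototype(path) from the prefix_to_target_prototype mapping, whose lookup rule is not shown here. The sketch below assumes a longest-prefix match; select_prototype, the Prototype alias and the example mapping are all hypothetical (the real defaults come from get_default_prefix_to_target_prototype()).

```python
from typing import Callable, Dict, Tuple

Prototype = Callable[[str], Tuple[str, str]]


def select_prototype(path: str, prefix_to_prototype: Dict[str, Prototype]) -> Prototype:
    """Hypothetical rule: the longest registered prefix that path starts with wins."""
    matches = [prefix for prefix in prefix_to_prototype if path.startswith(prefix)]
    if not matches:
        raise ValueError(f"No target prototype registered for path: {path!r}")
    return prefix_to_prototype[max(matches, key=len)]


# Illustrative mapping; each "prototype" just labels the URI it was given.
prototypes: Dict[str, Prototype] = {
    "/": lambda uri: ("local", uri),
    "s3://": lambda uri: ("remote", uri),
    "s3://tmp-bucket/": lambda uri: ("remote-tmp", uri),
}
print(select_prototype("s3://bucket/a.json", prototypes)("s3://bucket/a.json"))
# ('remote', 's3://bucket/a.json')
```

Longest-prefix matching lets a more specific prefix override a general one regardless of dictionary order, which is why it is used for this sketch.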

get_directory_target

get_directory_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a directory target.

PARAMETER DESCRIPTION
relpath

The path to the target, relative to the configured root path for target_root_key. If relpath is already a full path, it is used as-is and target_root_key is ignored.

TYPE: str

target_root_key

The key to the target root to use.

TYPE: str DEFAULT: DEFAULT_TARGET_ROOT_KEY

RETURNS DESCRIPTION
DirectoryTarget

A directory target.

Source code in stardag/target/_factory.py
def get_directory_target(
    self,
    relpath: str,
    target_root_key: str = DEFAULT_TARGET_ROOT_KEY,
) -> DirectoryTarget:
    """Get a directory target.

    Args:
        relpath: The path to the target, relative to the configured root path for
            `target_root_key`.
        target_root_key: The key to the target root to use.

    Returns:
        A directory target.
    """
    if self._is_full_path(relpath):
        path = relpath
    else:
        path = self.get_path(relpath, target_root_key)
    target_prototype = self._get_target_prototype(path)
    return DirectoryTarget(path, target_prototype)

get_path

get_path(relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY)

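Get the full (absolute) path, or URI, of the target by appending relpath to the configured root for target_root_key. Raises ValueError if no root is configured for that key.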

Source code in stardag/target/_factory.py
def get_path(
    self, relpath: str, target_root_key: str = DEFAULT_TARGET_ROOT_KEY
) -> str:
    """Get the full (/"absolute") path (/"URI") to the target."""
    target_root = self.target_roots.get(target_root_key)
    if target_root is None:
        example_json = json.dumps({target_root_key: "...", "default": "..."})
        raise ValueError(
            f"No target root is configured for key: '{target_root_key}'. "
            f"Available keys are: {list(self.target_roots.keys())}. Set the missing "
            "target root in your registry config or via environment variable: "
            f"`STARDAG_TARGET_ROOTS='{example_json}'`."
        )

    return f"{target_root}{relpath}"
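Stripped of configuration loading, get_path is a dictionary lookup plus string concatenation over roots normalized to end in "/" (the roots normally come from config; the error message points at STARDAG_TARGET_ROOTS, a JSON object mapping keys to roots). TargetRootsSketch is a hypothetical reduction, and the value "default" for DEFAULT_TARGET_ROOT_KEY is an assumption suggested by that error message.

```python
from typing import Dict

DEFAULT_TARGET_ROOT_KEY = "default"  # assumption: the real constant's value is not shown


class TargetRootsSketch:
    """Hypothetical reduction of TargetFactory to its root-joining logic."""

    def __init__(self, target_roots: Dict[str, str]) -> None:
        # As in TargetFactory.__init__: strip one trailing "/" and append one.
        self.target_roots = {
            key: value.removesuffix("/") + "/" for key, value in target_roots.items()
        }

    def get_path(self, relpath: str, target_root_key: str = DEFAULT_TARGET_ROOT_KEY) -> str:
        root = self.target_roots.get(target_root_key)
        if root is None:
            raise ValueError(
                f"No target root is configured for key: '{target_root_key}'. "
                f"Available keys are: {list(self.target_roots)}."
            )
        return f"{root}{relpath}"


roots = TargetRootsSketch({"default": "/data/stardag", "archive": "s3://bucket/archive/"})
print(roots.get_path("reports/summary.json"))  # /data/stardag/reports/summary.json
print(roots.get_path("reports/summary.json", "archive"))  # s3://bucket/archive/reports/summary.json
```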

Registry Module

registry

Task registry module for stardag.

This module provides registry implementations for tracking task execution. The main components are:

  • RegistryABC: Abstract base class defining the registry interface
  • APIRegistry: Registry that communicates with the stardag-api service
  • NoOpRegistry: A do-nothing registry (default when unconfigured)
  • registry_provider: Resource provider for getting the configured registry
  • RegistryGlobalConcurrencyLockManager: GlobalConcurrencyLockManager using Registry API
  • RegistryLockHandle: LockHandle implementation with automatic TTL renewal

registry_provider module-attribute

registry_provider = resource_provider(
    RegistryABC, init_registry
)

APIRegistry

APIRegistry(
    api_url=None,
    timeout=None,
    environment_id=None,
    api_key=None,
)

Bases: RegistryABC

Registry that stores task information via the stardag-api REST service.

This registry is stateless with respect to build_id: the build_id is passed explicitly to every method that needs it, so a single registry instance can be reused across multiple builds (via registry_provider).

Usage:

build_id = await registry.build_start_aio(root_tasks=tasks)
await registry.task_register_aio(build_id, task)
await registry.task_start_aio(build_id, task)
# ... execute task ...
await registry.task_complete_aio(build_id, task)
await registry.build_complete_aio(build_id)

Authentication:

  • API key, provided directly or via the STARDAG_API_KEY environment variable
  • JWT token from browser login (stored in registry credentials)

Configuration is loaded from the central config module (stardag.config).
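The usage sequence covers only the happy path. The sketch below shows one way a caller might also report failures through the same async methods. `RecordingRegistry` and `run_build` are hypothetical stand-ins that only mirror APIRegistry's method names and record calls instead of hitting the REST API. Tasks are plain names here rather than BaseTask objects, and the sketch does not reproduce how stardag's own build loop handles errors.

```python
import asyncio
import uuid
from collections.abc import Callable


class RecordingRegistry:
    """Hypothetical stand-in that records calls instead of calling the API."""

    def __init__(self) -> None:
        self.events: list[tuple[str, str]] = []

    async def build_start_aio(self, root_tasks=None, description=None) -> uuid.UUID:
        self.events.append(("build_start", ""))
        return uuid.uuid4()

    async def task_start_aio(self, build_id, task) -> None:
        self.events.append(("task_start", task))

    async def task_complete_aio(self, build_id, task) -> None:
        self.events.append(("task_complete", task))

    async def task_fail_aio(self, build_id, task, error_message=None) -> None:
        self.events.append(("task_fail", task))

    async def build_complete_aio(self, build_id) -> None:
        self.events.append(("build_complete", ""))

    async def build_fail_aio(self, build_id, error_message=None) -> None:
        self.events.append(("build_fail", error_message or ""))


async def run_build(registry, tasks: dict[str, Callable[[], None]]) -> None:
    build_id = await registry.build_start_aio(root_tasks=list(tasks))
    for name, execute in tasks.items():
        # In APIRegistry, task_start_aio also registers the task first.
        await registry.task_start_aio(build_id, name)
        try:
            execute()
        except Exception as exc:
            # Report the failure on both the task and the build, then re-raise.
            await registry.task_fail_aio(build_id, name, error_message=str(exc))
            await registry.build_fail_aio(build_id, error_message=str(exc))
            raise
        await registry.task_complete_aio(build_id, name)
    await registry.build_complete_aio(build_id)


def boom() -> None:
    raise RuntimeError("disk full")


registry = RecordingRegistry()
try:
    asyncio.run(run_build(registry, {"get_range": lambda: None, "get_sum": boom}))
except RuntimeError:
    pass
print([event for event, _ in registry.events])
# ['build_start', 'task_start', 'task_complete', 'task_start', 'task_fail', 'build_fail']
```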

Source code in stardag/registry/_api_registry.py
def __init__(
    self,
    api_url: str | None = None,
    timeout: float | None = None,
    environment_id: str | None = None,
    api_key: str | None = None,
):
    # Load central config
    config = config_provider.get()

    # API key: explicit > config (which includes env var)
    self.api_key = api_key or config.api_key

    # Access token from config (browser login, only if no API key)
    self.access_token = config.access_token if not self.api_key else None

    # API URL: explicit > config
    resolved_url = api_url or config.api.url
    if not resolved_url:
        raise ValueError(
            "APIRegistry requires a registry URL. "
            "Set STARDAG_REGISTRY_URL or configure a profile with a registry."
        )
    self.api_url = resolved_url.rstrip("/")

    # Timeout: explicit > config
    self.timeout = timeout if timeout is not None else config.api.timeout

    # Environment ID: explicit > config
    self.environment_id = environment_id or config.context.environment_id

    self._client = None
    self._async_client = None
    self._async_client_loop = (
        None  # Track which event loop the async client belongs to
    )

    if self.api_key:
        logger.debug("APIRegistry initialized with API key authentication")
    elif self.access_token:
        if not self.environment_id:
            logger.warning(
                "APIRegistry: JWT auth requires environment_id. "
                "Run 'stardag config set environment <id>' to set it."
            )
        else:
            logger.debug(
                "APIRegistry initialized with browser login (JWT) authentication"
            )
    else:
        logger.warning(
            "APIRegistry initialized without authentication. "
            "Run 'stardag auth login' or set STARDAG_API_KEY env var."
        )

async_client property

async_client

Lazy-initialized async HTTP client with retry transport.

The client is recreated if the event loop changes, which can happen when running in frameworks like Prefect that create new event loops for task execution.
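An async HTTP client is generally tied to the event loop it was created on, which is why the property rebuilds it when the loop changes. The property's code is not shown in this reference, so the sketch below illustrates the caching pattern with assumed names. `LoopBoundClient` is a hypothetical holder, and `factory` stands in for constructing the real HTTP client.

```python
import asyncio
from collections.abc import Callable
from typing import Any


class LoopBoundClient:
    """Hypothetical holder that lazily creates one client per running event loop."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._client: Any = None
        self._loop: asyncio.AbstractEventLoop | None = None

    def get(self) -> Any:
        # Must be called from inside a coroutine; raises RuntimeError otherwise.
        loop = asyncio.get_running_loop()
        if self._client is None or self._loop is not loop:
            # First use, or a different loop than last time: build a new client.
            # The old client is simply replaced here, not closed.
            self._client = self._factory()
            self._loop = loop
        return self._client


holder = LoopBoundClient(factory=object)


async def grab() -> Any:
    return holder.get()


first = asyncio.run(grab())
second = asyncio.run(grab())  # asyncio.run creates a fresh loop each time
print(first is second)  # False
```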

build_start

build_start(root_tasks=None, description=None)

Start a new build and return its ID.

Source code in stardag/registry/_api_registry.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Start a new build and return its ID."""
    build_data = {
        "commit_hash": get_git_commit_hash(),
        "root_task_ids": [str(task.id) for task in (root_tasks or [])],
        "description": description,
    }

    response = self._request(
        "POST",
        f"{self.api_url}/api/v1/builds",
        json=build_data,
        params=self._get_params(),
        operation="Start build",
    )
    data = response.json()
    build_id = UUID(data["id"])
    logger.info(f"Started build: {data['name']} (ID: {build_id})")
    return build_id

build_complete

build_complete(build_id)

Mark a build as completed.

Source code in stardag/registry/_api_registry.py
def build_complete(self, build_id: UUID) -> None:
    """Mark a build as completed."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/complete",
        params=self._get_event_params(),
        operation="Complete build",
    )
    logger.info(f"Completed build: {build_id}")

build_fail

build_fail(build_id, error_message=None)

Mark a build as failed.

Source code in stardag/registry/_api_registry.py
def build_fail(self, build_id: UUID, error_message: str | None = None) -> None:
    """Mark a build as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/fail",
        params=params,
        operation="Fail build",
    )
    logger.info(f"Marked build as failed: {build_id}")

build_cancel

build_cancel(build_id)

Cancel a build.

Source code in stardag/registry/_api_registry.py
def build_cancel(self, build_id: UUID) -> None:
    """Cancel a build."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/cancel",
        params=self._get_event_params(),
        operation="Cancel build",
    )
    logger.info(f"Cancelled build: {build_id}")

build_exit_early

build_exit_early(build_id, reason=None)

Mark a build as exited early.

Source code in stardag/registry/_api_registry.py
def build_exit_early(self, build_id: UUID, reason: str | None = None) -> None:
    """Mark a build as exited early."""
    params = self._get_event_params()
    if reason:
        params["reason"] = reason
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/exit-early",
        params=params,
        operation="Exit early",
    )
    logger.info(f"Build exited early: {build_id}")

task_register

task_register(build_id, task)

Register a task within a build.

Source code in stardag/registry/_api_registry.py
def task_register(self, build_id: UUID, task: "BaseTask") -> None:
    """Register a task within a build."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks",
        json=_get_task_data_for_registration(task),
        params=self._get_params(),
        operation=f"Register task {task.id}",
    )

task_start

task_start(build_id, task)

Mark a task as started.

Source code in stardag/registry/_api_registry.py
def task_start(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as started."""
    # Ensure task is registered first
    self.task_register(build_id, task)

    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/start",
        params=self._get_event_params(),
        operation=f"Start task {task.id}",
    )

task_complete

task_complete(build_id, task)

Mark a task as completed.

Source code in stardag/registry/_api_registry.py
def task_complete(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as completed."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/complete",
        params=self._get_event_params(),
        operation=f"Complete task {task.id}",
    )

task_fail

task_fail(build_id, task, error_message=None)

Mark a task as failed.

Source code in stardag/registry/_api_registry.py
def task_fail(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Mark a task as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/fail",
        params=params,
        operation=f"Fail task {task.id}",
    )

task_suspend

task_suspend(build_id, task)

Mark a task as suspended (waiting for dynamic dependencies).

Source code in stardag/registry/_api_registry.py
def task_suspend(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as suspended (waiting for dynamic dependencies)."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/suspend",
        params=self._get_event_params(),
        operation=f"Suspend task {task.id}",
    )

task_resume

task_resume(build_id, task)

Mark a task as resumed (dynamic dependencies completed).

Source code in stardag/registry/_api_registry.py
def task_resume(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as resumed (dynamic dependencies completed)."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/resume",
        params=self._get_event_params(),
        operation=f"Resume task {task.id}",
    )

task_cancel

task_cancel(build_id, task)

Cancel a task.

Source code in stardag/registry/_api_registry.py
def task_cancel(self, build_id: UUID, task: "BaseTask") -> None:
    """Cancel a task."""
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/cancel",
        params=self._get_event_params(),
        operation=f"Cancel task {task.id}",
    )

task_waiting_for_lock

task_waiting_for_lock(build_id, task, lock_owner=None)

Record that a task is waiting for a global lock.

Source code in stardag/registry/_api_registry.py
def task_waiting_for_lock(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Record that a task is waiting for a global lock."""
    params = self._get_event_params()
    if lock_owner:
        params["lock_owner"] = lock_owner
    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/waiting-for-lock",
        params=params,
        operation=f"Task {task.id} waiting for lock",
    )

task_upload_artifacts

task_upload_artifacts(build_id, task, artifacts)

Upload artifacts for a completed task.

Source code in stardag/registry/_api_registry.py
def task_upload_artifacts(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence[Artifact]
) -> None:
    """Upload artifacts for a completed task."""
    if not artifacts:
        return

    # Serialize artifacts to API format
    # For all artifact types, body is stored as a dict in body_json
    # - markdown: {"content": "<markdown string>"}
    # - json: the actual JSON data dict
    artifacts_data = []
    for artifact in artifacts:
        data = artifact.model_dump(mode="json")
        if artifact.type == "markdown":
            # Wrap markdown body string in {"content": ...} dict
            data["body"] = {"content": data["body"]}
        artifacts_data.append(data)

    self._request(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/artifacts",
        json=artifacts_data,
        params=self._get_params(),
        operation=f"Upload artifacts for task {task.id}",
    )
    logger.debug(f"Uploaded {len(artifacts)} artifacts for task {task.id}")
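Before upload, each artifact is dumped to JSON-compatible data, and markdown bodies are wrapped as `{"content": ...}` so every body reaches the API as a dict. The sketch below reproduces that shaping on plain dicts. The `type` and `body` keys mirror what the method reads, while the real Artifact model may carry more fields than shown.

```python
from typing import Any


def serialize_artifacts(artifacts: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Mirror task_upload_artifacts' payload shaping on plain dicts (sketch)."""
    payload = []
    for artifact in artifacts:
        data = dict(artifact)  # stands in for artifact.model_dump(mode="json")
        if data["type"] == "markdown":
            # Markdown bodies are strings; wrap them so every body is a dict.
            data["body"] = {"content": data["body"]}
        payload.append(data)
    return payload


artifacts = [
    {"type": "markdown", "body": "# Report\nAll good."},
    {"type": "json", "body": {"rows": 10}},
]
print(serialize_artifacts(artifacts))
```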

task_get_metadata

task_get_metadata(task_id)

Get metadata for a registered task.

PARAMETER DESCRIPTION
task_id

The UUID of the task to get metadata for.

TYPE: UUID

RETURNS DESCRIPTION
TaskMetadata

A TaskMetadata object containing task metadata.

Source code in stardag/registry/_api_registry.py
def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
    """Get metadata for a registered task.

    Args:
        task_id: The UUID of the task to get metadata for.

    Returns:
        A TaskMetadata object containing task metadata.
    """

    response = self._request(
        "GET",
        f"{self.api_url}/api/v1/tasks/{task_id}/metadata",
        params=self._get_params(),
        operation=f"Get metadata for task {task_id}",
    )
    data = response.json()

    return TaskMetadata.model_validate(data)

close

close()

Close the HTTP client.

Source code in stardag/registry/_api_registry.py
def close(self) -> None:
    """Close the HTTP client."""
    if self._client is not None:
        self._client.close()
        self._client = None

aclose async

aclose()

Close the async HTTP client.

Source code in stardag/registry/_api_registry.py
async def aclose(self) -> None:
    """Close the async HTTP client."""
    if self._async_client is not None:
        await self._async_client.aclose()
        self._async_client = None

build_start_aio async

build_start_aio(root_tasks=None, description=None)

Async version - start a new build and return its ID.

Source code in stardag/registry/_api_registry.py
async def build_start_aio(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Async version - start a new build and return its ID."""
    build_data = {
        "commit_hash": get_git_commit_hash(),
        "root_task_ids": [str(task.id) for task in (root_tasks or [])],
        "description": description,
    }

    response = await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds",
        json=build_data,
        params=self._get_params(),
        operation="Start build",
    )
    data = response.json()
    build_id = UUID(data["id"])
    logger.info(f"Started build: {data['name']} (ID: {build_id})")
    return build_id

build_complete_aio async

build_complete_aio(build_id)

Async version - mark a build as completed.

Source code in stardag/registry/_api_registry.py
async def build_complete_aio(self, build_id: UUID) -> None:
    """Async version - mark a build as completed."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/complete",
        params=self._get_event_params(),
        operation="Complete build",
    )
    logger.info(f"Completed build: {build_id}")

build_fail_aio async

build_fail_aio(build_id, error_message=None)

Async version - mark a build as failed.

Source code in stardag/registry/_api_registry.py
async def build_fail_aio(
    self, build_id: UUID, error_message: str | None = None
) -> None:
    """Async version - mark a build as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/fail",
        params=params,
        operation="Fail build",
    )
    logger.info(f"Marked build as failed: {build_id}")

build_cancel_aio async

build_cancel_aio(build_id)

Async version - cancel a build.

Source code in stardag/registry/_api_registry.py
async def build_cancel_aio(self, build_id: UUID) -> None:
    """Async version - cancel a build."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/cancel",
        params=self._get_event_params(),
        operation="Cancel build",
    )
    logger.info(f"Cancelled build: {build_id}")

build_exit_early_aio async

build_exit_early_aio(build_id, reason=None)

Async version - mark a build as exited early.

Source code in stardag/registry/_api_registry.py
async def build_exit_early_aio(
    self, build_id: UUID, reason: str | None = None
) -> None:
    """Async version - mark build as exited early."""
    params = self._get_event_params()
    if reason:
        params["reason"] = reason
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/exit-early",
        params=params,
        operation="Exit early",
    )
    logger.info(f"Build exited early: {build_id}")

task_register_aio async

task_register_aio(build_id, task)

Async version - register a task within a build.

Source code in stardag/registry/_api_registry.py
async def task_register_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - register a task within a build."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks",
        json=_get_task_data_for_registration(task),
        params=self._get_params(),
        operation=f"Register task {task.id}",
    )

task_start_aio async

task_start_aio(build_id, task)

Async version - mark a task as started.

Source code in stardag/registry/_api_registry.py
async def task_start_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as started."""
    await self.task_register_aio(build_id, task)

    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/start",
        params=self._get_event_params(),
        operation=f"Start task {task.id}",
    )

task_complete_aio async

task_complete_aio(build_id, task)

Async version - mark a task as completed.

Source code in stardag/registry/_api_registry.py
async def task_complete_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as completed."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/complete",
        params=self._get_event_params(),
        operation=f"Complete task {task.id}",
    )

task_fail_aio async

task_fail_aio(build_id, task, error_message=None)

Async version - mark a task as failed.

Source code in stardag/registry/_api_registry.py
async def task_fail_aio(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Async version - mark a task as failed."""
    params = self._get_event_params()
    if error_message:
        params["error_message"] = error_message
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/fail",
        params=params,
        operation=f"Fail task {task.id}",
    )

task_suspend_aio async

task_suspend_aio(build_id, task)

Async version - mark a task as suspended.

Source code in stardag/registry/_api_registry.py
async def task_suspend_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as suspended."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/suspend",
        params=self._get_event_params(),
        operation=f"Suspend task {task.id}",
    )

task_resume_aio async

task_resume_aio(build_id, task)

Async version - mark a task as resumed.

Source code in stardag/registry/_api_registry.py
async def task_resume_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as resumed."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/resume",
        params=self._get_event_params(),
        operation=f"Resume task {task.id}",
    )

task_cancel_aio async

task_cancel_aio(build_id, task)

Async version - cancel a task.

Source code in stardag/registry/_api_registry.py
async def task_cancel_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - cancel a task."""
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/cancel",
        params=self._get_event_params(),
        operation=f"Cancel task {task.id}",
    )

task_waiting_for_lock_aio async

task_waiting_for_lock_aio(build_id, task, lock_owner=None)

Async version - record that a task is waiting for a global lock.

Source code in stardag/registry/_api_registry.py
async def task_waiting_for_lock_aio(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Async version - record that task is waiting for global lock."""
    params = self._get_event_params()
    if lock_owner:
        params["lock_owner"] = lock_owner
    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/waiting-for-lock",
        params=params,
        operation=f"Task {task.id} waiting for lock",
    )

task_upload_artifacts_aio async

task_upload_artifacts_aio(build_id, task, artifacts)

Async version - upload artifacts for a completed task.

Source code in stardag/registry/_api_registry.py
async def task_upload_artifacts_aio(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence[Artifact]
) -> None:
    """Async version - upload artifacts for a completed task."""
    if not artifacts:
        return

    artifacts_data = []
    for artifact in artifacts:
        data = artifact.model_dump(mode="json")
        if artifact.type == "markdown":
            data["body"] = {"content": data["body"]}
        artifacts_data.append(data)

    await self._arequest(
        "POST",
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/artifacts",
        json=artifacts_data,
        params=self._get_params(),
        operation=f"Upload artifacts for task {task.id}",
    )
    logger.debug(f"Uploaded {len(artifacts)} artifacts for task {task.id}")

task_get_metadata_aio async

task_get_metadata_aio(task_id)

Async version of task_get_metadata.

Source code in stardag/registry/_api_registry.py
async def task_get_metadata_aio(self, task_id: UUID) -> TaskMetadata:
    """Async version of task_get_metadata."""

    response = await self._arequest(
        "GET",
        f"{self.api_url}/api/v1/tasks/{task_id}/metadata",
        params=self._get_params(),
        operation=f"Get metadata for task {task_id}",
    )
    data = response.json()

    return TaskMetadata.model_validate(data)

RegistryABC

Abstract base class for task registries.

A registry tracks task execution within builds. Implementations must provide the two abstract methods, task_register and task_get_metadata. All other methods have default no-op implementations for backwards compatibility.

The registry is stateless with respect to build_id: the build_id is passed explicitly to every method that needs it, so a single registry instance can be reused across multiple builds.

Method naming convention:

  • Build methods: build_ prefix (e.g., build_start, build_complete)
  • Task methods: task_ prefix (e.g., task_register, task_start)
  • Async versions: _aio suffix (e.g., build_start_aio, task_register_aio)
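A minimal sketch of a custom registry follows, assuming RegistryABC is subclassed directly. Only the abstract task_register and task_get_metadata need implementing. To run without stardag, the block defines a hypothetical `MiniRegistryABC` that reproduces only the relevant shape: two abstract methods, a no-op default, and an async wrapper that delegates to the sync method. In real code you would subclass stardag's RegistryABC and return a TaskMetadata instead of a dict.

```python
import abc
import asyncio
from uuid import UUID, uuid4


class MiniRegistryABC(abc.ABC):
    """Hypothetical, trimmed-down mirror of RegistryABC's structure."""

    @abc.abstractmethod
    def task_register(self, build_id: UUID, task_id: UUID) -> None: ...

    @abc.abstractmethod
    def task_get_metadata(self, task_id: UUID) -> dict: ...

    def task_complete(self, build_id: UUID, task_id: UUID) -> None:
        pass  # default no-op, as for most RegistryABC methods

    async def task_register_aio(self, build_id: UUID, task_id: UUID) -> None:
        # Default async wrapper: delegates to the sync method.
        self.task_register(build_id, task_id)


class InMemoryRegistry(MiniRegistryABC):
    """Keeps registrations in a dict; a real registry would persist them."""

    def __init__(self) -> None:
        self.builds: dict[UUID, UUID] = {}  # task_id -> build_id

    def task_register(self, build_id: UUID, task_id: UUID) -> None:
        self.builds[task_id] = build_id

    def task_get_metadata(self, task_id: UUID) -> dict:
        # Plain dict here; stardag's method returns a TaskMetadata model.
        return {"task_id": task_id, "build_id": self.builds[task_id]}


class OnlyRegister(MiniRegistryABC):
    def task_register(self, build_id: UUID, task_id: UUID) -> None:
        pass


registry = InMemoryRegistry()
build_id, task_id = uuid4(), uuid4()
asyncio.run(registry.task_register_aio(build_id, task_id))
print(registry.task_get_metadata(task_id)["build_id"] == build_id)  # True

try:
    OnlyRegister()  # task_get_metadata is still abstract
except TypeError as exc:
    print("cannot instantiate:", exc)
```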

build_start

build_start(root_tasks=None, description=None)

Start a new build session.

Called at the beginning of a build. Returns a build ID.

PARAMETER DESCRIPTION
root_tasks

The root tasks being built

TYPE: list[BaseTask] | None DEFAULT: None

description

Optional description of the build

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
UUID

Build ID (UUID) for the new build session.

Source code in stardag/registry/_base.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Start a new build session.

    Called at the beginning of a build. Returns a build ID.

    Args:
        root_tasks: The root tasks being built
        description: Optional description of the build

    Returns:
        Build ID (UUID) for the new build session.
    """
    return UUID("00000000-0000-0000-0000-000000000000")

build_complete

build_complete(build_id)

Mark a build as completed successfully.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

Source code in stardag/registry/_base.py
def build_complete(self, build_id: UUID) -> None:
    """Mark a build as completed successfully.

    Args:
        build_id: The build UUID returned by build_start.
    """
    pass

build_fail

build_fail(build_id, error_message=None)

Mark a build as failed.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

error_message

Optional error message describing the failure.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def build_fail(self, build_id: UUID, error_message: str | None = None) -> None:
    """Mark a build as failed.

    Args:
        build_id: The build UUID returned by build_start.
        error_message: Optional error message describing the failure.
    """
    pass

build_cancel

build_cancel(build_id)

Cancel a build.

Called when a build is explicitly cancelled by the user.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

Source code in stardag/registry/_base.py
def build_cancel(self, build_id: UUID) -> None:
    """Cancel a build.

    Called when a build is explicitly cancelled by the user.

    Args:
        build_id: The build UUID returned by build_start.
    """
    pass

build_exit_early

build_exit_early(build_id, reason=None)

Mark a build as exited early.

Called when all remaining tasks are running in other builds and this build should stop waiting.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

reason

Optional reason for exiting early.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def build_exit_early(self, build_id: UUID, reason: str | None = None) -> None:
    """Mark a build as exited early.

    Called when all remaining tasks are running in other builds
    and this build should stop waiting.

    Args:
        build_id: The build UUID returned by build_start.
        reason: Optional reason for exiting early.
    """
    pass

task_register abstractmethod

task_register(build_id, task)

Register a task as pending/scheduled.

This is called when a task is about to be executed.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task to register.

TYPE: BaseTask

Source code in stardag/registry/_base.py
@abc.abstractmethod
def task_register(self, build_id: UUID, task: "BaseTask") -> None:
    """Register a task as pending/scheduled.

    This is called when a task is about to be executed.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task to register.
    """
    pass

task_start

task_start(build_id, task)

Mark a task as started/running.

Called immediately before a task begins execution.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is starting.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_start(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as started/running.

    Called immediately before a task begins execution.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is starting.
    """
    pass

task_complete

task_complete(build_id, task)

Mark a task as completed successfully.

Called after a task finishes execution without errors.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that completed.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_complete(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as completed successfully.

    Called after a task finishes execution without errors.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that completed.
    """
    pass

task_fail

task_fail(build_id, task, error_message=None)

Mark a task as failed.

Called when a task raises an exception during execution.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that failed.

TYPE: BaseTask

error_message

Optional error message describing the failure.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def task_fail(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Mark a task as failed.

    Called when a task raises an exception during execution.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that failed.
        error_message: Optional error message describing the failure.
    """
    pass

task_suspend

task_suspend(build_id, task)

Mark a task as suspended waiting for dynamic dependencies.

Called when a task yields dynamic deps that are not yet complete. The task will remain suspended until its dynamic deps are built.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is suspended.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_suspend(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as suspended waiting for dynamic dependencies.

    Called when a task yields dynamic deps that are not yet complete.
    The task will remain suspended until its dynamic deps are built.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is suspended.
    """
    pass

task_resume

task_resume(build_id, task)

Mark a task as resumed after dynamic dependencies completed.

Called when a task's dynamic dependencies are complete and the task is ready to continue execution (either by resuming a suspended generator or by re-executing the task).

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is resuming.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_resume(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as resumed after dynamic dependencies completed.

    Called when a task's dynamic dependencies are complete and
    the task is ready to continue execution (either by resuming
    a suspended generator or by re-executing the task).

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is resuming.
    """
    pass
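task_suspend and task_resume bracket the period in which a task waits on dynamic dependencies. The sketch below shows a hypothetical driver that records those events around a generator-based task. The task yields the dependencies it needs, the driver builds them, and the generator is then resumed. The driver, the string task names and the `events` list are illustrative assumptions; stardag's own build loop is not shown in this reference, and it may instead re-execute the task rather than resume a generator.

```python
from collections.abc import Callable, Generator

TaskGen = Generator[list[str], None, str]


def run_with_dynamic_deps(
    name: str,
    task: Callable[[], TaskGen],
    build_dep: Callable[[str], None],
    events: list[str],
) -> str:
    """Drive a generator task, recording registry-style lifecycle events."""
    events.append(f"task_start {name}")
    gen = task()
    try:
        deps = next(gen)
        while True:
            # The task is blocked on dependencies it discovered at runtime.
            events.append(f"task_suspend {name}")
            for dep in deps:
                build_dep(dep)
            events.append(f"task_resume {name}")
            deps = next(gen)
    except StopIteration as stop:
        events.append(f"task_complete {name}")
        return stop.value


def aggregate() -> TaskGen:
    # Discover which shards are needed, then wait for them to be built.
    yield ["shard-0", "shard-1"]
    return "aggregated"


events: list[str] = []
built: list[str] = []
result = run_with_dynamic_deps("aggregate", aggregate, built.append, events)
print(result)  # aggregated
print(events)
# ['task_start aggregate', 'task_suspend aggregate', 'task_resume aggregate', 'task_complete aggregate']
```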

task_cancel

task_cancel(build_id, task)

Cancel a task.

Called when a task is explicitly cancelled by the user.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task to cancel.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_cancel(self, build_id: UUID, task: "BaseTask") -> None:
    """Cancel a task.

    Called when a task is explicitly cancelled by the user.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task to cancel.
    """
    pass

task_waiting_for_lock

task_waiting_for_lock(build_id, task, lock_owner=None)

Record that a task is waiting for a global lock.

Called when a task cannot acquire its lock because another build is holding it.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task waiting for the lock.

TYPE: BaseTask

lock_owner

Optional identifier of who holds the lock.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def task_waiting_for_lock(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Record that a task is waiting for a global lock.

    Called when a task cannot acquire its lock because another
    build is holding it.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task waiting for the lock.
        lock_owner: Optional identifier of who holds the lock.
    """
    pass

task_upload_artifacts

task_upload_artifacts(build_id, task, artifacts)

Upload artifacts for a completed task.

Called after a task completes successfully if it has artifacts.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The completed task.

TYPE: BaseTask

artifacts

List of artifacts to upload.

TYPE: Sequence[Artifact]

Source code in stardag/registry/_base.py
def task_upload_artifacts(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence["Artifact"]
) -> None:
    """Upload artifacts for a completed task.

    Called after a task completes successfully if it has artifacts.

    Args:
        build_id: The build UUID returned by build_start.
        task: The completed task.
        artifacts: List of artifacts to upload.
    """
    pass

task_get_metadata abstractmethod

task_get_metadata(task_id)

Get metadata for a registered task.

PARAMETER DESCRIPTION
task_id

The ID of the task to get metadata for.

TYPE: UUID

RETURNS DESCRIPTION
TaskMetadata

A TaskMetadata object containing task metadata.

Source code in stardag/registry/_base.py
@abc.abstractmethod
def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
    """Get metadata for a registered task.

    Args:
        task_id: The ID of the task to get metadata for.
    Returns:
        A TaskMetadata object containing task metadata.
    """
    pass
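`task_get_metadata` is decorated with `@abc.abstractmethod`, so a concrete registry must implement it before it can be instantiated. The sketch below reproduces that rule with hypothetical stand-ins (`TaskMetadata`, `MetadataSource`, `DictMetadataSource`). The real `TaskMetadata` fields and the real registry's behavior for unknown task IDs are not documented here.

```python
import abc
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class TaskMetadata:
    """Hypothetical stand-in; the real TaskMetadata fields are not shown here."""

    task_id: UUID
    name: str


class MetadataSource(abc.ABC):
    """Minimal ABC mirroring the abstract task_get_metadata documented above."""

    @abc.abstractmethod
    def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
        """Return metadata for a registered task."""


class DictMetadataSource(MetadataSource):
    """Concrete subclass backed by a plain dict."""

    def __init__(self, records: dict[UUID, TaskMetadata]) -> None:
        self._records = records

    def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
        # Raises KeyError for unknown IDs (a choice made for this sketch).
        return self._records[task_id]


# MetadataSource() would raise TypeError: it has an unimplemented abstract method.
task_id = uuid4()
source = DictMetadataSource({task_id: TaskMetadata(task_id=task_id, name="get_sum")})
print(source.task_get_metadata(task_id).name)  # get_sum
```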

build_start_aio async

build_start_aio(root_tasks=None, description=None)

Async version of build_start.

Source code in stardag/registry/_base.py
async def build_start_aio(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Async version of build_start."""
    return self.build_start(root_tasks, description)

build_complete_aio async

build_complete_aio(build_id)

Async version of build_complete.

Source code in stardag/registry/_base.py
async def build_complete_aio(self, build_id: UUID) -> None:
    """Async version of build_complete."""
    self.build_complete(build_id)

build_fail_aio async

build_fail_aio(build_id, error_message=None)

Async version of build_fail.

Source code in stardag/registry/_base.py
async def build_fail_aio(
    self, build_id: UUID, error_message: str | None = None
) -> None:
    """Async version of build_fail."""
    self.build_fail(build_id, error_message)

build_cancel_aio async

build_cancel_aio(build_id)

Async version of build_cancel.

Source code in stardag/registry/_base.py
async def build_cancel_aio(self, build_id: UUID) -> None:
    """Async version of build_cancel."""
    self.build_cancel(build_id)

build_exit_early_aio async

build_exit_early_aio(build_id, reason=None)

Async version of build_exit_early.

Source code in stardag/registry/_base.py
async def build_exit_early_aio(
    self, build_id: UUID, reason: str | None = None
) -> None:
    """Async version of build_exit_early."""
    self.build_exit_early(build_id, reason)

task_register_aio async

task_register_aio(build_id, task)

Async version of task_register.

Source code in stardag/registry/_base.py
async def task_register_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_register."""
    self.task_register(build_id, task)

task_start_aio async

task_start_aio(build_id, task)

Async version of task_start.

Source code in stardag/registry/_base.py
async def task_start_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_start."""
    self.task_start(build_id, task)

task_complete_aio async

task_complete_aio(build_id, task)

Async version of task_complete.

Source code in stardag/registry/_base.py
async def task_complete_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_complete."""
    self.task_complete(build_id, task)

task_fail_aio async

task_fail_aio(build_id, task, error_message=None)

Async version of task_fail.

Source code in stardag/registry/_base.py
async def task_fail_aio(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Async version of task_fail."""
    self.task_fail(build_id, task, error_message)

task_suspend_aio async

task_suspend_aio(build_id, task)

Async version of task_suspend.

Source code in stardag/registry/_base.py
async def task_suspend_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_suspend."""
    self.task_suspend(build_id, task)

task_resume_aio async

task_resume_aio(build_id, task)

Async version of task_resume.

Source code in stardag/registry/_base.py
async def task_resume_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_resume."""
    self.task_resume(build_id, task)

task_cancel_aio async

task_cancel_aio(build_id, task)

Async version of task_cancel.

Source code in stardag/registry/_base.py
async def task_cancel_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_cancel."""
    self.task_cancel(build_id, task)

task_waiting_for_lock_aio async

task_waiting_for_lock_aio(build_id, task, lock_owner=None)

Async version of task_waiting_for_lock.

Source code in stardag/registry/_base.py
async def task_waiting_for_lock_aio(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Async version of task_waiting_for_lock."""
    self.task_waiting_for_lock(build_id, task, lock_owner)

task_upload_artifacts_aio async

task_upload_artifacts_aio(build_id, task, artifacts)

Async version of task_upload_artifacts.

Source code in stardag/registry/_base.py
async def task_upload_artifacts_aio(
    self, build_id: UUID, task: "BaseTask", artifacts: Sequence["Artifact"]
) -> None:
    """Async version of task_upload_artifacts."""
    self.task_upload_artifacts(build_id, task, artifacts)

task_get_metadata_aio async

task_get_metadata_aio(task_id)

Async version of task_get_metadata.

Source code in stardag/registry/_base.py
async def task_get_metadata_aio(self, task_id: UUID) -> TaskMetadata:
    """Async version of task_get_metadata."""
    return self.task_get_metadata(task_id)
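Each `*_aio` method above simply calls its synchronous counterpart inside the coroutine. As a result, if a subclass's sync method blocks (for example on a network request), awaiting the async version blocks the event loop as well. One possible override offloads the call with `asyncio.to_thread` (Python 3.9+). The classes below are hypothetical and only illustrate the pattern on `build_complete`:

```python
import asyncio
import time
from uuid import UUID


class SlowSyncRegistry:
    """Hypothetical registry whose sync method blocks (stand-in for I/O)."""

    def build_complete(self, build_id: UUID) -> None:
        time.sleep(0.05)  # simulate a blocking network call

    # Same pattern as the default wrappers above: the sync call runs directly
    # in the coroutine, so the event loop is blocked while it runs.
    async def build_complete_aio(self, build_id: UUID) -> None:
        self.build_complete(build_id)


class ThreadedRegistry(SlowSyncRegistry):
    """Override that runs the blocking call in a worker thread."""

    async def build_complete_aio(self, build_id: UUID) -> None:
        await asyncio.to_thread(self.build_complete, build_id)


async def run_concurrently(registry: SlowSyncRegistry, n: int) -> float:
    """Return the wall-clock time of n concurrent build_complete_aio calls."""
    start = time.perf_counter()
    await asyncio.gather(
        *(registry.build_complete_aio(UUID(int=i)) for i in range(n))
    )
    return time.perf_counter() - start
```

With `n=4`, the blocking variant takes at least 4 × 50 ms because the calls run one after another, while the threaded variant lets them overlap.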

NoOpRegistry

Bases: RegistryABC

A registry that does nothing.

Used as a default when no registry is configured.

build_start

build_start(root_tasks=None, description=None)

Return a placeholder build ID (the all-zero UUID).

Source code in stardag/registry/_base.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Return a placeholder build ID."""
    return UUID("00000000-0000-0000-0000-000000000000")
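Because `NoOpRegistry.build_start` always returns the same all-zero UUID, every build run against it shares one ID. Code that needs to detect this case can compare against `UUID(int=0)`. The `is_placeholder_build` helper below is hypothetical and not part of stardag:

```python
from uuid import UUID, uuid4

# The placeholder returned by NoOpRegistry.build_start (copied from the source above).
PLACEHOLDER_BUILD_ID = UUID("00000000-0000-0000-0000-000000000000")


def is_placeholder_build(build_id: UUID) -> bool:
    """Hypothetical helper: True if build_id is the all-zero (nil) UUID."""
    return build_id == UUID(int=0)


print(is_placeholder_build(PLACEHOLDER_BUILD_ID))  # True
print(is_placeholder_build(uuid4()))  # False: uuid4 always sets version bits
```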

Configuration

config

Centralized configuration for the Stardag SDK.

This module provides a unified configuration system that consolidates:

  • Target factory settings (target roots)
  • API registry settings (URL, timeout, environment)
  • Active context (registry, workspace, environment)

Configuration is loaded from multiple sources with the following priority:

  1. Environment variables (STARDAG_*)
  2. Project config (.stardag/config.toml in working directory or parents)
  3. User config (~/.stardag/config.toml)
  4. Defaults
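The precedence rule amounts to "the first layer that sets a value wins". Below is a minimal sketch with plain dicts. `resolve_setting` and the layer names are hypothetical; the real `load_config` additionally resolves profiles, workspace and environment slugs, and cached values.

```python
from typing import Any, Mapping

# Layers ordered from highest to lowest priority, mirroring the list above.
LAYER_ORDER = ("env", "project", "user", "defaults")


def resolve_setting(key: str, layers: Mapping[str, Mapping[str, Any]]) -> Any:
    """Return key's value from the highest-priority layer that sets it.

    A value of None counts as "not set", so lower layers are consulted.
    """
    for layer_name in LAYER_ORDER:
        layer = layers.get(layer_name, {})
        if layer.get(key) is not None:
            return layer[key]
    return None


layers = {
    "env": {"api_timeout": None},  # variable not set
    "project": {"profile": "dev"},
    "user": {"profile": "personal", "api_timeout": 30},
    "defaults": {"api_timeout": 10},
}
print(resolve_setting("profile", layers))  # dev
print(resolve_setting("api_timeout", layers))  # 30
```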

Usage:

from stardag.config import get_config

config = get_config()
print(config.api.url)
print(config.target.roots)

Environment variables (highest priority):

  • STARDAG_PROFILE - Profile name to use (looked up in config.toml)
  • STARDAG_REGISTRY_URL - Direct registry URL override
  • STARDAG_WORKSPACE_ID - Direct workspace ID override (load_config reads it only when STARDAG_REGISTRY_URL is set)
  • STARDAG_ENVIRONMENT_ID - Direct environment ID override (load_config reads it only when STARDAG_REGISTRY_URL is set)
  • STARDAG_API_KEY - API key for authentication
  • STARDAG_TARGET_ROOTS - JSON dict of target roots (override)

load_config

load_config(use_project_config=True)

Load configuration from all sources.

Priority (highest to lowest):

  1. Environment variables (STARDAG_*)
  2. Project config (.stardag/config.toml in repo)
  3. User config (~/.stardag/config.toml)
  4. Defaults

PARAMETER DESCRIPTION
use_project_config

Whether to load .stardag/config.toml from project.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
StardagConfig

Fully resolved StardagConfig.

Source code in stardag/config.py
def load_config(
    use_project_config: bool = True,
) -> StardagConfig:
    """Load configuration from all sources.

    Priority (highest to lowest):
    1. Environment variables (STARDAG_*)
    2. Project config (.stardag/config.toml in repo)
    3. User config (~/.stardag/config.toml)
    4. Defaults

    Args:
        use_project_config: Whether to load .stardag/config.toml from project.

    Returns:
        Fully resolved StardagConfig.
    """
    # 1. Load env vars first (highest priority)
    env_settings = StardagSettings()

    # 2. Load user and project TOML configs
    user_toml = load_toml_file(get_user_config_path())
    project_toml = {}
    if use_project_config:
        project_path = find_project_config()
        if project_path:
            project_toml = load_toml_file(project_path)

    # Merge configs (project overrides user)
    toml_config = _merge_toml_configs(user_toml, project_toml)

    # 3. Resolve profile → (registry, user, workspace, environment)
    profile_name: str | None = None
    registry_name: str | None = None
    registry_url: str | None = None
    user: str | None = None
    workspace_id: str | None = None
    environment_id: str | None = None

    # Check for direct env var overrides first
    if env_settings.registry_url:
        registry_url = env_settings.registry_url
        workspace_id = env_settings.workspace_id
        environment_id = env_settings.environment_id
    # Then check for profile-based config
    elif env_settings.profile:
        profile_name = env_settings.profile
    # Fall back to default profile from config
    elif toml_config.default.get("profile"):
        profile_name = toml_config.default["profile"]

    # If we have a profile, look it up
    if profile_name and not registry_url:
        profile = toml_config.profile.get(profile_name)
        if profile:
            registry_name = profile.registry
            user = profile.user  # Optional user for multi-user support
            workspace_value = profile.workspace  # Could be slug or ID
            environment_value = profile.environment  # Could be slug or ID

            # Look up registry URL from registry name
            registry_config = toml_config.registry.get(registry_name)
            if registry_config:
                registry_url = registry_config.url
            else:
                logger.warning(
                    f"Profile '{profile_name}' references unknown registry '{registry_name}'"
                )

            # Resolve workspace slug to ID if needed
            if _looks_like_uuid(workspace_value):
                workspace_id = workspace_value
            else:
                # Try to resolve from cache
                cached_workspace_id = get_cached_workspace_id(
                    registry_name, workspace_value
                )
                if cached_workspace_id:
                    workspace_id = cached_workspace_id
                else:
                    # Store the slug - will need to be resolved at runtime
                    workspace_id = workspace_value
                    logger.debug(
                        f"Workspace '{workspace_value}' is a slug, not cached. "
                        "Run 'stardag auth refresh' to resolve."
                    )

            # Resolve environment slug to ID if needed
            if _looks_like_uuid(environment_value):
                environment_id = environment_value
            elif workspace_id and _looks_like_uuid(workspace_id):
                # Can only resolve environment if we have a resolved workspace ID
                cached_env_id = get_cached_environment_id(
                    registry_name, workspace_id, environment_value
                )
                if cached_env_id:
                    environment_id = cached_env_id
                else:
                    # Store the slug - will need to be resolved at runtime
                    environment_id = environment_value
                    logger.debug(
                        f"Environment '{environment_value}' is a slug, not cached. "
                        "Run 'stardag auth refresh' to resolve."
                    )
            else:
                # Workspace is not resolved, can't resolve environment either
                environment_id = environment_value
        else:
            logger.warning(f"Profile '{profile_name}' not found in config")

    # 4. Resolve target roots
    # Priority: env > cached > default
    target_roots: dict[str, str]
    env_target_roots = _parse_target_roots_from_env()
    if env_target_roots:
        target_roots = env_target_roots
    elif registry_url and workspace_id and environment_id:
        cached_roots = get_cached_target_roots(
            registry_url, workspace_id, environment_id
        )
        if cached_roots:
            target_roots = cached_roots
        else:
            target_roots = {DEFAULT_TARGET_ROOT_KEY: DEFAULT_TARGET_ROOT}
    else:
        target_roots = {DEFAULT_TARGET_ROOT_KEY: DEFAULT_TARGET_ROOT}

    # 5. Load access token from cache (if we have profile info)
    # If token is expired, try to refresh it automatically
    access_token: str | None = None
    if registry_name and workspace_id and user:
        token_cache_path = get_access_token_cache_path(
            registry_name, workspace_id, user
        )
        if token_cache_path.exists():
            token_data = load_json_file(token_cache_path)
            # Check if token is still valid
            import time

            expires_at = token_data.get("expires_at", 0)
            if expires_at > time.time():
                access_token = token_data.get("access_token")

        # If no valid token in cache, try to refresh it
        if not access_token:
            try:
                # Lazy import to avoid circular dependency
                from stardag._cli.credentials import ensure_access_token

                access_token = ensure_access_token(
                    registry_name, workspace_id, user, quiet=True
                )
            except Exception:
                # Silently fail - user can manually refresh with `stardag auth refresh`
                pass

    # 6. Get API key from env
    api_key = env_settings.api_key or os.environ.get("STARDAG_API_KEY")

    return StardagConfig(
        target=TargetConfig(roots=target_roots),
        api=APIConfig(
            url=registry_url,
            timeout=env_settings.api_timeout or DEFAULT_API_TIMEOUT,
        ),
        context=ContextConfig(
            profile=profile_name,
            registry_name=registry_name,
            user=user,
            workspace_id=workspace_id,
            environment_id=environment_id,
        ),
        access_token=access_token,
        api_key=api_key,
    )