
API Reference

Auto-generated API documentation from source code.

Core Module

Stardag: Declarative and composable DAG framework for Python.

Stardag provides a clean Python API for representing persistently stored assets as a declarative Directed Acyclic Graph (DAG).

Basic usage:

import stardag as sd

@sd.task
def get_range(limit: int) -> list[int]:
    return list(range(limit))

@sd.task
def get_sum(integers: sd.Depends[list[int]]) -> int:
    return sum(integers)

task = get_sum(integers=get_range(limit=10))
sd.build(task)
print(task.output().load())  # 45

Core components:

  • task - Decorator for creating tasks from functions
  • Task - Base class for all tasks
  • AutoTask - Task with automatic filesystem targets
  • Depends - Dependency injection type annotation
  • build - Execute task and its dependencies

See https://docs.stardag.com for full documentation.

TODO: Expand docstrings for all public API components.

Depends module-attribute

Depends = Annotated[_DependsT, _DependsOnMarker]

TaskLoads module-attribute

TaskLoads = Annotated[
    Task[LoadableTarget[LoadedT_co]], Polymorphic()
]

TaskStruct module-attribute

TaskStruct = Union[
    "BaseTask",
    Sequence["TaskStruct"],
    Mapping[str, "TaskStruct"],
]
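
For orientation, an illustrative sketch of how a TaskStruct can nest (reusing get_range and get_sum from the basic usage example above, and assuming TaskStruct is imported from stardag):

# A TaskStruct is a single task, a sequence of TaskStructs, or a
# string-keyed mapping of TaskStructs, nested to any depth.
deps: "TaskStruct" = {
    "ranges": [get_range(limit=5), get_range(limit=10)],
    "total": get_sum(integers=get_range(limit=3)),
}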

target_factory_provider module-attribute

target_factory_provider = resource_provider(
    type_=TargetFactory, default_factory=TargetFactory
)

Task

Bases: BaseTask, Generic[TargetType]

complete

complete()

Check if the task is complete.

complete_aio async

complete_aio()

Asynchronously check if the task is complete.

output abstractmethod

output()

The task output target.

AutoTask

Bases: Task[LoadableSaveableFileSystemTarget[LoadedT]], ABC, Generic[LoadedT]

A base class for automatically serializing task outputs.

The output of an AutoTask is a LoadableSaveableFileSystemTarget that uses a serializer inferred from the generic type parameter LoadedT.

The output file path is automatically constructed based on the task's namespace, name, version, and unique ID and has the following structure:

[<relpath_base>/][<namespace>/]<name>/v<version>/[<relpath_extra>/]
<id>[:2]/<id>[2:4]/<id>[/<relpath_filename>].<relpath_extension>

You can override the following properties to customize the output path: _relpath_base, _relpath_extra, _relpath_filename, and _relpath_extension.

See stardag.target.serialize.get_serializer for details on how the serializer is inferred from the generic type parameter, and how to customize it.

Example:

import stardag as sd

class MyAutoTask(sd.AutoTask[dict[str, int]]):
    def run(self):
        self.output().save({"a": 1, "b": 2})

my_task = MyAutoTask()

print(my_task.output())
# Serializable(../MyAutoTask/03/6f/036f6e71-1b3c-54b8-aec1-182359f1e09a.json)

print(my_task.output().serializer)
# <stardag.target.serialize.JSONSerializer at 0x1064e4710>
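
A minimal sketch of the path-customization hooks named above; the property names come from this docstring, while the bodies and return values are illustrative:

import stardag as sd

class MyReportTask(sd.AutoTask[dict[str, int]]):
    @property
    def _relpath_base(self) -> str:
        return "reports"  # fills the leading [<relpath_base>/] slot

    @property
    def _relpath_filename(self) -> str:
        return "summary"  # fills the [/<relpath_filename>] slot before the extension

    def run(self):
        self.output().save({"rows": 10})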

serializer property

serializer

The serializer used for this task's output.

__map_generic_args_to_ancestor__ classmethod

__map_generic_args_to_ancestor__(ancestor_origin, args)

Map generic args from AutoTask to how they appear on an ancestor class.

This enables type compatibility checking when using AutoTask with TaskLoads. For example, AutoTask[str] maps to Task[LoadableSaveableFileSystemTarget[str]], which is compatible with TaskLoads[str] (= Task[LoadableTarget[str]]) because LoadableSaveableFileSystemTarget is a subtype of LoadableTarget.

PARAMETER DESCRIPTION
ancestor_origin

The ancestor class to map args to (e.g., Task)

TYPE: type

args

The generic args of this class (e.g., (str,) for AutoTask[str])

TYPE: tuple

RETURNS DESCRIPTION
tuple | None

The mapped args for the ancestor, or None if mapping is not applicable.
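
A hedged sketch of what this mapping enables in practice; StrTask and read_upper are hypothetical names:

import stardag as sd

class StrTask(sd.AutoTask[str]):
    def run(self):
        self.output().save("hello")

def read_upper(task: sd.TaskLoads[str]) -> str:
    # Accepts any task whose target loads a str; AutoTask[str] qualifies
    # because its target type maps to a subtype of LoadableTarget[str].
    return task.output().load().upper()

read_upper(StrTask())  # type-checks thanks to the mapping above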

BaseTask

Bases: PolymorphicRoot

__init_subclass__

__init_subclass__(**kwargs)

Validate that subclasses implement either run() or run_aio().

Also wraps run() and run_aio() methods with precheck validation.

complete abstractmethod

complete()

Declare if the task is complete.

complete_aio async

complete_aio()

Asynchronously declare if the task is complete.

run

run()

Execute the task logic (sync).

Override this method for synchronous tasks. If you only override run_aio(), this method will automatically run it via asyncio.run().

RETURNS DESCRIPTION
None | Generator[TaskStruct, None, None]

None for simple tasks, or a Generator yielding TaskStruct for tasks with dynamic dependencies (see Dynamic Dependencies Contract below).

RAISES DESCRIPTION
RuntimeError

If called from within an existing event loop when only run_aio() is implemented. In that case, call run_aio() directly instead.

NotImplementedError

If run_aio() is an async generator (dynamic deps). Async generators cannot be automatically converted to sync generators.

Dynamic Dependencies Contract: When a task yields dynamic dependencies via a generator, the BUILD SYSTEM guarantees that ALL yielded tasks are COMPLETE before the generator is resumed. The task can rely on this contract:

def run(self):
    # Do some initial work to get info about what additional dependencies
    # are needed
    initial_data = "..."

    # Yield deps we need to be built first
    task_a = TaskA(input=initial_data)
    task_b = TaskB(input=initial_data)
    yield [task_a, task_b]

    # CONTRACT: When we reach here, ALL deps are complete.
    # We can safely access their outputs.
    result_a = task_a.output().load()
    result_b = task_b.output().load()

    # Yield more deps if needed
    task_c = TaskC(input=result_a)
    yield task_c

    # Again, TaskC is complete when we reach here
    self.output().save(task_c.output().load() + result_b)

This contract is essential for correctness - tasks can depend on previously yielded tasks being complete before continuing execution.

run_aio async

run_aio()

Execute the task logic (async).

Override this method for asynchronous tasks. If you only override run(), this method will automatically delegate to it.

For dynamic dependencies, you can use 'yield' which makes this an async generator. Note that async generator methods have different type signatures that may require type: ignore comments.

RETURNS DESCRIPTION
None | Generator[TaskStruct, None, None]

None for simple tasks, or a Generator/AsyncGenerator for tasks with dynamic dependencies.

Dynamic Dependencies Contract

Same as run() - the build system guarantees that ALL yielded tasks are COMPLETE before the generator is resumed. See run() docstring for detailed documentation and examples.
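
A sketch of the async-generator form, assuming `import stardag as sd` and a hypothetical upstream TaskA; note the type: ignore caveat mentioned above:

class Orchestrator(sd.AutoTask[str]):
    async def run_aio(self):  # type: ignore[override]
        task_a = TaskA(input="...")  # hypothetical dependency
        yield [task_a]  # suspends until task_a is complete
        # Contract: task_a is complete when execution resumes here
        self.output().save(task_a.output().load())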

registry_assets

registry_assets()

Return assets to be stored in the registry after task completion.

Override this method to expose rich outputs (reports, summaries, structured data) that will be viewable in the registry UI.

This method is called after the task completes successfully. It should be stateless - loading any required data from the task's target rather than relying on in-memory state.

RETURNS DESCRIPTION
list[RegistryAsset]

List of registry assets (MarkdownRegistryAsset, JSONRegistryAsset, etc.)
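
An illustrative override; the MarkdownRegistryAsset constructor arguments are deliberately elided, since only the class name appears in this reference:

def registry_assets(self):
    # Stateless: reload the task's own output rather than relying on
    # in-memory state from run().
    report = self.output().load()
    # Constructor arguments elided; see the asset classes for the signature.
    return [MarkdownRegistryAsset(...)]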

registry_assets_aio

registry_assets_aio()

Asynchronously return assets to be stored in the registry after task completion.

resolve classmethod

resolve(namespace, name, extra)

Override PolymorphicRoot.resolve to handle AliasTask deserialization.

from_registry classmethod

from_registry(id, registry=None)

Instantiate the task from the registry.

PARAMETER DESCRIPTION
id

The UUID of the task to alias.

TYPE: UUID

registry

An optional registry instance to use for loading metadata. If not provided, the default registry from registry_provider will be used.

TYPE: Union[RegistryABC, None] DEFAULT: None

RETURNS DESCRIPTION
AliasTask

An AliasTask instance referencing the specified task.
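
A usage sketch; the UUID is a placeholder, and importing BaseTask from the package root is an assumption based on this reference:

from uuid import UUID

import stardag as sd

# Looks the task up in the default registry (from registry_provider)
# since no registry argument is passed.
alias = sd.BaseTask.from_registry(
    UUID("00000000-0000-0000-0000-000000000000")  # placeholder UUID
)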

TaskRef dataclass

TaskRef(name, version, id)

task

task(
    _func: Callable[_PWrapped, LoadedT],
    *,
    name: str | None = None,
    version: str = "",
    relpath: RelpathSettings
    | _RelpathOverride
    | None = None,
) -> Type[_FunctionTask[LoadedT, _PWrapped]]
task(
    *,
    name: str | None = None,
    version: str = "",
    relpath: RelpathSettings
    | _RelpathOverride
    | None = None,
) -> _TaskWrapper
task(_func=None, *, name=None, version='', relpath=None)
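
Per the overloads above, the decorator also accepts keyword arguments; a short sketch:

import stardag as sd

@sd.task(name="sum_integers", version="1")
def get_sum(integers: sd.Depends[list[int]]) -> int:
    return sum(integers)

# version participates in the output path (the v<version> segment
# shown for AutoTask), so bumping it yields fresh outputs.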

build_aio async

build_aio(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
)

Build tasks concurrently using hybrid async/thread/process execution.

This is the main build function for production use. It:

  • Discovers all tasks in the DAG(s)
  • Schedules tasks for execution when dependencies are met
  • Handles dynamic dependencies via generator suspension
  • Supports multiple root tasks (built concurrently)
  • Routes tasks to async/thread/process based on ExecutionModeSelector
  • Manages all registry interactions (start/complete/fail task)
  • Optionally uses global concurrency locks for distributed execution

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

task_executor

TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor). Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).

TYPE: TaskExecutorABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

registry

Registry for tracking builds (default: from init_registry())

TYPE: RegistryABC | None DEFAULT: None

max_concurrent_discover

Maximum concurrent completion checks during DAG discovery. Higher values speed up discovery for large DAGs with remote targets.

TYPE: int DEFAULT: 50

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution to ensure exactly-once execution across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id
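
A usage sketch from an async context, reusing the tasks from the basic usage example; build_aio is assumed to be exported at the package root, as listed in this Core Module reference:

import asyncio

import stardag as sd

async def main() -> None:
    summary = await sd.build_aio(get_sum(integers=get_range(limit=10)))
    print(summary)  # BuildSummary: status, task counts, build_id

asyncio.run(main())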

build_sequential

build_sequential(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    dual_run_default="sync",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
)

Sync API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run via run()
  • Async-only tasks: run via asyncio.run(run_aio()). (Does not work if called from within an existing event loop.)
  • Dual tasks: run via run() if dual_run_default=="sync" (default), else (dual_run_default=="async") via asyncio.run(run_aio()).

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: list[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

dual_run_default

For dual tasks, prefer sync or async execution

TYPE: Literal['sync', 'async'] DEFAULT: 'sync'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id
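
A debugging sketch; my_task stands in for any task instance:

import stardag as sd

# Dual tasks (implementing both run() and run_aio()) execute via
# asyncio.run(run_aio()) when dual_run_default="async".
summary = sd.build_sequential(my_task, dual_run_default="async")
print(summary.status, summary.task_count)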

build_sequential_aio async

build_sequential_aio(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    sync_run_default="blocking",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
)

Async API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run blocking via run() in the main event loop if sync_run_default=="blocking" (default), else (sync_run_default=="thread") in a thread pool.
  • Async-only tasks: run via await run_aio().
  • Dual tasks: run via await run_aio().

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: list[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

sync_run_default

For sync-only tasks, block or use thread pool

TYPE: Literal['thread', 'blocking'] DEFAULT: 'blocking'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

namespace

namespace(namespace, scope)

Set the task namespace for the module and any submodules.

PARAMETER DESCRIPTION
namespace

The namespace to set for the module.

TYPE: str

scope

The module scope, typically passed as __name__.

TYPE: str

Usage:

import stardag as sd

sd.namespace("my_custom_namespace", __name__)

class MyNamespacedTask(sd.AutoTask[int]):
    a: int

    def run(self):
        self.output().save(self.a)

assert MyNamespacedTask.get_namespace() == "my_custom_namespace"

auto_namespace

auto_namespace(scope)

Set the task namespace for the module to the module import path.

PARAMETER DESCRIPTION
scope

The module scope, typically passed as __name__.

TYPE: str

Usage:

import stardag as sd

sd.auto_namespace(__name__)

class MyAutoNamespacedTask(sd.AutoTask[int]):
    a: int

    def run(self):
        self.output().save(self.a)

assert MyAutoNamespacedTask.get_namespace() == __name__

get_target

get_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

get_directory_target

get_directory_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)
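
These helpers have no docstrings; as a hedged reading of the signatures, they resolve relpath against the target root registered under target_root_key:

import stardag as sd

# Behavior inferred from the signatures above, not verified against
# the implementation.
target = sd.get_target("my-data/output.json")
directory = sd.get_directory_target("my-data/artifacts/")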

Build Module

build

Build module for stardag.

This module provides functions and classes for building task DAGs.

Primary build functions:

  • build(): Concurrent build, recommended for real workloads from a sync context
  • build_aio(): Async concurrent build, recommended for real workloads from an async context or already running event loop
  • build_sequential(): Sync sequential build (for debugging)
  • build_sequential_aio(): Async sequential build (for debugging)

Task executor:

  • HybridConcurrentTaskExecutor: Routes tasks to async/thread/process pools

Interfaces:

  • TaskExecutorABC: Abstract base class for custom task executors
  • ExecutionModeSelector: Protocol for custom execution mode selection

Global concurrency locking:

  • GlobalConcurrencyLockManager: Protocol for distributed lock implementations
  • LockHandle: Protocol for lock handles (async context manager)
  • GlobalLockConfig: Configuration for global locking behavior

BuildSummary dataclass

BuildSummary(status, task_count, build_id=None, error=None)

Summary of a build execution.

__repr__

__repr__()

Return a human-readable summary of the build.

Source code in stardag/build/_base.py
def __repr__(self) -> str:
    """Return a human-readable summary of the build."""
    tc = self.task_count
    status_icon = "✓" if self.status == BuildExitStatus.SUCCESS else "✗"
    lines = [
        f"Build {self.status.value.upper()} {status_icon}",
    ]
    if self.build_id:
        lines.append(f"  Build ID: {self.build_id}")
    lines.extend(
        [
            f"  Discovered: {tc.discovered}",
            f"  Previously completed: {tc.previously_completed}",
            f"  Succeeded: {tc.succeeded}",
            f"  Failed: {tc.failed}",
        ]
    )
    if tc.pending > 0:
        lines.append(f"  Pending: {tc.pending}")
    if self.error:
        lines.append(f"  Error: {self.error}")
    return "\n".join(lines)

BuildExitStatus

Bases: StrEnum

FailMode

Bases: StrEnum

How to handle task failures during build.

ATTRIBUTE DESCRIPTION
FAIL_FAST

Stop the build at the first task failure.

CONTINUE

Continue executing all tasks whose dependencies are met, even if some tasks have failed.
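
A sketch of selecting the mode; the import path is an assumption based on the "Source code in stardag/build/..." references in this document:

import stardag as sd
from stardag.build import FailMode  # import path assumed

# Keep executing tasks whose dependencies succeeded, even after failures.
summary = sd.build(tasks, fail_mode=FailMode.CONTINUE)  # tasks: your root task(s)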

HybridConcurrentTaskExecutor

HybridConcurrentTaskExecutor(
    execution_mode_selector=None,
    max_async_workers=10,
    max_thread_workers=10,
    max_process_workers=None,
)

Bases: TaskExecutorABC

Task executor with async, thread, and process pools.

Routes tasks to appropriate execution context based on ExecutionModeSelector. Handles generator suspension for dynamic dependencies.

Note: This executor does not handle registry calls - those are managed by the build() function. The executor only executes tasks and returns results.

For routing tasks to different executors (e.g., some to Modal, some local), use RoutedTaskExecutor to compose multiple executors.

Alternative: For fully async multiprocessing without thread pools, one could implement an AIOMultiprocessingTaskExecutor using libraries like aiomultiprocess.

PARAMETER DESCRIPTION
execution_mode_selector

Callable to select execution mode per task.

TYPE: ExecutionModeSelector | None DEFAULT: None

max_async_workers

Maximum concurrent async tasks (semaphore-based).

TYPE: int DEFAULT: 10

max_thread_workers

Maximum concurrent thread pool workers.

TYPE: int DEFAULT: 10

max_process_workers

Maximum concurrent process pool workers.

TYPE: int | None DEFAULT: None

Source code in stardag/build/_concurrent.py
def __init__(
    self,
    execution_mode_selector: ExecutionModeSelector | None = None,
    max_async_workers: int = 10,
    max_thread_workers: int = 10,
    max_process_workers: int | None = None,
) -> None:
    self.execution_mode_selector = (
        execution_mode_selector or DefaultExecutionModeSelector()
    )
    self.max_async_workers = max_async_workers
    self.max_thread_workers = max_thread_workers
    self.max_process_workers = max_process_workers

    # Pools - initialized in setup()
    self._async_semaphore: asyncio.Semaphore | None = None
    self._thread_pool: ThreadPoolExecutor | None = None
    self._process_pool: ProcessPoolExecutor | None = None

    # Track suspended generators (task_id -> generator)
    # For in-process execution where we can suspend and resume
    self._suspended_generators: dict[UUID, Generator[TaskStruct, None, None]] = {}

    # Track tasks pending re-execution (task_id -> True)
    # For cross-process/remote execution: when task yields incomplete deps,
    # it's re-executed from scratch after deps complete (idempotent re-execution)
    self._pending_reexecution: set[UUID] = set()
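
A sketch of sizing the pools and handing the executor to build(), using only parameters shown above (import path assumed):

from stardag.build import HybridConcurrentTaskExecutor, build  # path assumed

executor = HybridConcurrentTaskExecutor(
    max_async_workers=20,   # semaphore bound for async tasks
    max_thread_workers=4,   # thread pool size
    max_process_workers=2,  # non-None enables the process pool
)
summary = build(tasks, task_executor=executor)  # tasks: your root task(s)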

setup async

setup()

Initialize worker pools.

Source code in stardag/build/_concurrent.py
async def setup(self) -> None:
    """Initialize worker pools."""
    import multiprocessing as mp

    self._async_semaphore = asyncio.Semaphore(self.max_async_workers)
    self._thread_pool = ThreadPoolExecutor(max_workers=self.max_thread_workers)
    if self.max_process_workers:
        # Use 'spawn' explicitly for cross-platform compatibility.
        # Python 3.14 changed the default from 'fork' to 'forkserver' on Linux,
        # which can cause issues with environment variable inheritance.
        # 'spawn' is the safest option and works consistently across platforms.
        self._process_pool = ProcessPoolExecutor(
            max_workers=self.max_process_workers,
            mp_context=mp.get_context("spawn"),
        )

teardown async

teardown()

Shutdown worker pools.

Source code in stardag/build/_concurrent.py
async def teardown(self) -> None:
    """Shutdown worker pools."""
    if self._thread_pool:
        self._thread_pool.shutdown(wait=True)
        self._thread_pool = None
    if self._process_pool:
        self._process_pool.shutdown(wait=True)
        self._process_pool = None
    self._async_semaphore = None
    self._suspended_generators.clear()
    self._pending_reexecution.clear()

submit async

submit(task)

Execute a task and return result.

Note: This method does not make any registry calls. The build function is responsible for calling start_task, complete_task, and fail_task.

Source code in stardag/build/_concurrent.py
async def submit(self, task: BaseTask) -> None | TaskStruct | Exception:
    """Execute a task and return result.

    Note: This method does not make any registry calls. The build function
    is responsible for calling start_task, complete_task, and fail_task.
    """
    # Check if we're resuming a suspended generator (in-process dynamic deps)
    if task.id in self._suspended_generators:
        return self._resume_generator(task)

    # Check if task is pending re-execution (cross-process dynamic deps)
    # Task yielded incomplete deps, deps are now built, re-execute task
    if task.id in self._pending_reexecution:
        self._pending_reexecution.discard(task.id)

    mode = self.execution_mode_selector(task)

    try:
        result = await self._execute_task(task, mode)
        return self._handle_result(task, result)
    except Exception as e:
        return e

TaskExecutorABC

Bases: ABC

Abstract base for task executors.

Receives tasks and executes them according to some policy. The executor is responsible for:

  • Executing tasks in the appropriate context (async/thread/process)
  • Handling generator suspension for dynamic dependencies

The executor is NOT responsible for:

  • Dependency resolution - handled by build()
  • Registry calls (start_task, complete_task, etc.) - handled by build()

submit abstractmethod async

submit(task)

Submit a task for execution.

PARAMETER DESCRIPTION
task

The task to execute.

TYPE: BaseTask

RETURNS DESCRIPTION
None | TaskStruct | Exception

  • None: Task completed successfully with no dynamic dependencies.
  • TaskStruct: Task "suspended" because it yielded dynamic dependencies. The returned TaskStruct contains the discovered dependencies.
  • Exception: Task failed with the given exception.
Source code in stardag/build/_base.py
@abstractmethod
async def submit(self, task: BaseTask) -> None | TaskStruct | Exception:
    """Submit a task for execution.

    Args:
        task: The task to execute.

    Returns:
        - None: Task completed successfully with no dynamic dependencies.
        - TaskStruct: Task "suspended" because it yielded dynamic dependencies.
            The returned TaskStruct contains the discovered dependencies.
        - Exception: Task failed with the given exception.
    """
    ...

setup abstractmethod async

setup()

Setup any resources needed for the task runner (pools, etc.).

Source code in stardag/build/_base.py
@abstractmethod
async def setup(self) -> None:
    """Setup any resources needed for the task runner (pools, etc.)."""
    ...

teardown abstractmethod async

teardown()

Teardown any resources used by the task executor.

Source code in stardag/build/_base.py
@abstractmethod
async def teardown(self) -> None:
    """Teardown any resources used by the task executor."""
    ...
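
To make the division of responsibilities concrete, a minimal hypothetical executor: it runs tasks inline and only surfaces the first dynamic-dependency yield, whereas a real executor must also suspend and resume the generator (compare HybridConcurrentTaskExecutor):

class InlineTaskExecutor(TaskExecutorABC):
    async def setup(self) -> None:
        pass  # no pools to create

    async def teardown(self) -> None:
        pass  # nothing to release

    async def submit(self, task: BaseTask) -> None | TaskStruct | Exception:
        try:
            gen = task.run()
            if gen is None:
                return None  # completed, no dynamic deps
            # run() returned a generator: report the first yielded deps.
            # (Sketch only; resuming the generator later is omitted.)
            return next(gen)
        except StopIteration:
            return None
        except Exception as e:
            return e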

build

build(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
)

Build tasks concurrently (sync wrapper for build_aio).

This is the recommended entry point for building tasks from synchronous code. Wraps the async build_aio() function.

Note

This function cannot be called from within an already running event loop. If you're in an async context (e.g., inside an async function, or using frameworks like Playwright, FastAPI, etc.), use await build_aio() instead.

Source code in stardag/build/_concurrent.py
def build(
    tasks: Sequence[BaseTask] | BaseTask,
    task_executor: TaskExecutorABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    registry: RegistryABC | None = None,
    max_concurrent_discover: int = 50,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    resume_build_id: UUID | None = None,
) -> BuildSummary:
    """Build tasks concurrently (sync wrapper for build_aio).

    This is the recommended entry point for building tasks from synchronous code.
    Wraps the async build_aio() function.

    Note:
        This function cannot be called from within an already running event loop.
        If you're in an async context (e.g., inside an async function, or using
        frameworks like Playwright, FastAPI, etc.), use `await build_aio()` instead.
    """
    try:
        return asyncio.run(
            build_aio(
                tasks,
                task_executor,
                fail_mode,
                registry,
                max_concurrent_discover,
                global_lock_manager,
                global_lock_config,
                resume_build_id,
            )
        )
    except RuntimeError as e:
        if "cannot be called from a running event loop" in str(e):
            raise RuntimeError(
                "build() cannot be used from within an already running event loop. "
                "Use 'await build_aio()' instead, or 'build_sequential()' if you "
                "need synchronous execution without an event loop."
            ) from e
        raise

build_aio async

build_aio(
    tasks,
    task_executor=None,
    fail_mode=FAIL_FAST,
    registry=None,
    max_concurrent_discover=50,
    global_lock_manager=None,
    global_lock_config=None,
    resume_build_id=None,
)

Build tasks concurrently using hybrid async/thread/process execution.

This is the main build function for production use. It:

  • Discovers all tasks in the DAG(s)
  • Schedules tasks for execution when dependencies are met
  • Handles dynamic dependencies via generator suspension
  • Supports multiple root tasks (built concurrently)
  • Routes tasks to async/thread/process based on ExecutionModeSelector
  • Manages all registry interactions (start/complete/fail task)
  • Optionally uses global concurrency locks for distributed execution

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: Sequence[BaseTask] | BaseTask

task_executor

TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor). Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).

TYPE: TaskExecutorABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

registry

Registry for tracking builds (default: from init_registry())

TYPE: RegistryABC | None DEFAULT: None

max_concurrent_discover

Maximum concurrent completion checks during DAG discovery. Higher values speed up discovery for large DAGs with remote targets.

TYPE: int DEFAULT: 50

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution to ensure exactly-once execution across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

Source code in stardag/build/_concurrent.py
async def build_aio(
    tasks: Sequence[BaseTask] | BaseTask,
    task_executor: TaskExecutorABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    registry: RegistryABC | None = None,
    max_concurrent_discover: int = 50,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
    resume_build_id: UUID | None = None,
) -> BuildSummary:
    """Build tasks concurrently using hybrid async/thread/process execution.

    This is the main build function for production use. It:
    - Discovers all tasks in the DAG(s)
    - Schedules tasks for execution when dependencies are met
    - Handles dynamic dependencies via generator suspension
    - Supports multiple root tasks (built concurrently)
    - Routes tasks to async/thread/process based on ExecutionModeSelector
    - Manages all registry interactions (start/complete/fail task)
    - Optionally uses global concurrency locks for distributed execution

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        task_executor: TaskExecutor for executing tasks (default: HybridConcurrentTaskExecutor).
            Use RoutedTaskExecutor to route tasks to different executors (e.g., Modal).
        fail_mode: How to handle task failures
        registry: Registry for tracking builds (default: from init_registry())
        max_concurrent_discover: Maximum concurrent completion checks during DAG discovery.
            Higher values speed up discovery for large DAGs with remote targets.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution to ensure exactly-once execution across processes.
        global_lock_config: Configuration for global locking behavior.
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    if isinstance(tasks, BaseTask):
        tasks = [tasks]
    else:
        tasks = list(tasks)
        for idx, task in enumerate(tasks):
            if not isinstance(task, BaseTask):
                raise ValueError(
                    f"Invalid task at index {idx}: {task} (must be BaseTask)"
                )

    # Determine registry: explicit > init_registry()
    if registry is None:
        registry = init_registry()
    logger.info(f"Using registry: {type(registry).__name__}")

    if task_executor is None:
        task_executor = HybridConcurrentTaskExecutor()

    # Setup global lock selector
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)

    # Track locks held by this build for manual release
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    error: BaseException | None = None

    # Task execution states
    task_states: dict[UUID, TaskExecutionState] = {}
    # Events for completion signaling
    completion_events: dict[UUID, asyncio.Event] = {}
    # Currently executing tasks
    executing: set[UUID] = set()

    # Tasks found to be already complete during discovery (to register after build starts)
    previously_completed_tasks: list[BaseTask] = []

    # Synchronization for concurrent discovery
    discover_lock = asyncio.Lock()
    discover_semaphore = asyncio.Semaphore(max_concurrent_discover)

    async def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks.

        This optimization avoids traversing into dependency subgraphs that
        are already complete, which can significantly reduce discovery time
        for large DAGs with cached results.

        Uses concurrent recursion with TaskGroup for parallel discovery,
        with a lock protecting shared data structures and a semaphore
        limiting concurrent completion checks.
        """
        # Check if already discovered and reserve our spot (with lock)
        async with discover_lock:
            if task.id in task_states:
                return
            static_deps = flatten_task_struct(task.requires())
            task_states[task.id] = TaskExecutionState(
                task=task, static_deps=static_deps
            )
            completion_events[task.id] = asyncio.Event()
            task_count.discovered += 1

        # Check completion outside lock (I/O bound, use semaphore to limit concurrency)
        async with discover_semaphore:
            is_complete = await task.complete_aio()

        if is_complete:
            async with discover_lock:
                completion_cache.add(task.id)
                task_states[task.id].completed = True
                completion_events[task.id].set()
                task_count.previously_completed += 1
                previously_completed_tasks.append(task)
            # Don't recurse into deps - they're already built
            return

        # Task not complete - recurse into dependencies concurrently
        async with asyncio.TaskGroup() as tg:
            for dep in static_deps:
                tg.create_task(discover(dep))

    # Discover all tasks from roots concurrently
    async with asyncio.TaskGroup() as tg:
        for root in tasks:
            tg.create_task(discover(root))

    # Start or resume build
    if resume_build_id is not None:
        build_id = resume_build_id
        logger.info(f"Resuming build: {build_id}")
    else:
        build_id = await registry.build_start_aio(root_tasks=tasks)
        logger.info(f"Started build: {build_id}")

    # Register previously completed tasks so they appear in the build's task list
    # These will get TASK_REFERENCED events since they already exist
    for task in previously_completed_tasks:
        try:
            await registry.task_register_aio(build_id, task)
        except Exception as reg_err:
            logger.warning(f"Failed to register previously completed task: {reg_err}")

    await task_executor.setup()

    # Map task_id -> asyncio.Task for in-flight executions
    pending_futures: dict[UUID, asyncio.Task] = {}

    async def process_result(
        task: BaseTask,
        result: LockAcquisitionResult | BaseException | TaskStruct | None,
    ):
        """Process a single task result (including lock acquisition results)."""
        nonlocal error
        state = task_states[task.id]

        # Handle lock acquisition results (lock was not acquired)
        if isinstance(result, LockAcquisitionResult):
            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                # Task completed externally - wait for visibility then mark complete
                await wait_for_completion_with_retry(task)
                state.completed = True
                completion_cache.add(task.id)
                completion_events[task.id].set()
                task_count.previously_completed += 1
            else:
                # ERROR, HELD_BY_OTHER, or CONCURRENCY_LIMIT_REACHED after timeout
                msg = f"Lock {result.status.value}"
                if result.error_message:
                    msg += f": {result.error_message}"
                state.exception = Exception(msg)
                task_count.failed += 1
                error = state.exception
                if fail_mode == FailMode.FAIL_FAST:
                    raise state.exception
            return

        # Handle normal task execution results
        if isinstance(result, BaseException):
            # Task failed - release lock (not completed) and notify registry
            await release_lock_for_task(task, completed=False)
            try:
                await registry.task_fail_aio(build_id, task, str(result))
            except Exception as reg_err:
                logger.warning(f"Failed to notify registry of task failure: {reg_err}")
            state.exception = result
            task_count.failed += 1
            error = result
            if fail_mode == FailMode.FAIL_FAST:
                raise result

        elif result is None:
            # Task completed - release lock (completed) and notify registry
            await release_lock_for_task(task, completed=True)
            try:
                await registry.task_complete_aio(build_id, task)
                assets = task.registry_assets_aio()
                if assets:
                    await registry.task_upload_assets_aio(build_id, task, assets)
            except Exception as reg_err:
                logger.warning(
                    f"Failed to notify registry of task completion: {reg_err}"
                )
            state.completed = True
            completion_cache.add(task.id)
            completion_events[task.id].set()
            task_count.succeeded += 1

        else:
            # Dynamic deps returned (TaskStruct) - task is suspended
            # Note: Lock is still held - release on final completion/failure
            dynamic_deps = flatten_task_struct(result)

            # Notify registry that task is suspended waiting for dynamic deps
            try:
                await registry.task_suspend_aio(build_id, task)
            except Exception as reg_err:
                logger.warning(
                    f"Failed to notify registry of task suspension: {reg_err}"
                )

            # Discover any new dynamic deps (discover handles counting)
            for dep in dynamic_deps:
                if dep.id not in task_states:
                    await discover(dep)

            # Accumulate dynamic deps (don't overwrite)
            existing_dyn_ids = {d.id for d in state.dynamic_deps}
            for dep in dynamic_deps:
                if dep.id not in existing_dyn_ids:
                    state.dynamic_deps.append(dep)

    def find_ready_tasks() -> list[BaseTask]:
        """Find tasks that are ready to execute."""
        ready: list[BaseTask] = []
        for state in task_states.values():
            if state.completed or state.task.id in executing:
                continue
            if state.exception is not None:
                continue

            # Check all deps (static + dynamic) complete
            all_deps_complete = all(
                task_states[dep.id].completed for dep in state.all_deps
            )
            if all_deps_complete:
                ready.append(state.task)
                executing.add(state.task.id)
        return ready

    async def wait_for_completion_with_retry(task: BaseTask) -> bool:
        """Wait for task.complete_aio() to return True (handles eventual consistency).

        When the lock reports ALREADY_COMPLETED, the task output may not be
        immediately visible due to eventual consistency (e.g., S3). This function
        retries until the output exists or timeout is reached.
        """
        assert global_lock_config is not None
        timeout = global_lock_config.completion_retry_timeout_seconds
        interval = global_lock_config.completion_retry_interval_seconds
        start_time = asyncio.get_event_loop().time()

        while True:
            if await task.complete_aio():
                return True
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed >= timeout:
                logger.warning(
                    f"Task {task.id} reported as completed by lock service, "
                    f"but complete_aio() returned False after {timeout}s. "
                    "Treating as complete (eventual consistency)."
                )
                return True
            await asyncio.sleep(interval)

    async def release_lock_for_task(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            await global_lock_manager.release(task_id, task_completed=completed)
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    async def acquire_lock_with_completion_check(
        task: BaseTask,
        task_id: str,
        lock_manager: GlobalConcurrencyLockManager,
        config: GlobalLockConfig,
    ) -> LockAcquisitionResult:
        """Acquire lock with retry/backoff and external completion checking.

        During the retry loop, we also check if the task was completed externally
        (e.g., by another process). This handles the race condition where:
        1. Lock is held by another process
        2. That process completes the task and releases the lock
        3. Before we can re-acquire, we should notice the task is complete
        """
        timeout = config.lock_wait_timeout_seconds
        current_interval = config.lock_wait_initial_interval_seconds
        max_interval = config.lock_wait_max_interval_seconds
        backoff_factor = config.lock_wait_backoff_factor
        state = task_states[task.id]
        notified_waiting = False  # Track if we've already notified registry

        loop = asyncio.get_event_loop()
        start_time = loop.time()

        while True:
            # Try to acquire the lock
            result = await lock_manager.acquire(task_id)

            if result.status == LockAcquisitionStatus.ACQUIRED:
                # Clear waiting flag if we were waiting
                state.waiting_for_lock = False
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                state.waiting_for_lock = False
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                state.waiting_for_lock = False
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            # Mark task as waiting for lock and notify registry (once)
            if not notified_waiting:
                state.waiting_for_lock = True
                notified_waiting = True
                lock_owner = result.error_message  # May contain owner info
                try:
                    await registry.task_waiting_for_lock_aio(build_id, task, lock_owner)
                except Exception as e:
                    logger.warning(f"Failed to notify registry of lock wait: {e}")

            if timeout is None:
                return result

            elapsed = loop.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if await task.complete_aio():
                state.waiting_for_lock = False
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            await asyncio.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    async def submit_with_lock(
        task: BaseTask,
    ) -> LockAcquisitionResult | BaseException | TaskStruct | None:
        """Submit task for execution, acquiring lock first if enabled.

        This wraps lock acquisition + task execution as a single async unit,
        allowing the main loop to remain non-blocking while waiting for locks.

        Returns:
            - LockAcquisitionResult: If lock was not acquired (ALREADY_COMPLETED,
                ERROR, or timeout). The task was NOT executed.
            - BaseException | TaskStruct | None: Normal task result if lock was
                acquired (or locking wasn't needed) and task was executed.
        """
        state = task_states[task.id]
        use_lock = global_lock_manager is not None and lock_selector(task)

        if use_lock:
            assert global_lock_manager is not None  # For type checker
            task_id_str = str(task.id)

            # Always acquire lock (even if we think we hold it from a previous
            # dynamic deps yield). This handles:
            # 1. Fresh task execution - normal acquire
            # 2. Task resuming after dynamic deps - re-acquire is safe since
            #    we're the same owner, and handles the case where lock expired
            #    during the wait for deps
            lock_result = await acquire_lock_with_completion_check(
                task, task_id_str, global_lock_manager, global_lock_config
            )

            if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                # Lock not acquired - return the lock result for handling
                return lock_result

            # Lock acquired - track it for release later
            held_locks.add(task_id_str)

        # Now we have the lock (or locking wasn't needed)
        # Start the task in registry
        if not state.started:
            await registry.task_start_aio(build_id, task)
            state.started = True
        elif state.dynamic_deps:
            # Task was suspended waiting for dynamic deps, now resuming
            await registry.task_resume_aio(build_id, task)

        # Execute the task via the executor
        return await task_executor.submit(task)

    try:
        # Main build loop using as_completed pattern
        while True:
            # Check if all roots complete
            all_roots_complete = all(task_states[root.id].completed for root in tasks)
            if all_roots_complete:
                break

            # Find and submit ready tasks
            ready = find_ready_tasks()

            # Submit ready tasks (lock acquisition + execution as single async unit)
            for task in ready:
                async_task = asyncio.create_task(submit_with_lock(task))
                pending_futures[task.id] = async_task

            # If nothing is pending, check for deadlock or completion
            if not pending_futures:
                incomplete = [
                    s
                    for s in task_states.values()
                    if not s.completed and s.exception is None
                ]
                if incomplete:
                    # Check if all incomplete tasks are blocked by failed dependencies
                    def has_failed_dep(state: TaskExecutionState) -> bool:
                        for dep in state.all_deps:
                            dep_state = task_states[dep.id]
                            if dep_state.exception is not None:
                                return True
                        return False

                    truly_blocked = [s for s in incomplete if not has_failed_dep(s)]
                    if truly_blocked:
                        # Real deadlock - tasks blocked without failed deps
                        raise RuntimeError(
                            f"Deadlock: {len(truly_blocked)} tasks cannot proceed. "
                            f"Tasks: {[s.task.id for s in truly_blocked[:5]]}"
                        )
                    # All remaining tasks are blocked by failed deps - exit gracefully
                break

            # Wait for at least one task to complete
            done, _ = await asyncio.wait(
                pending_futures.values(), return_when=asyncio.FIRST_COMPLETED
            )

            # Process completed tasks
            for async_task in done:
                # Find which task this was
                task_id = None
                for tid, fut in pending_futures.items():
                    if fut is async_task:
                        task_id = tid
                        break
                assert task_id is not None

                # Remove from pending and executing
                del pending_futures[task_id]
                executing.discard(task_id)

                # Get result and process
                task = task_states[task_id].task
                try:
                    result = async_task.result()
                except Exception as e:
                    result = e
                await process_result(task, result)

            # Check for exit-early condition: all remaining tasks waiting for locks
            if global_lock_config.exit_early_when_all_locked:
                remaining_tasks = [
                    s
                    for s in task_states.values()
                    if not s.completed and s.exception is None
                ]
                if remaining_tasks:
                    all_waiting = all(s.waiting_for_lock for s in remaining_tasks)
                    if all_waiting:
                        reason = (
                            f"All {len(remaining_tasks)} remaining tasks "
                            "are running in other builds"
                        )
                        logger.info(f"Exiting early: {reason}")
                        try:
                            await registry.build_exit_early_aio(build_id, reason)
                        except Exception as reg_err:
                            logger.warning(
                                f"Failed to notify registry of exit early: {reg_err}"
                            )
                        return BuildSummary(
                            status=BuildExitStatus.EXIT_EARLY,
                            task_count=task_count,
                            build_id=build_id,
                            error=None,
                        )

        await registry.build_complete_aio(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        await registry.build_fail_aio(build_id, str(e))
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )

    finally:
        await task_executor.teardown()

build_sequential

build_sequential(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    dual_run_default="sync",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
)

Sync API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run via run()
  • Async-only tasks: run via asyncio.run(run_aio()). (Does not work if called from within an existing event loop.)
  • Dual tasks: run via run() if dual_run_default=="sync" (default), else (dual_run_default=="async") via asyncio.run(run_aio()).

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: list[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

dual_run_default

For dual tasks, prefer sync or async execution

TYPE: Literal['sync', 'async'] DEFAULT: 'sync'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

Source code in stardag/build/_sequential.py
def build_sequential(
    tasks: list[BaseTask] | BaseTask,
    registry: RegistryABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    dual_run_default: Literal["sync", "async"] = "sync",
    resume_build_id: UUID | None = None,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
) -> BuildSummary:
    """Sync API for building tasks sequentially.

    This is intended primarily for debugging and testing.

    Task execution policy:
    - Sync-only tasks: run via `run()`
    - Async-only tasks: run via `asyncio.run(run_aio())`. (Does not work if called
        from within an existing event loop.)
    - Dual tasks: run via `run()` if `dual_run_default=="sync"` (default), else
        (`dual_run_default=="async"`) via `asyncio.run(run_aio())`.

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        registry: Registry for tracking builds
        fail_mode: How to handle task failures
        dual_run_default: For dual tasks, prefer sync or async execution
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution for "exactly once" semantics across processes.
        global_lock_config: Configuration for global locking behavior.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    if isinstance(tasks, BaseTask):
        tasks = [tasks]
    else:
        tasks = list(tasks)
        for idx, task in enumerate(tasks):
            if not isinstance(task, BaseTask):
                raise ValueError(
                    f"Invalid task at index {idx}: {task} (must be BaseTask)"
                )

    if registry is None:
        registry = init_registry()
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    failed_cache: set[UUID] = set()
    error: Exception | None = None

    # Discover all tasks, stopping at already-complete tasks
    all_tasks: dict[UUID, BaseTask] = {}
    previously_completed_tasks: list[BaseTask] = []

    def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks."""
        if task.id in all_tasks:
            return
        all_tasks[task.id] = task
        task_count.discovered += 1

        # Check if this task is already complete
        if task.complete():
            completion_cache.add(task.id)
            task_count.previously_completed += 1
            previously_completed_tasks.append(task)
            # Don't recurse into deps - they're already built
            return

        # Task not complete - recurse into dependencies
        for dep in flatten_task_struct(task.requires()):
            discover(dep)

    for root in tasks:
        discover(root)

    # Start or resume build
    if resume_build_id is not None:
        build_id = resume_build_id
    else:
        build_id = registry.build_start(root_tasks=tasks)

    # Register previously completed tasks so they appear in the build's task list
    for task in previously_completed_tasks:
        try:
            registry.task_register(build_id, task)
        except Exception:
            pass  # Best effort

    def has_failed_dep(task: BaseTask) -> bool:
        """Check if any dependency has failed."""
        deps = flatten_task_struct(task.requires())
        return any(d.id in failed_cache for d in deps)

    def acquire_lock_sync(task: BaseTask) -> LockAcquisitionResult:
        """Acquire lock synchronously with retry/backoff."""
        assert global_lock_manager is not None
        assert global_lock_config is not None
        task_id = str(task.id)
        timeout = global_lock_config.lock_wait_timeout_seconds
        current_interval = global_lock_config.lock_wait_initial_interval_seconds
        max_interval = global_lock_config.lock_wait_max_interval_seconds
        backoff_factor = global_lock_config.lock_wait_backoff_factor
        start_time = time.time()

        while True:
            result = asyncio.run(global_lock_manager.acquire(task_id))

            if result.status == LockAcquisitionStatus.ACQUIRED:
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            if timeout is None:
                return result

            elapsed = time.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if task.complete():
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            time.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    def release_lock_sync(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            asyncio.run(global_lock_manager.release(task_id, task_completed=completed))
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    try:
        # Build in topological order
        while True:
            # Find a task ready to run
            ready_task: BaseTask | None = None
            for task in all_tasks.values():
                if task.id in completion_cache or task.id in failed_cache:
                    continue
                # Skip tasks with failed dependencies
                if has_failed_dep(task):
                    continue
                deps = flatten_task_struct(task.requires())
                if all(d.id in completion_cache for d in deps):
                    ready_task = task
                    break

            if ready_task is None:
                # No more tasks can run - either all done or blocked by failures
                break

            # Acquire lock if needed
            use_lock = global_lock_manager is not None and lock_selector(ready_task)
            if use_lock:
                lock_result = acquire_lock_sync(ready_task)

                if lock_result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                    # Task completed elsewhere - skip execution
                    completion_cache.add(ready_task.id)
                    task_count.previously_completed += 1
                    continue

                if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                    # Lock not acquired - treat as failure
                    task_count.failed += 1
                    failed_cache.add(ready_task.id)
                    error = RuntimeError(
                        f"Failed to acquire lock: {lock_result.error_message}"
                    )
                    registry.task_fail(build_id, ready_task, str(error))
                    if fail_mode == FailMode.FAIL_FAST:
                        raise error
                    continue

                # Lock acquired - track it
                held_locks.add(str(ready_task.id))

            # Execute the task
            task_completed = False
            try:
                _run_task_sequential(
                    ready_task,
                    completion_cache,
                    all_tasks,
                    build_id,
                    registry,
                    dual_run_default,
                )
                task_count.succeeded += 1
                task_completed = True
            except Exception as e:
                task_count.failed += 1
                failed_cache.add(ready_task.id)
                error = e
                registry.task_fail(build_id, ready_task, str(e))
                if fail_mode == FailMode.FAIL_FAST:
                    raise
            finally:
                if use_lock:
                    release_lock_sync(ready_task, completed=task_completed)

        registry.build_complete(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        registry.build_fail(build_id, str(e))
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )
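
For orientation, a minimal usage sketch. The import path and MyTask are assumptions (this reference only shows the source module stardag/build/_sequential.py); FailMode.FAIL_FAST and the summary fields are taken from the signature and return description above:

from stardag.build import FailMode, build_sequential  # import path is an assumption

task = MyTask(param=1)  # hypothetical BaseTask instance

summary = build_sequential(
    tasks=task,                    # a single root task or a list of roots
    fail_mode=FailMode.FAIL_FAST,  # the default: raise on the first failure
    dual_run_default="sync",       # dual tasks execute via run()
)
print(summary.status, summary.build_id)
print(summary.task_count)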

build_sequential_aio async

build_sequential_aio(
    tasks,
    registry=None,
    fail_mode=FAIL_FAST,
    sync_run_default="blocking",
    resume_build_id=None,
    global_lock_manager=None,
    global_lock_config=None,
)

Async API for building tasks sequentially.

This is intended primarily for debugging and testing.

Task execution policy:

  • Sync-only tasks: run blocking via run() in the main event loop if sync_run_default=="blocking" (default), else (sync_run_default=="thread") in a thread pool.
  • Async-only tasks: run via await run_aio().
  • Dual tasks: run via await run_aio().

PARAMETER DESCRIPTION
tasks

List of root tasks to build (and their dependencies) or a single root task.

TYPE: list[BaseTask] | BaseTask

registry

Registry for tracking builds

TYPE: RegistryABC | None DEFAULT: None

fail_mode

How to handle task failures

TYPE: FailMode DEFAULT: FAIL_FAST

sync_run_default

For sync-only tasks, block or use thread pool

TYPE: Literal['thread', 'blocking'] DEFAULT: 'blocking'

resume_build_id

Optional build ID to resume. If provided, continues tracking events under this existing build instead of starting a new one.

TYPE: UUID | None DEFAULT: None

global_lock_manager

Global concurrency lock manager for distributed builds. If provided with global_lock_config.enabled=True, tasks will acquire locks before execution for "exactly once" semantics across processes.

TYPE: GlobalConcurrencyLockManager | None DEFAULT: None

global_lock_config

Configuration for global locking behavior.

TYPE: GlobalLockConfig | None DEFAULT: None

RETURNS DESCRIPTION
BuildSummary

BuildSummary with status, task counts, and build_id

Source code in stardag/build/_sequential.py
async def build_sequential_aio(
    tasks: list[BaseTask] | BaseTask,
    registry: RegistryABC | None = None,
    fail_mode: FailMode = FailMode.FAIL_FAST,
    sync_run_default: Literal["thread", "blocking"] = "blocking",
    resume_build_id: UUID | None = None,
    global_lock_manager: GlobalConcurrencyLockManager | None = None,
    global_lock_config: GlobalLockConfig | None = None,
) -> BuildSummary:
    """Async API for building tasks sequentially.

    This is intended primarily for debugging and testing.

    Task execution policy:
    - Sync-only tasks: runs *blocking* via `run()` in main event loop if
        `sync_run_default=="blocking"` (default), else (`sync_run_default=="thread"`)
        in thread pool.
    - Async-only tasks: run via `await run_aio()`.
    - Dual tasks: run via `await run_aio()`.

    Args:
        tasks: List of root tasks to build (and their dependencies) or a single root
            task.
        registry: Registry for tracking builds
        fail_mode: How to handle task failures
        sync_run_default: For sync-only tasks, block or use thread pool
        resume_build_id: Optional build ID to resume. If provided, continues tracking
            events under this existing build instead of starting a new one.
        global_lock_manager: Global concurrency lock manager for distributed builds.
            If provided with global_lock_config.enabled=True, tasks will acquire locks
            before execution for "exactly once" semantics across processes.
        global_lock_config: Configuration for global locking behavior.

    Returns:
        BuildSummary with status, task counts, and build_id
    """
    if isinstance(tasks, BaseTask):
        tasks = [tasks]
    else:
        tasks = list(tasks)
        for idx, task in enumerate(tasks):
            if not isinstance(task, BaseTask):
                raise ValueError(
                    f"Invalid task at index {idx}: {task} (must be BaseTask)"
                )
    if registry is None:
        registry = init_registry()
    if global_lock_config is None:
        global_lock_config = GlobalLockConfig()
    lock_selector: GlobalLockSelector = DefaultGlobalLockSelector(global_lock_config)
    held_locks: set[str] = set()

    task_count = TaskCount()
    completion_cache: set[UUID] = set()
    failed_cache: set[UUID] = set()
    error: Exception | None = None

    # Discover all tasks, stopping at already-complete tasks
    all_tasks: dict[UUID, BaseTask] = {}
    previously_completed_tasks: list[BaseTask] = []

    async def discover(task: BaseTask) -> None:
        """Recursively discover tasks, stopping at already-complete tasks."""
        if task.id in all_tasks:
            return
        all_tasks[task.id] = task
        task_count.discovered += 1

        # Check if this task is already complete
        if await task.complete_aio():
            completion_cache.add(task.id)
            task_count.previously_completed += 1
            previously_completed_tasks.append(task)
            # Don't recurse into deps - they're already built
            return

        # Task not complete - recurse into dependencies
        for dep in flatten_task_struct(task.requires()):
            await discover(dep)

    for root in tasks:
        await discover(root)

    # Start or resume build
    if resume_build_id is not None:
        build_id = resume_build_id
    else:
        build_id = await registry.build_start_aio(root_tasks=tasks)

    # Register previously completed tasks so they appear in the build's task list
    for task in previously_completed_tasks:
        try:
            await registry.task_register_aio(build_id, task)
        except Exception:
            pass  # Best effort

    def has_failed_dep(task: BaseTask) -> bool:
        """Check if any dependency has failed."""
        deps = flatten_task_struct(task.requires())
        return any(d.id in failed_cache for d in deps)

    async def acquire_lock_aio(task: BaseTask) -> LockAcquisitionResult:
        """Acquire lock asynchronously with retry/backoff."""
        assert global_lock_manager is not None
        assert global_lock_config is not None
        task_id = str(task.id)
        timeout = global_lock_config.lock_wait_timeout_seconds
        current_interval = global_lock_config.lock_wait_initial_interval_seconds
        max_interval = global_lock_config.lock_wait_max_interval_seconds
        backoff_factor = global_lock_config.lock_wait_backoff_factor

        loop = asyncio.get_event_loop()
        start_time = loop.time()

        while True:
            result = await global_lock_manager.acquire(task_id)

            if result.status == LockAcquisitionStatus.ACQUIRED:
                return result

            if result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                return result

            if result.status == LockAcquisitionStatus.ERROR:
                return result

            # HELD_BY_OTHER or CONCURRENCY_LIMIT_REACHED - retry with backoff
            if timeout is None:
                return result

            elapsed = loop.time() - start_time
            if elapsed >= timeout:
                return LockAcquisitionResult(
                    status=result.status,
                    acquired=False,
                    error_message=f"Timeout after {timeout}s: {result.status.value}",
                )

            # Check if task was completed externally during the wait
            if await task.complete_aio():
                return LockAcquisitionResult(
                    status=LockAcquisitionStatus.ALREADY_COMPLETED,
                    acquired=False,
                )

            logger.debug(
                f"Lock for {task_id} unavailable ({result.status}), "
                f"retrying in {current_interval:.1f}s..."
            )
            await asyncio.sleep(current_interval)
            current_interval = min(current_interval * backoff_factor, max_interval)

    async def release_lock_aio(task: BaseTask, completed: bool) -> None:
        """Release lock for task if held."""
        if global_lock_manager is None:
            return
        task_id = str(task.id)
        if task_id not in held_locks:
            return
        try:
            await global_lock_manager.release(task_id, task_completed=completed)
        except Exception as e:
            logger.warning(f"Failed to release lock for task {task_id}: {e}")
        finally:
            held_locks.discard(task_id)

    try:
        # Build in topological order
        while True:
            # Find a task ready to run
            ready_task: BaseTask | None = None
            for task in all_tasks.values():
                if task.id in completion_cache or task.id in failed_cache:
                    continue
                # Skip tasks with failed dependencies
                if has_failed_dep(task):
                    continue
                deps = flatten_task_struct(task.requires())
                if all(d.id in completion_cache for d in deps):
                    ready_task = task
                    break

            if ready_task is None:
                # No more tasks can run - either all done or blocked by failures
                break

            # Acquire lock if needed
            use_lock = global_lock_manager is not None and lock_selector(ready_task)
            if use_lock:
                lock_result = await acquire_lock_aio(ready_task)

                if lock_result.status == LockAcquisitionStatus.ALREADY_COMPLETED:
                    # Task completed elsewhere - skip execution
                    completion_cache.add(ready_task.id)
                    task_count.previously_completed += 1
                    continue

                if lock_result.status != LockAcquisitionStatus.ACQUIRED:
                    # Lock not acquired - treat as failure
                    task_count.failed += 1
                    failed_cache.add(ready_task.id)
                    error = RuntimeError(
                        f"Failed to acquire lock: {lock_result.error_message}"
                    )
                    await registry.task_fail_aio(build_id, ready_task, str(error))
                    if fail_mode == FailMode.FAIL_FAST:
                        raise error
                    continue

                # Lock acquired - track it
                held_locks.add(str(ready_task.id))

            # Execute the task
            task_completed = False
            try:
                await _run_task_sequential_aio(
                    ready_task,
                    completion_cache,
                    all_tasks,
                    build_id,
                    registry,
                    sync_run_default,
                )
                task_count.succeeded += 1
                task_completed = True
            except Exception as e:
                task_count.failed += 1
                failed_cache.add(ready_task.id)
                error = e
                await registry.task_fail_aio(build_id, ready_task, str(e))
                if fail_mode == FailMode.FAIL_FAST:
                    raise
            finally:
                if use_lock:
                    await release_lock_aio(ready_task, completed=task_completed)

        await registry.build_complete_aio(build_id)
        return BuildSummary(
            status=BuildExitStatus.SUCCESS
            if error is None
            else BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=error,
        )

    except Exception as e:
        await registry.build_fail_aio(build_id, str(e))
        return BuildSummary(
            status=BuildExitStatus.FAILURE,
            task_count=task_count,
            build_id=build_id,
            error=e,
        )
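
And the async counterpart; as above, the import path and MyTask are assumptions:

import asyncio

from stardag.build import build_sequential_aio  # import path is an assumption

async def main() -> None:
    summary = await build_sequential_aio(
        tasks=MyTask(param=1),      # hypothetical BaseTask instance
        sync_run_default="thread",  # run sync-only tasks in a thread pool
    )
    print(summary.status)

asyncio.run(main())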

Target Module

target

target_factory_provider module-attribute

target_factory_provider = resource_provider(
    type_=TargetFactory, default_factory=TargetFactory
)

FileSystemTarget

FileSystemTarget(uri)

Bases: _FileSystemTargetGeneric[bytes], Protocol

Source code in stardag/target/_base.py
def __init__(self, uri: str) -> None:
    self.uri = uri

LoadableSaveableFileSystemTarget

LoadableSaveableFileSystemTarget(uri)

Bases: LoadableSaveableTarget[LoadedT], _FileSystemTargetGeneric[bytes], Generic[LoadedT], Protocol

Source code in stardag/target/_base.py
def __init__(self, uri: str) -> None:
    self.uri = uri

LocalTarget

LocalTarget(uri)

Bases: FileSystemTarget

TODO use luigi-style atomic writes.

Source code in stardag/target/_base.py
def __init__(self, uri: str) -> None:
    # Expand ~ to user home directory
    self.uri = os.path.expanduser(uri)

exists_aio async

exists_aio()

Asynchronously check if the local file exists.

Source code in stardag/target/_base.py
async def exists_aio(self) -> bool:
    """Asynchronously check if the local file exists."""
    return await aiofiles.os.path.exists(self.path)
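
A small sketch of the behavior above (the import path is an assumption; the expanded path depends on the local user):

import asyncio

from stardag.target import LocalTarget  # import path is an assumption

target = LocalTarget("~/data/output.txt")
print(target.uri)  # "~" expanded, e.g. /home/alice/data/output.txt

# exists_aio() is a coroutine, so it must be awaited:
print(asyncio.run(target.exists_aio()))  # False until the file is written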

TargetFactory

TargetFactory(
    target_roots=None, prefix_to_target_prototype=None
)
Source code in stardag/target/_factory.py
def __init__(
    self,
    target_roots: dict[str, str] | None = None,
    prefix_to_target_prototype: PrefixToTargetPrototype | None = None,
) -> None:
    # If no target_roots provided, get from central config
    if target_roots is None:
        target_roots = config_provider.get().target.roots

    self.target_roots = {
        key: value.removesuffix("/") + "/" for key, value in target_roots.items()
    }
    self.prefix_to_target_prototype = (
        prefix_to_target_prototype or get_default_prefix_to_target_prototype()
    )
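
A construction sketch. The "default" key is an assumption based on the error message in get_path below; the "shared" root is purely illustrative:

from stardag.target import TargetFactory  # import path is an assumption

factory = TargetFactory(
    target_roots={
        "default": "/tmp/stardag",        # normalized to "/tmp/stardag/"
        "shared": "s3://my-bucket/dag/",  # hypothetical remote root
    }
)
print(factory.target_roots["default"])  # /tmp/stardag/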

get_target

get_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a file system target.

PARAMETER DESCRIPTION
relpath

The path to the target, relative to the configured root path for target_root_key.

TYPE: str

target_root_key

The key to the target root to use.

TYPE: str DEFAULT: DEFAULT_TARGET_ROOT_KEY

RETURNS DESCRIPTION
FileSystemTarget

A file system target.

Source code in stardag/target/_factory.py
def get_target(
    self,
    relpath: str,
    target_root_key: str = DEFAULT_TARGET_ROOT_KEY,
) -> FileSystemTarget:
    """Get a file system target.

    Args:
        relpath: The path to the target, relative to the configured root path for
            `target_root_key`.
        target_root_key: The key to the target root to use.

    Returns:
        A file system target.
    """
    if self._is_full_path(relpath):
        path = relpath
    else:
        path = self.get_path(relpath, target_root_key)
    target_prototype = self._get_target_prototype(path)
    return target_prototype(path)
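
Continuing the factory sketch above; the concrete target type is resolved from the path prefix via prefix_to_target_prototype (a plain local path would presumably yield a LocalTarget):

target = factory.get_target("my_task/v1/output.json")
print(target.uri)  # /tmp/stardag/my_task/v1/output.json

shared = factory.get_target("report.json", target_root_key="shared")
print(shared.uri)  # s3://my-bucket/dag/report.json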

get_directory_target

get_directory_target(
    relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY
)

Get a directory target.

PARAMETER DESCRIPTION
relpath

The path to the target, relative to the configured root path for target_root_key.

TYPE: str

target_root_key

The key to the target root to use.

TYPE: str DEFAULT: DEFAULT_TARGET_ROOT_KEY

RETURNS DESCRIPTION
DirectoryTarget

A directory target.

Source code in stardag/target/_factory.py
def get_directory_target(
    self,
    relpath: str,
    target_root_key: str = DEFAULT_TARGET_ROOT_KEY,
) -> DirectoryTarget:
    """Get a directory target.

    Args:
        relpath: The path to the target, relative to the configured root path for
            `target_root_key`.
        target_root_key: The key to the target root to use.

    Returns:
        A directory target.
    """
    if self._is_full_path(relpath):
        path = relpath
    else:
        path = self.get_path(relpath, target_root_key)
    target_prototype = self._get_target_prototype(path)
    return DirectoryTarget(path, target_prototype)
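
The directory variant follows the same resolution, wrapping the resolved path together with the prototype used for files inside it:

artifacts = factory.get_directory_target("my_task/v1/artifacts/")
# a DirectoryTarget rooted at /tmp/stardag/my_task/v1/artifacts/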

get_path

get_path(relpath, target_root_key=DEFAULT_TARGET_ROOT_KEY)

Get the full (/"absolute") path (/"URI") to the target.

Source code in stardag/target/_factory.py
def get_path(
    self, relpath: str, target_root_key: str = DEFAULT_TARGET_ROOT_KEY
) -> str:
    """Get the full (/"absolute") path (/"URI") to the target."""
    target_root = self.target_roots.get(target_root_key)
    if target_root is None:
        example_json = json.dumps({target_root_key: "...", "default": "..."})
        raise ValueError(
            f"No target root is configured for key: '{target_root_key}'. "
            f"Available keys are: {list(self.target_roots.keys())}. Set the missing "
            "target root in your registry config or via environment variable: "
            f"`STARDAG_TARGET_ROOTS='{example_json}'`."
        )

    return f"{target_root}{relpath}"

Registry Module

registry

Task registry module for stardag.

This module provides registry implementations for tracking task execution. The main classes are:

  • RegistryABC: Abstract base class defining the registry interface
  • APIRegistry: Registry that communicates with the stardag-api service
  • NoOpRegistry: A do-nothing registry (default when unconfigured)
  • registry_provider: Resource provider for getting the configured registry
  • RegistryGlobalConcurrencyLockManager: GlobalConcurrencyLockManager using Registry API
  • RegistryLockHandle: LockHandle implementation with automatic TTL renewal

registry_provider module-attribute

registry_provider = resource_provider(
    RegistryABC, init_registry
)
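
A usage sketch, assuming resource providers expose a get() accessor (as config_provider.get() does in the APIRegistry source below):

registry = registry_provider.get()  # configured RegistryABC instance

build_id = registry.build_start()
# ... register/start/complete tasks against build_id ...
registry.build_complete(build_id)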

APIRegistry

APIRegistry(
    api_url=None,
    timeout=None,
    environment_id=None,
    api_key=None,
)

Bases: RegistryABC

Registry that stores task information via the stardag-api REST service.

This registry is stateless with respect to build_id - the build_id is passed explicitly to all methods that need it. This allows a single registry instance to be reused across multiple builds (via registry_provider).

Usage:

    build_id = await registry.build_start_aio(root_tasks=tasks)
    await registry.task_register_aio(build_id, task)
    await registry.task_start_aio(build_id, task)
    # ... execute task ...
    await registry.task_complete_aio(build_id, task)
    await registry.build_complete_aio(build_id)

Authentication:

  • API key can be provided directly or via the STARDAG_API_KEY env var
  • JWT token from browser login (stored in registry credentials)

Configuration is loaded from the central config module (stardag.config).

Source code in stardag/registry/_api_registry.py
def __init__(
    self,
    api_url: str | None = None,
    timeout: float | None = None,
    environment_id: str | None = None,
    api_key: str | None = None,
):
    # Load central config
    config = config_provider.get()

    # API key: explicit > config (which includes env var)
    self.api_key = api_key or config.api_key

    # Access token from config (browser login, only if no API key)
    self.access_token = config.access_token if not self.api_key else None

    # API URL: explicit > config
    self.api_url = (api_url or config.api.url).rstrip("/")

    # Timeout: explicit > config
    self.timeout = timeout if timeout is not None else config.api.timeout

    # Environment ID: explicit > config
    self.environment_id = environment_id or config.context.environment_id

    self._client = None
    self._async_client = None

    if self.api_key:
        logger.debug("APIRegistry initialized with API key authentication")
    elif self.access_token:
        if not self.environment_id:
            logger.warning(
                "APIRegistry: JWT auth requires environment_id. "
                "Run 'stardag config set environment <id>' to set it."
            )
        else:
            logger.debug(
                "APIRegistry initialized with browser login (JWT) authentication"
            )
    else:
        logger.warning(
            "APIRegistry initialized without authentication. "
            "Run 'stardag auth login' or set STARDAG_API_KEY env var."
        )
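
A construction sketch illustrating the explicit-over-config precedence implemented above (the URL and key are placeholders):

registry = APIRegistry(
    api_url="https://api.example.com/",  # trailing "/" is stripped
    api_key="sk-...",                    # beats config and STARDAG_API_KEY
    timeout=30.0,
)
print(registry.api_url)  # https://api.example.com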

async_client property

async_client

Lazy-initialized async HTTP client.

build_start

build_start(root_tasks=None, description=None)

Start a new build and return its ID.

Source code in stardag/registry/_api_registry.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Start a new build and return its ID."""
    build_data = {
        "commit_hash": get_git_commit_hash(),
        "root_task_ids": [str(task.id) for task in (root_tasks or [])],
        "description": description,
    }

    response = self.client.post(
        f"{self.api_url}/api/v1/builds",
        json=build_data,
        params=self._get_params(),
    )
    self._handle_response_error(response, "Start build")
    data = response.json()
    build_id = UUID(data["id"])
    logger.info(f"Started build: {data['name']} (ID: {build_id})")
    return build_id

build_complete

build_complete(build_id)

Mark a build as completed.

Source code in stardag/registry/_api_registry.py
def build_complete(self, build_id: UUID) -> None:
    """Mark a build as completed."""
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/complete",
        params=self._get_params(),
    )
    self._handle_response_error(response, "Complete build")
    logger.info(f"Completed build: {build_id}")

build_fail

build_fail(build_id, error_message=None)

Mark a build as failed.

Source code in stardag/registry/_api_registry.py
def build_fail(self, build_id: UUID, error_message: str | None = None) -> None:
    """Mark a build as failed."""
    params = self._get_params()
    if error_message:
        params["error_message"] = error_message
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/fail",
        params=params,
    )
    self._handle_response_error(response, "Fail build")
    logger.info(f"Marked build as failed: {build_id}")

build_cancel

build_cancel(build_id)

Cancel a build.

Source code in stardag/registry/_api_registry.py
def build_cancel(self, build_id: UUID) -> None:
    """Cancel a build."""
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/cancel",
        params=self._get_params(),
    )
    self._handle_response_error(response, "Cancel build")
    logger.info(f"Cancelled build: {build_id}")

build_exit_early

build_exit_early(build_id, reason=None)

Mark a build as exited early.

Source code in stardag/registry/_api_registry.py
def build_exit_early(self, build_id: UUID, reason: str | None = None) -> None:
    """Mark a build as exited early."""
    params = self._get_params()
    if reason:
        params["reason"] = reason
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/exit-early",
        params=params,
    )
    self._handle_response_error(response, "Exit early")
    logger.info(f"Build exited early: {build_id}")

task_register

task_register(build_id, task)

Register a task within a build.

Source code in stardag/registry/_api_registry.py
def task_register(self, build_id: UUID, task: "BaseTask") -> None:
    """Register a task within a build."""
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks",
        json=_get_task_data_for_registration(task),
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Register task {task.id}")

task_start

task_start(build_id, task)

Mark a task as started.

Source code in stardag/registry/_api_registry.py
def task_start(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as started."""
    # Ensure task is registered first
    self.task_register(build_id, task)

    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/start",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Start task {task.id}")

task_complete

task_complete(build_id, task)

Mark a task as completed.

Source code in stardag/registry/_api_registry.py
def task_complete(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as completed."""
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/complete",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Complete task {task.id}")

task_fail

task_fail(build_id, task, error_message=None)

Mark a task as failed.

Source code in stardag/registry/_api_registry.py
def task_fail(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Mark a task as failed."""
    params = self._get_params()
    if error_message:
        params["error_message"] = error_message
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/fail",
        params=params,
    )
    self._handle_response_error(response, f"Fail task {task.id}")

task_suspend

task_suspend(build_id, task)

Mark a task as suspended (waiting for dynamic dependencies).

Source code in stardag/registry/_api_registry.py
def task_suspend(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as suspended (waiting for dynamic dependencies)."""
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/suspend",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Suspend task {task.id}")

task_resume

task_resume(build_id, task)

Mark a task as resumed (dynamic dependencies completed).

Source code in stardag/registry/_api_registry.py
def task_resume(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as resumed (dynamic dependencies completed)."""
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/resume",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Resume task {task.id}")

task_cancel

task_cancel(build_id, task)

Cancel a task.

Source code in stardag/registry/_api_registry.py
def task_cancel(self, build_id: UUID, task: "BaseTask") -> None:
    """Cancel a task."""
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/cancel",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Cancel task {task.id}")

task_waiting_for_lock

task_waiting_for_lock(build_id, task, lock_owner=None)

Record that a task is waiting for a global lock.

Source code in stardag/registry/_api_registry.py
def task_waiting_for_lock(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Record that a task is waiting for a global lock."""
    params = self._get_params()
    if lock_owner:
        params["lock_owner"] = lock_owner
    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/waiting-for-lock",
        params=params,
    )
    self._handle_response_error(response, f"Task {task.id} waiting for lock")

task_upload_assets

task_upload_assets(build_id, task, assets)

Upload assets for a completed task.

Source code in stardag/registry/_api_registry.py
def task_upload_assets(
    self, build_id: UUID, task: "BaseTask", assets: list[RegistryAsset]
) -> None:
    """Upload assets for a completed task."""
    if not assets:
        return

    # Serialize assets to API format
    # For all asset types, body is stored as a dict in body_json
    # - markdown: {"content": "<markdown string>"}
    # - json: the actual JSON data dict
    assets_data = []
    for asset in assets:
        data = asset.model_dump(mode="json")
        if asset.type == "markdown":
            # Wrap markdown body string in {"content": ...} dict
            data["body"] = {"content": data["body"]}
        assets_data.append(data)

    response = self.client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/assets",
        json=assets_data,
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Upload assets for task {task.id}")
    logger.debug(f"Uploaded {len(assets)} assets for task {task.id}")

task_get_metadata

task_get_metadata(task_id)

Get metadata for a registered task.

PARAMETER DESCRIPTION
task_id

The UUID of the task to get metadata for.

TYPE: UUID

RETURNS DESCRIPTION
TaskMetadata

A TaskMetadata object containing task metadata.

Source code in stardag/registry/_api_registry.py
def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
    """Get metadata for a registered task.

    Args:
        task_id: The UUID of the task to get metadata for.

    Returns:
        A TaskMetadata object containing task metadata.
    """

    response = self.client.get(
        f"{self.api_url}/api/v1/tasks/{task_id}/metadata",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Get metadata for task {task_id}")
    data = response.json()

    return TaskMetadata.model_validate(data)

close

close()

Close the HTTP client.

Source code in stardag/registry/_api_registry.py
def close(self) -> None:
    """Close the HTTP client."""
    if self._client is not None:
        self._client.close()
        self._client = None

aclose async

aclose()

Close the async HTTP client.

Source code in stardag/registry/_api_registry.py
async def aclose(self) -> None:
    """Close the async HTTP client."""
    if self._async_client is not None:
        await self._async_client.aclose()
        self._async_client = None

build_start_aio async

build_start_aio(root_tasks=None, description=None)

Async version - start a new build and return its ID.

Source code in stardag/registry/_api_registry.py
async def build_start_aio(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Async version - start a new build and return its ID."""
    build_data = {
        "commit_hash": get_git_commit_hash(),
        "root_task_ids": [str(task.id) for task in (root_tasks or [])],
        "description": description,
    }

    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds",
        json=build_data,
        params=self._get_params(),
    )
    self._handle_response_error(response, "Start build")
    data = response.json()
    build_id = UUID(data["id"])
    logger.info(f"Started build: {data['name']} (ID: {build_id})")
    return build_id

build_complete_aio async

build_complete_aio(build_id)

Async version - mark a build as completed.

Source code in stardag/registry/_api_registry.py
async def build_complete_aio(self, build_id: UUID) -> None:
    """Async version - mark a build as completed."""
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/complete",
        params=self._get_params(),
    )
    self._handle_response_error(response, "Complete build")
    logger.info(f"Completed build: {build_id}")

build_fail_aio async

build_fail_aio(build_id, error_message=None)

Async version - mark a build as failed.

Source code in stardag/registry/_api_registry.py
async def build_fail_aio(
    self, build_id: UUID, error_message: str | None = None
) -> None:
    """Async version - mark a build as failed."""
    params = self._get_params()
    if error_message:
        params["error_message"] = error_message
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/fail",
        params=params,
    )
    self._handle_response_error(response, "Fail build")
    logger.info(f"Marked build as failed: {build_id}")

build_cancel_aio async

build_cancel_aio(build_id)

Async version - cancel a build.

Source code in stardag/registry/_api_registry.py
async def build_cancel_aio(self, build_id: UUID) -> None:
    """Async version - cancel a build."""
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/cancel",
        params=self._get_params(),
    )
    self._handle_response_error(response, "Cancel build")
    logger.info(f"Cancelled build: {build_id}")

build_exit_early_aio async

build_exit_early_aio(build_id, reason=None)

Async version - mark build as exited early.

Source code in stardag/registry/_api_registry.py
async def build_exit_early_aio(
    self, build_id: UUID, reason: str | None = None
) -> None:
    """Async version - mark build as exited early."""
    params = self._get_params()
    if reason:
        params["reason"] = reason
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/exit-early",
        params=params,
    )
    self._handle_response_error(response, "Exit early")
    logger.info(f"Build exited early: {build_id}")

task_register_aio async

task_register_aio(build_id, task)

Async version - register a task within a build.

Source code in stardag/registry/_api_registry.py
async def task_register_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - register a task within a build."""
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks",
        json=_get_task_data_for_registration(task),
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Register task {task.id}")

task_start_aio async

task_start_aio(build_id, task)

Async version - mark a task as started.

Source code in stardag/registry/_api_registry.py
async def task_start_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as started."""
    await self.task_register_aio(build_id, task)

    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/start",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Start task {task.id}")

task_complete_aio async

task_complete_aio(build_id, task)

Async version - mark a task as completed.

Source code in stardag/registry/_api_registry.py
async def task_complete_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as completed."""
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/complete",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Complete task {task.id}")

task_fail_aio async

task_fail_aio(build_id, task, error_message=None)

Async version - mark a task as failed.

Source code in stardag/registry/_api_registry.py
async def task_fail_aio(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Async version - mark a task as failed."""
    params = self._get_params()
    if error_message:
        params["error_message"] = error_message
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/fail",
        params=params,
    )
    self._handle_response_error(response, f"Fail task {task.id}")

task_suspend_aio async

task_suspend_aio(build_id, task)

Async version - mark a task as suspended.

Source code in stardag/registry/_api_registry.py
async def task_suspend_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as suspended."""
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/suspend",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Suspend task {task.id}")

task_resume_aio async

task_resume_aio(build_id, task)

Async version - mark a task as resumed.

Source code in stardag/registry/_api_registry.py
async def task_resume_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - mark a task as resumed."""
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/resume",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Resume task {task.id}")

task_cancel_aio async

task_cancel_aio(build_id, task)

Async version - cancel a task.

Source code in stardag/registry/_api_registry.py
async def task_cancel_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version - cancel a task."""
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/cancel",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Cancel task {task.id}")

task_waiting_for_lock_aio async

task_waiting_for_lock_aio(build_id, task, lock_owner=None)

Async version - record that task is waiting for global lock.

Source code in stardag/registry/_api_registry.py
async def task_waiting_for_lock_aio(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Async version - record that task is waiting for global lock."""
    params = self._get_params()
    if lock_owner:
        params["lock_owner"] = lock_owner
    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/waiting-for-lock",
        params=params,
    )
    self._handle_response_error(response, f"Task {task.id} waiting for lock")

task_upload_assets_aio async

task_upload_assets_aio(build_id, task, assets)

Async version - upload assets for a completed task.

Source code in stardag/registry/_api_registry.py
async def task_upload_assets_aio(
    self, build_id: UUID, task: "BaseTask", assets: list[RegistryAsset]
) -> None:
    """Async version - upload assets for a completed task."""
    if not assets:
        return

    assets_data = []
    for asset in assets:
        data = asset.model_dump(mode="json")
        if asset.type == "markdown":
            data["body"] = {"content": data["body"]}
        assets_data.append(data)

    response = await self.async_client.post(
        f"{self.api_url}/api/v1/builds/{build_id}/tasks/{task.id}/assets",
        json=assets_data,
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Upload assets for task {task.id}")
    logger.debug(f"Uploaded {len(assets)} assets for task {task.id}")

task_get_metadata_aio async

task_get_metadata_aio(task_id)

Async version of task_get_metadata.

Source code in stardag/registry/_api_registry.py
async def task_get_metadata_aio(self, task_id: UUID) -> TaskMetadata:
    """Async version of task_get_metadata."""

    response = await self.async_client.get(
        f"{self.api_url}/api/v1/tasks/{task_id}/metadata",
        params=self._get_params(),
    )
    self._handle_response_error(response, f"Get metadata for task {task_id}")
    data = response.json()

    return TaskMetadata.model_validate(data)

RegistryABC

Abstract base class for task registries.

A registry tracks task execution within builds. Implementations must provide the abstract task_register and task_get_metadata methods; all other methods have default no-op implementations for backwards compatibility.

The registry is stateless with respect to build_id - the build_id is passed explicitly to all methods that need it. This allows a single registry instance to be reused across multiple builds.

Method naming convention:

  • Build methods: build_* (e.g., build_start, build_complete)
  • Task methods: task_* (e.g., task_register, task_start)
  • Async versions: _aio suffix (e.g., build_start_aio, task_register_aio)
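
A minimal concrete subclass sketch implementing both abstract methods (the class name and behavior are illustrative; task_get_metadata here simply raises):

from uuid import UUID

class PrintingRegistry(RegistryABC):
    """Hypothetical registry that only logs task registrations."""

    def task_register(self, build_id: UUID, task: "BaseTask") -> None:
        print(f"[{build_id}] registered task {task.id}")

    def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
        raise NotImplementedError("metadata lookup is not supported")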

build_start

build_start(root_tasks=None, description=None)

Start a new build session.

Called at the beginning of a build. Returns a build ID.

PARAMETER DESCRIPTION
root_tasks

The root tasks being built

TYPE: list[BaseTask] | None DEFAULT: None

description

Optional description of the build

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
UUID

Build ID (UUID) for the new build session.

Source code in stardag/registry/_base.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Start a new build session.

    Called at the beginning of a build. Returns a build ID.

    Args:
        root_tasks: The root tasks being built
        description: Optional description of the build

    Returns:
        Build ID (UUID) for the new build session.
    """
    return UUID("00000000-0000-0000-0000-000000000000")

build_complete

build_complete(build_id)

Mark a build as completed successfully.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

Source code in stardag/registry/_base.py
def build_complete(self, build_id: UUID) -> None:
    """Mark a build as completed successfully.

    Args:
        build_id: The build UUID returned by build_start.
    """
    pass

build_fail

build_fail(build_id, error_message=None)

Mark a build as failed.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

error_message

Optional error message describing the failure.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def build_fail(self, build_id: UUID, error_message: str | None = None) -> None:
    """Mark a build as failed.

    Args:
        build_id: The build UUID returned by build_start.
        error_message: Optional error message describing the failure.
    """
    pass

build_cancel

build_cancel(build_id)

Cancel a build.

Called when a build is explicitly cancelled by the user.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

Source code in stardag/registry/_base.py
def build_cancel(self, build_id: UUID) -> None:
    """Cancel a build.

    Called when a build is explicitly cancelled by the user.

    Args:
        build_id: The build UUID returned by build_start.
    """
    pass

build_exit_early

build_exit_early(build_id, reason=None)

Mark a build as exited early.

Called when all remaining tasks are running in other builds and this build should stop waiting.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

reason

Optional reason for exiting early.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def build_exit_early(self, build_id: UUID, reason: str | None = None) -> None:
    """Mark a build as exited early.

    Called when all remaining tasks are running in other builds
    and this build should stop waiting.

    Args:
        build_id: The build UUID returned by build_start.
        reason: Optional reason for exiting early.
    """
    pass

task_register abstractmethod

task_register(build_id, task)

Register a task as pending/scheduled.

This is called when a task is about to be executed.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task to register.

TYPE: BaseTask

Source code in stardag/registry/_base.py
@abc.abstractmethod
def task_register(self, build_id: UUID, task: "BaseTask") -> None:
    """Register a task as pending/scheduled.

    This is called when a task is about to be executed.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task to register.
    """
    pass

task_start

task_start(build_id, task)

Mark a task as started/running.

Called immediately before a task begins execution.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is starting.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_start(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as started/running.

    Called immediately before a task begins execution.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is starting.
    """
    pass

task_complete

task_complete(build_id, task)

Mark a task as completed successfully.

Called after a task finishes execution without errors.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that completed.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_complete(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as completed successfully.

    Called after a task finishes execution without errors.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that completed.
    """
    pass

task_fail

task_fail(build_id, task, error_message=None)

Mark a task as failed.

Called when a task raises an exception during execution.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that failed.

TYPE: BaseTask

error_message

Optional error message describing the failure.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def task_fail(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Mark a task as failed.

    Called when a task raises an exception during execution.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that failed.
        error_message: Optional error message describing the failure.
    """
    pass

task_suspend

task_suspend(build_id, task)

Mark a task as suspended waiting for dynamic dependencies.

Called when a task yields dynamic deps that are not yet complete. The task will remain suspended until its dynamic deps are built.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is suspended.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_suspend(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as suspended waiting for dynamic dependencies.

    Called when a task yields dynamic deps that are not yet complete.
    The task will remain suspended until its dynamic deps are built.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is suspended.
    """
    pass

task_resume

task_resume(build_id, task)

Mark a task as resumed after dynamic dependencies completed.

Called when a task's dynamic dependencies are complete and the task is ready to continue execution (either by resuming a suspended generator or by re-executing the task).

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task that is resuming.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_resume(self, build_id: UUID, task: "BaseTask") -> None:
    """Mark a task as resumed after dynamic dependencies completed.

    Called when a task's dynamic dependencies are complete and
    the task is ready to continue execution (either by resuming
    a suspended generator or by re-executing the task).

    Args:
        build_id: The build UUID returned by build_start.
        task: The task that is resuming.
    """
    pass

task_cancel

task_cancel(build_id, task)

Cancel a task.

Called when a task is explicitly cancelled by the user.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task to cancel.

TYPE: BaseTask

Source code in stardag/registry/_base.py
def task_cancel(self, build_id: UUID, task: "BaseTask") -> None:
    """Cancel a task.

    Called when a task is explicitly cancelled by the user.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task to cancel.
    """
    pass

task_waiting_for_lock

task_waiting_for_lock(build_id, task, lock_owner=None)

Record that a task is waiting for a global lock.

Called when a task cannot acquire its lock because another build is holding it.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The task waiting for the lock.

TYPE: BaseTask

lock_owner

Optional identifier of who holds the lock.

TYPE: str | None DEFAULT: None

Source code in stardag/registry/_base.py
def task_waiting_for_lock(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Record that a task is waiting for a global lock.

    Called when a task cannot acquire its lock because another
    build is holding it.

    Args:
        build_id: The build UUID returned by build_start.
        task: The task waiting for the lock.
        lock_owner: Optional identifier of who holds the lock.
    """
    pass

task_upload_assets

task_upload_assets(build_id, task, assets)

Upload assets for a completed task.

Called after a task completes successfully if it has registry assets.

PARAMETER DESCRIPTION
build_id

The build UUID returned by build_start.

TYPE: UUID

task

The completed task.

TYPE: BaseTask

assets

List of assets to upload.

TYPE: list[RegistryAsset]

Source code in stardag/registry/_base.py
def task_upload_assets(
    self, build_id: UUID, task: "BaseTask", assets: list["RegistryAsset"]
) -> None:
    """Upload assets for a completed task.

    Called after a task completes successfully if it has registry assets.

    Args:
        build_id: The build UUID returned by build_start.
        task: The completed task.
        assets: List of assets to upload.
    """
    pass

task_get_metadata abstractmethod

task_get_metadata(task_id)

Get metadata for a registered task.

PARAMETER DESCRIPTION
task_id

The ID of the task to get metadata for.

TYPE: UUID

RETURNS DESCRIPTION
TaskMetadata

A TaskMetadata object containing task metadata.

Source code in stardag/registry/_base.py
@abc.abstractmethod
def task_get_metadata(self, task_id: UUID) -> TaskMetadata:
    """Get metadata for a registered task.

    Args:
        task_id: The ID of the task to get metadata for.
    Returns:
        A TaskMetadata object containing task metadata.
    """
    pass

build_start_aio async

build_start_aio(root_tasks=None, description=None)

Async version of build_start.

Source code in stardag/registry/_base.py
async def build_start_aio(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Async version of build_start."""
    return self.build_start(root_tasks, description)
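
Each *_aio method below follows the same pattern: the default implementation simply delegates to its synchronous counterpart. A registry whose synchronous methods block on network I/O might instead dispatch them to a worker thread; a sketch, assuming the NoOpRegistry base documented below:

import asyncio
from uuid import UUID

from stardag.registry import NoOpRegistry  # import path assumed

class ThreadedRegistry(NoOpRegistry):
    """Hypothetical override: run the blocking sync call off the event loop."""

    async def build_start_aio(
        self,
        root_tasks=None,
        description: str | None = None,
    ) -> UUID:
        # asyncio.to_thread keeps the event loop responsive while the
        # synchronous implementation does its work.
        return await asyncio.to_thread(self.build_start, root_tasks, description)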

build_complete_aio async

build_complete_aio(build_id)

Async version of build_complete.

Source code in stardag/registry/_base.py
async def build_complete_aio(self, build_id: UUID) -> None:
    """Async version of build_complete."""
    self.build_complete(build_id)

build_fail_aio async

build_fail_aio(build_id, error_message=None)

Async version of build_fail.

Source code in stardag/registry/_base.py
async def build_fail_aio(
    self, build_id: UUID, error_message: str | None = None
) -> None:
    """Async version of build_fail."""
    self.build_fail(build_id, error_message)

build_cancel_aio async

build_cancel_aio(build_id)

Async version of build_cancel.

Source code in stardag/registry/_base.py
async def build_cancel_aio(self, build_id: UUID) -> None:
    """Async version of build_cancel."""
    self.build_cancel(build_id)

build_exit_early_aio async

build_exit_early_aio(build_id, reason=None)

Async version of build_exit_early.

Source code in stardag/registry/_base.py
async def build_exit_early_aio(
    self, build_id: UUID, reason: str | None = None
) -> None:
    """Async version of build_exit_early."""
    self.build_exit_early(build_id, reason)

task_register_aio async

task_register_aio(build_id, task)

Async version of task_register.

Source code in stardag/registry/_base.py
async def task_register_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_register."""
    self.task_register(build_id, task)

task_start_aio async

task_start_aio(build_id, task)

Async version of task_start.

Source code in stardag/registry/_base.py
async def task_start_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_start."""
    self.task_start(build_id, task)

task_complete_aio async

task_complete_aio(build_id, task)

Async version of task_complete.

Source code in stardag/registry/_base.py
async def task_complete_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_complete."""
    self.task_complete(build_id, task)

task_fail_aio async

task_fail_aio(build_id, task, error_message=None)

Async version of task_fail.

Source code in stardag/registry/_base.py
async def task_fail_aio(
    self, build_id: UUID, task: "BaseTask", error_message: str | None = None
) -> None:
    """Async version of task_fail."""
    self.task_fail(build_id, task, error_message)

task_suspend_aio async

task_suspend_aio(build_id, task)

Async version of task_suspend.

Source code in stardag/registry/_base.py
async def task_suspend_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_suspend."""
    self.task_suspend(build_id, task)

task_resume_aio async

task_resume_aio(build_id, task)

Async version of task_resume.

Source code in stardag/registry/_base.py
async def task_resume_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_resume."""
    self.task_resume(build_id, task)

task_cancel_aio async

task_cancel_aio(build_id, task)

Async version of task_cancel.

Source code in stardag/registry/_base.py
async def task_cancel_aio(self, build_id: UUID, task: "BaseTask") -> None:
    """Async version of task_cancel."""
    self.task_cancel(build_id, task)

task_waiting_for_lock_aio async

task_waiting_for_lock_aio(build_id, task, lock_owner=None)

Async version of task_waiting_for_lock.

Source code in stardag/registry/_base.py
async def task_waiting_for_lock_aio(
    self, build_id: UUID, task: "BaseTask", lock_owner: str | None = None
) -> None:
    """Async version of task_waiting_for_lock."""
    self.task_waiting_for_lock(build_id, task, lock_owner)

task_upload_assets_aio async

task_upload_assets_aio(build_id, task, assets)

Async version of task_upload_assets.

Source code in stardag/registry/_base.py
async def task_upload_assets_aio(
    self, build_id: UUID, task: "BaseTask", assets: list["RegistryAsset"]
) -> None:
    """Async version of task_upload_assets."""
    self.task_upload_assets(build_id, task, assets)

task_get_metadata_aio async

task_get_metadata_aio(task_id)

Async version of task_get_metadata.

Source code in stardag/registry/_base.py
async def task_get_metadata_aio(self, task_id: UUID) -> TaskMetadata:
    """Async version of task_get_metadata."""
    return self.task_get_metadata(task_id)

NoOpRegistry

Bases: RegistryABC

A registry that does nothing.

Used as a default when no registry is configured.

build_start

build_start(root_tasks=None, description=None)

Return a placeholder build ID.

Source code in stardag/registry/_base.py
def build_start(
    self,
    root_tasks: list["BaseTask"] | None = None,
    description: str | None = None,
) -> UUID:
    """Return a placeholder build ID."""
    return UUID("00000000-0000-0000-0000-000000000000")
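
A quick illustration of the placeholder behavior (import path assumed; build_complete is inherited from the base class):

from uuid import UUID

from stardag.registry import NoOpRegistry  # import path assumed

registry = NoOpRegistry()
build_id = registry.build_start(description="dry run")
assert build_id == UUID("00000000-0000-0000-0000-000000000000")
registry.build_complete(build_id)  # no-op; nothing is recorded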

Configuration

config

Centralized configuration for Stardag SDK.

This module provides a unified configuration system that consolidates:

- Target factory settings (target roots)
- API registry settings (URL, timeout, environment)
- Active context (registry, workspace, environment)

Configuration is loaded from multiple sources with the following priority:

1. Environment variables (STARDAG_*)
2. Project config (.stardag/config.toml in working directory or parents)
3. User config (~/.stardag/config.toml)
4. Defaults

Usage

from stardag.config import get_config

config = get_config()
print(config.api.url)
print(config.target.roots)

Environment Variables (highest priority):

- STARDAG_PROFILE - Profile name to use (looks up in config.toml)
- STARDAG_REGISTRY_URL - Direct registry URL override
- STARDAG_WORKSPACE_ID - Direct workspace ID override
- STARDAG_ENVIRONMENT_ID - Direct environment ID override
- STARDAG_API_KEY - API key for authentication
- STARDAG_TARGET_ROOTS - JSON dict of target roots (override)
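
For reference, a hypothetical config.toml that load_config (below) could resolve. The table and key names here are inferred from the profile-resolution logic in load_config, not from a documented schema:

[default]
profile = "dev"

[profile.dev]
registry = "main"
user = "alice"            # optional, enables per-user token caching
workspace = "my-team"     # slug or UUID
environment = "staging"   # slug or UUID

[registry.main]
url = "https://api.example.com"   # placeholder URL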

load_config

load_config(use_project_config=True)

Load configuration from all sources.

Priority (highest to lowest):

1. Environment variables (STARDAG_*)
2. Project config (.stardag/config.toml in repo)
3. User config (~/.stardag/config.toml)
4. Defaults

PARAMETER DESCRIPTION
use_project_config

Whether to load .stardag/config.toml from project.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
StardagConfig

Fully resolved StardagConfig.

Source code in stardag/config.py
def load_config(
    use_project_config: bool = True,
) -> StardagConfig:
    """Load configuration from all sources.

    Priority (highest to lowest):
    1. Environment variables (STARDAG_*)
    2. Project config (.stardag/config.toml in repo)
    3. User config (~/.stardag/config.toml)
    4. Defaults

    Args:
        use_project_config: Whether to load .stardag/config.toml from project.

    Returns:
        Fully resolved StardagConfig.
    """
    # 1. Load env vars first (highest priority)
    env_settings = StardagSettings()

    # 2. Load user and project TOML configs
    user_toml = load_toml_file(get_user_config_path())
    project_toml = {}
    if use_project_config:
        project_path = find_project_config()
        if project_path:
            project_toml = load_toml_file(project_path)

    # Merge configs (project overrides user)
    toml_config = _merge_toml_configs(user_toml, project_toml)

    # 3. Resolve profile → (registry, user, workspace, environment)
    profile_name: str | None = None
    registry_name: str | None = None
    registry_url: str | None = None
    user: str | None = None
    workspace_id: str | None = None
    environment_id: str | None = None

    # Check for direct env var overrides first
    if env_settings.registry_url:
        registry_url = env_settings.registry_url
        workspace_id = env_settings.workspace_id
        environment_id = env_settings.environment_id
    # Then check for profile-based config
    elif env_settings.profile:
        profile_name = env_settings.profile
    # Fall back to default profile from config
    elif toml_config.default.get("profile"):
        profile_name = toml_config.default["profile"]

    # If we have a profile, look it up
    if profile_name and not registry_url:
        profile = toml_config.profile.get(profile_name)
        if profile:
            registry_name = profile.registry
            user = profile.user  # Optional user for multi-user support
            workspace_value = profile.workspace  # Could be slug or ID
            environment_value = profile.environment  # Could be slug or ID

            # Look up registry URL from registry name
            registry_config = toml_config.registry.get(registry_name)
            if registry_config:
                registry_url = registry_config.url
            else:
                logger.warning(
                    f"Profile '{profile_name}' references unknown registry '{registry_name}'"
                )

            # Resolve workspace slug to ID if needed
            if _looks_like_uuid(workspace_value):
                workspace_id = workspace_value
            else:
                # Try to resolve from cache
                cached_workspace_id = get_cached_workspace_id(
                    registry_name, workspace_value
                )
                if cached_workspace_id:
                    workspace_id = cached_workspace_id
                else:
                    # Store the slug - will need to be resolved at runtime
                    workspace_id = workspace_value
                    logger.debug(
                        f"Workspace '{workspace_value}' is a slug, not cached. "
                        "Run 'stardag auth refresh' to resolve."
                    )

            # Resolve environment slug to ID if needed
            if _looks_like_uuid(environment_value):
                environment_id = environment_value
            elif workspace_id and _looks_like_uuid(workspace_id):
                # Can only resolve environment if we have a resolved workspace ID
                cached_env_id = get_cached_environment_id(
                    registry_name, workspace_id, environment_value
                )
                if cached_env_id:
                    environment_id = cached_env_id
                else:
                    # Store the slug - will need to be resolved at runtime
                    environment_id = environment_value
                    logger.debug(
                        f"Environment '{environment_value}' is a slug, not cached. "
                        "Run 'stardag auth refresh' to resolve."
                    )
            else:
                # Workspace is not resolved, can't resolve environment either
                environment_id = environment_value
        else:
            logger.warning(f"Profile '{profile_name}' not found in config")

    # Apply defaults
    if not registry_url:
        registry_url = DEFAULT_API_URL

    # 4. Resolve target roots
    # Priority: env > cached > default
    target_roots: dict[str, str]
    if env_settings.target_roots:
        target_roots = env_settings.target_roots
    elif registry_url and workspace_id and environment_id:
        cached_roots = get_cached_target_roots(
            registry_url, workspace_id, environment_id
        )
        if cached_roots:
            target_roots = cached_roots
        else:
            target_roots = {DEFAULT_TARGET_ROOT_KEY: DEFAULT_TARGET_ROOT}
    else:
        target_roots = {DEFAULT_TARGET_ROOT_KEY: DEFAULT_TARGET_ROOT}

    # 5. Load access token from cache (if we have profile info)
    access_token: str | None = None
    if registry_name and workspace_id and user:
        token_cache_path = get_access_token_cache_path(
            registry_name, workspace_id, user
        )
        if token_cache_path.exists():
            token_data = load_json_file(token_cache_path)
            # Check if token is still valid
            import time

            expires_at = token_data.get("expires_at", 0)
            if expires_at > time.time():
                access_token = token_data.get("access_token")

    # 6. Get API key from env
    api_key = env_settings.api_key or os.environ.get("STARDAG_API_KEY")

    return StardagConfig(
        target=TargetConfig(roots=target_roots),
        api=APIConfig(
            url=registry_url,
            timeout=env_settings.api_timeout or DEFAULT_API_TIMEOUT,
        ),
        context=ContextConfig(
            profile=profile_name,
            registry_name=registry_name,
            user=user,
            workspace_id=workspace_id,
            environment_id=environment_id,
        ),
        access_token=access_token,
        api_key=api_key,
    )
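
A usage sketch of the env-var override path, assuming StardagSettings reads the environment when load_config is called; the URL and workspace ID below are placeholders:

import os

from stardag.config import load_config

# Direct env vars short-circuit profile resolution (highest priority).
os.environ["STARDAG_REGISTRY_URL"] = "https://registry.example.com"
os.environ["STARDAG_WORKSPACE_ID"] = "11111111-1111-1111-1111-111111111111"

config = load_config(use_project_config=False)  # skip .stardag/config.toml
print(config.api.url)               # https://registry.example.com
print(config.context.workspace_id)  # 11111111-1111-1111-1111-111111111111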