Skip to content

Dependencies

Dependencies define how tasks relate to each other and form the DAG structure.

Declaring Dependencies

Task dependencies are declared via the method requires:

Number = int | float

class AddAB(sd.AutoTask[Number]):

    def requires(self):
        return {
            "a": ATask(),
            "b": BTask(),
        }

    def run(self):
        deps = self.requires()
        a = deps["a"].output().load()
        b = deps["b"].output().load()
        result = a + b
        self.output().save(result)

The return type of requires can be a single BaseTask or any nested list or dictionary of BaseTasks.

The task above declares hardcoded dependencies resulting in a static DAG, both in terms of topology and each node.

We can use parameters to forward information to dependencies:

ParamType = ...  # Some information to forward to A/BTask

class AddParameterizedAB(sd.AutoTask[Number]):
    a_param: ParamType
    b_param: ParamType

    def requires(self):
        return {
            "a": ATask(a=self.a_param),
            "b": BTask(b=self.b_param),
        }

Now we have a DAG with fixed topology, but parameterized nodes.

We can extend this pattern with conditional logic to achieve parameterized topology within a predefined set of alternatives:

ParamType = ...  # Some information to forward to A/BTask

class AddParameterizedABAndMaybeC(sd.AutoTask[Number]):
    a_param: ParamType
    b_param: ParamType
    c_param: ParamType | None  # optionally include a third dependency

    def requires(self):
        deps = {
            "a": ATask(a=self.a_param),
            "b": BTask(b=self.b_param),
        }
        if c_param:
            deps["c"] = CTask(self.c_param)

        return deps

Dependency Injection

The pattern above quickly becomes verbose and inflexible, here dependency injection comes to the rescue:

class Add(sd.AutoTask[Number]):
    values: list[sd.TaskLoads[Number]]

    def requires(self):
        return self.values

    def run(self):
        result = sum([dep.output().load() for dep in self.values])
        self.output().save(result)

Now we have effectively arbitrarily parameterized the upstream dependencies, the definition of nodes as well as the DAG topology. Each element of values can be any task type, as long as its output().load() returns a Number.

We can also accept raw data or tasks, which output loads the same data type, as parameters by:

class Add(sd.AutoTask[Number]):
    values: list[sd.TaskLoads[Number] | Number]

    def requires(self):
        return [value for value in self.values if isinstance(value, sd.BaseTask)]

    def run(self):
        result = sum(
            [
                value.output().load() if isinstance(value, sd.BaseTask)
                else value
                for value in self.values
            ]
        )
        self.output().save(result)

Benefits of Dependency Injection:

  • Loose coupling: Downstream tasks don't know upstream implementation
  • Reusability: Same task works with different inputs
  • Testability: Easy to mock dependencies
  • Flexibility: Compose DAGs dynamically

Decorator API

When using the decorator API, requires is implemented automatically based on parameters annotated with sd.Depends:

Number = int | float

@sd.task
def add(a: sd.Depends[Number], b: sd.Depends[Number]) -> Number:
    return a + b

🚧 Work in progress 🚧

This documentation is still taking shape. It should soon cover:

  • Limitations of the @task decorator API
  • Dynamic dependencies (which requires upstream tasks to be executed before we know which additional dependencies are needed)
  • Common patterns and best practices
  • Context on how Polymorphism is handled via PolymorphicRoots class registry.