12. Nested Flows

VibeBlocks supports flow nesting: a Flow can be used as a step inside another Flow (or a Chain). This is possible because every component — Block, Chain, and Flow — implements the Executable interface, and a Flow accepts any list of Executable objects as its steps.


How It Works

The Executable Interface

All three component types implement the same abstract base class:

class Executable(Generic[T], ABC):
    @property
    def is_async(self) -> bool: ...
    def execute(self, ctx: ExecutionContext[T]) -> Outcome[T] | Awaitable[Outcome[T]]: ...
    def compensate(self, ctx: ExecutionContext[T]) -> None | Awaitable[None]: ...

Because Flow is an Executable, it can be passed anywhere an Executable is accepted — including inside another Flow's blocks list.
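The nesting described above is the classic composite pattern. The following is a minimal, library-independent sketch of that idea, not VibeBlocks code: MiniExecutable, MiniBlock, MiniFlow, and record are hypothetical names, and the interface is reduced to a synchronous execute() that returns a bool.

```python
from abc import ABC, abstractmethod
from typing import Callable, List


class MiniExecutable(ABC):
    """Hypothetical stand-in for a shared step interface (sync only)."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    def execute(self, data: dict) -> bool:
        """Run the step and return True on success."""


class MiniBlock(MiniExecutable):
    """Leaf step that wraps a plain function."""

    def __init__(self, name: str, fn: Callable[[dict], None]) -> None:
        super().__init__(name)
        self.fn = fn

    def execute(self, data: dict) -> bool:
        self.fn(data)
        return True


class MiniFlow(MiniExecutable):
    """Composite step: runs its children in order, stops at the first failure."""

    def __init__(self, name: str, blocks: List[MiniExecutable]) -> None:
        super().__init__(name)
        self.blocks = blocks

    def execute(self, data: dict) -> bool:
        # all() short-circuits, so later steps are skipped after a failure.
        return all(step.execute(data) for step in self.blocks)


def record(label: str) -> Callable[[dict], None]:
    """Build a function that appends its label to data['trace']."""
    def fn(data: dict) -> None:
        data.setdefault("trace", []).append(label)
    return fn


inner = MiniFlow("inner", [MiniBlock("b", record("b")), MiniBlock("c", record("c"))])
outer = MiniFlow("outer", [MiniBlock("a", record("a")), inner, MiniBlock("d", record("d"))])

data: dict = {}
ok = outer.execute(data)
print(ok, data["trace"])  # True ['a', 'b', 'c', 'd']
```

Because the outer flow only calls execute() on each child, it does not need to know whether a child is a leaf or another flow.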


Basic Example

from vibeblocks import Flow, block, ExecutionContext, execute_flow
from vibeblocks.policies.failure import FailureStrategy

@block()
def validate_input(ctx: ExecutionContext) -> None:
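    # Raise explicitly: assert statements are stripped when Python runs with -O.
    if ctx.data["amount"] <= 0:
        raise ValueError("amount must be positive")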

@block()
def reserve_inventory(ctx: ExecutionContext) -> None:
    ctx.data["reserved"] = True

@block()
def charge_payment(ctx: ExecutionContext) -> None:
    ctx.data["charged"] = True

@block()
def send_confirmation(ctx: ExecutionContext) -> None:
    ctx.data["confirmed"] = True

# Inner flow: transactional sub-workflow
fulfillment_flow = Flow(
    name="fulfillment",
    blocks=[reserve_inventory, charge_payment],
    strategy=FailureStrategy.COMPENSATE,  # undo on failure
)

# Outer flow: full order pipeline
order_flow = Flow(
    name="order_pipeline",
    blocks=[validate_input, fulfillment_flow, send_confirmation],
    strategy=FailureStrategy.ABORT,
)

outcome = execute_flow(order_flow, data={"amount": 100})
print(outcome.status)  # SUCCESS

Failure Isolation

Each Flow applies its own FailureStrategy independently:

  • The inner flow handles failures among its own steps — it can retry, continue, or compensate internally.
  • The outer flow only sees the final Outcome returned by the inner flow. If the inner flow returns a FAILED Outcome, the outer flow reacts according to its strategy.

This gives you two-layer failure control:

outer_flow (strategy=ABORT)
├── validate_input
├── fulfillment_flow (strategy=COMPENSATE) ← inner failure handled here first
│   ├── reserve_inventory
│   └── charge_payment
└── send_confirmation
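The two-layer behavior can be illustrated with a small standalone sketch. It does not use VibeBlocks: Strategy, MiniFlow, and the step functions are hypothetical, and it assumes that a flow running under CONTINUE reports success to its parent, which may differ from the library's actual semantics.

```python
from enum import Enum


class Strategy(Enum):
    ABORT = "abort"        # stop at the first failed step and report failure
    CONTINUE = "continue"  # ignore the failure and keep going


class MiniFlow:
    """Hypothetical flow that judges its own steps by its own strategy."""

    def __init__(self, name, steps, strategy):
        self.name, self.steps, self.strategy = name, steps, strategy

    def run(self, log):
        for step in self.steps:
            # A nested flow is just another step that returns True or False.
            ok = step.run(log) if isinstance(step, MiniFlow) else step(log)
            if not ok and self.strategy is Strategy.ABORT:
                log.append(f"{self.name} aborted")
                return False
        return True


def validate(log):
    log.append("validate")
    return True


def flaky(log):
    log.append("flaky")
    return False  # always fails


def confirm(log):
    log.append("confirm")
    return True


def run_pipeline(inner_strategy):
    """Run outer(ABORT) around inner(inner_strategy); return (result, log)."""
    log = []
    inner = MiniFlow("inner", [flaky], inner_strategy)
    outer = MiniFlow("outer", [validate, inner, confirm], Strategy.ABORT)
    return outer.run(log), log


print(run_pipeline(Strategy.CONTINUE))
# (True, ['validate', 'flaky', 'confirm'])
print(run_pipeline(Strategy.ABORT))
# (False, ['validate', 'flaky', 'inner aborted', 'outer aborted'])
```

In both runs the outer flow never inspects the failing step itself; it reacts only to the single result the inner flow hands back.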

Async Propagation

Async detection is propagated automatically. If any step inside a nested flow is async, the entire hierarchy becomes async — no manual configuration required:

@block()
async def async_step(ctx: ExecutionContext) -> None:
    await some_io_call()

inner = Flow("inner", blocks=[async_step])  # is_async = True
outer = Flow("outer", blocks=[inner])        # is_async = True — auto-detected

# execute_flow detects async mode automatically; async_mode=True only makes it explicit.
# The call must be awaited from inside a coroutine.
outcome = await execute_flow(outer, data=..., async_mode=True)
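One plausible way to implement this kind of propagation is a recursive is_async property. The sketch below is an assumption about the mechanism, not the library's source; MiniBlock, MiniFlow, and run_async are hypothetical names.

```python
import asyncio
import inspect


class MiniBlock:
    """Hypothetical leaf step wrapping a sync or async function."""

    def __init__(self, fn):
        self.fn = fn

    @property
    def is_async(self) -> bool:
        return inspect.iscoroutinefunction(self.fn)


class MiniFlow:
    """Hypothetical flow: async as soon as any descendant is async."""

    def __init__(self, name, blocks):
        self.name = name
        self.blocks = blocks

    @property
    def is_async(self) -> bool:
        # Nested flows answer recursively through their own is_async.
        return any(step.is_async for step in self.blocks)

    async def run_async(self, data) -> None:
        for step in self.blocks:
            if isinstance(step, MiniFlow):
                await step.run_async(data)
            elif step.is_async:
                await step.fn(data)
            else:
                step.fn(data)  # sync steps are called directly


def sync_step(data):
    data["sync"] = True


async def async_step(data):
    await asyncio.sleep(0)  # placeholder for real I/O
    data["async"] = True


inner = MiniFlow("inner", [MiniBlock(async_step)])
outer = MiniFlow("outer", [MiniBlock(sync_step), inner])
all_sync = MiniFlow("all_sync", [MiniBlock(sync_step)])

print(inner.is_async, outer.is_async, all_sync.is_async)  # True True False

data = {}
asyncio.run(outer.run_async(data))
print(data)  # {'sync': True, 'async': True}
```

Computing the flag on demand means a flow never has to be told that a step several levels down is a coroutine function.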

Compensation in Nested Flows

When the outer flow compensates (i.e., strategy=COMPENSATE), it calls compensate() on each step that previously succeeded, in reverse order. For a nested flow, this triggers the inner flow's own compensate() method, which in turn reverses its own completed steps.

This produces the correct saga-style rollback across all levels:

Execution:  validate → reserve → charge  ← (charge fails)
Rollback:   charge.undo → reserve.undo   ← inner flow compensates
            then outer flow compensates validate if needed
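The cascade from an outer compensate() into an inner one can be sketched without the library. MiniStep and MiniFlow are hypothetical, and this sketch tracks completed steps in a per-flow list for simplicity, whereas VibeBlocks is described as tracking them in the shared context. Here the last outer step fails, so the outer flow has to undo the already completed inner flow.

```python
class MiniStep:
    """Hypothetical leaf step with an undo action."""

    def __init__(self, name, fail=False):
        self.name, self.fail = name, fail

    def execute(self, log):
        if self.fail:
            log.append(f"{self.name} FAILED")
            return False
        log.append(f"{self.name} done")
        return True

    def compensate(self, log):
        log.append(f"{self.name} undone")


class MiniFlow:
    """Hypothetical flow that always compensates on failure."""

    def __init__(self, name, blocks):
        self.name, self.blocks = name, blocks
        self._completed = []  # steps that succeeded, in execution order

    def execute(self, log):
        self._completed = []
        for step in self.blocks:
            if not step.execute(log):
                self.compensate(log)  # roll back what this flow finished
                return False
            self._completed.append(step)
        return True

    def compensate(self, log):
        # Reverse order; a nested flow reverses its own steps in turn.
        for step in reversed(self._completed):
            step.compensate(log)
        self._completed = []


inner = MiniFlow("fulfillment", [MiniStep("reserve"), MiniStep("charge")])
outer = MiniFlow("order", [MiniStep("validate"), inner, MiniStep("notify", fail=True)])

log = []
ok = outer.execute(log)
print(ok)
print(log)
```

The resulting log shows the three successful steps, the failure of notify, and then charge, reserve, and validate being undone in that order.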

⚠️ Critical Constraint: Globally Unique Names

Completion tracking uses ctx.completed_blocks, a flat Set[str] shared across the entire execution. Each block and flow is identified by its name attribute.

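If two steps at any nesting level share the same name, the set cannot tell them apart: once one of them completes, the other also appears completed. Compensation then becomes unreliable, because it may undo a step that never ran or skip one that did.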

# BAD: name collision
inner = Flow("process", blocks=[...])
outer = Flow("process", blocks=[inner])  # ← same name as inner!

# GOOD: unique names at every level
inner = Flow("fulfillment_process", blocks=[...])
outer = Flow("order_pipeline", blocks=[inner])

The library does not automatically namespace nested step names. It is the developer's responsibility to ensure uniqueness.
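A small pre-flight check can catch collisions before a flow runs. The helper below is a sketch and not part of VibeBlocks: iter_names and find_duplicate_names are hypothetical, and it assumes every step exposes a name attribute and that containers expose their children as a blocks attribute. The demo uses SimpleNamespace objects in place of real flows.

```python
from collections import Counter
from types import SimpleNamespace


def iter_names(step):
    """Yield the name of a step and, recursively, of every nested child."""
    yield step.name
    # Assumption: containers keep their children in a `blocks` attribute.
    for child in getattr(step, "blocks", ()):
        yield from iter_names(child)


def find_duplicate_names(root):
    """Return a sorted list of names used more than once under root."""
    counts = Counter(iter_names(root))
    return sorted(name for name, n in counts.items() if n > 1)


# Stand-ins for blocks and flows, for illustration only.
reserve = SimpleNamespace(name="reserve")
charge = SimpleNamespace(name="charge")

bad_inner = SimpleNamespace(name="process", blocks=[reserve, charge])
bad_outer = SimpleNamespace(name="process", blocks=[bad_inner])

good_inner = SimpleNamespace(name="fulfillment_process", blocks=[reserve, charge])
good_outer = SimpleNamespace(name="order_pipeline", blocks=[good_inner])

print(find_duplicate_names(bad_outer))   # ['process']
print(find_duplicate_names(good_outer))  # []
```

Running such a check once at startup, and raising if the list is non-empty, turns a subtle rollback bug into an immediate configuration error.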


Nesting Flow inside Chain

A Flow can also be nested inside a Chain:

from vibeblocks import Chain

chain = Chain("my_chain", blocks=[step_a, inner_flow, step_b])

A Chain has no FailureStrategy — it stops on the first failure and returns the failed Outcome to whoever called it (typically a parent Flow).


Summary

Feature                                    Supported?  Notes
Flow inside Flow                           Yes         Any nesting depth
Flow inside Chain                          Yes
Chain inside Flow                          Yes         Standard usage
Async propagation                          Yes         Automatic, no config needed
Independent failure strategies per level   Yes         Each flow applies its own
Cross-level compensation (saga)            Yes         Reverses in correct order
Automatic name namespacing                 No          Names must be globally unique