API Reference

VibeBlocks Core

Core execution context structures.

This module provides the ExecutionContext class, which flows through all tasks and processes, accumulating data and tracking trace history.

class vibeblocks.core.context.Event(timestamp: datetime.datetime, level: Literal['INFO', 'ERROR', 'DEBUG'], source: str, message: str)[source]

Bases: object

level: Literal['INFO', 'ERROR', 'DEBUG']
message: str
source: str
timestamp: datetime
class vibeblocks.core.context.ExecutionContext(data: ~T, trace: List[vibeblocks.core.context.Event] = <factory>, metadata: Dict[str, Any] = <factory>, completed_steps: Set[str] = <factory>, exception_sanitizer: Callable[[Exception], str] = <class 'str'>)[source]

Bases: Generic[T]

completed_steps: Set[str]
data: T
exception_sanitizer

alias of str

format_exception(e: Exception) str[source]

Formats an exception for logging using the configured sanitizer.

classmethod from_json(raw: str, data_cls: Type[T] | None = None) ExecutionContext[T][source]

Deserializes a JSON string back to an ExecutionContext.

Parameters:
  • raw – The JSON string.

  • data_cls – Optional class to cast the ‘data’ field into (e.g. a dataclass or Pydantic model). If not provided, ‘data’ remains a dictionary.

log_event(level: Literal['INFO', 'ERROR', 'DEBUG'], source: str, message: str) None[source]

Logs an event to the trace.

metadata: Dict[str, Any]
to_json() str[source]

Serializes the context to a JSON string.

trace: List[Event]
class vibeblocks.core.outcome.Outcome(status: ~typing.Literal['SUCCESS', 'FAILED', 'ABORTED'], context: ~vibeblocks.core.context.ExecutionContext[~vibeblocks.core.outcome.T], errors: ~typing.List[Exception] = <factory>, duration_ms: int = 0)[source]

Bases: Generic[T]

Represents the final result of a workflow or task execution.

context: ExecutionContext[T]
duration_ms: int = 0
errors: List[Exception]
status: Literal['SUCCESS', 'FAILED', 'ABORTED']
exception vibeblocks.core.errors.BlockExecutionError[source]

Bases: VibeBlocksError

Raised when a Block fails to execute.

exception vibeblocks.core.errors.BlockTimeoutError[source]

Bases: BlockExecutionError

Raised when a Block exceeds its allocated execution time.

exception vibeblocks.core.errors.ChainExecutionError[source]

Bases: VibeBlocksError

Raised when a Chain fails (bubbling up from a Block).

exception vibeblocks.core.errors.VibeBlocksError[source]

Bases: Exception

Base exception for all VibeBlocks errors.

class vibeblocks.core.executable.Executable[source]

Bases: Generic[T], ABC

Abstract base class for all executable units in VibeBlocks.

abstractmethod compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]

Rolls back the changes made by this unit.

Returns:

None if synchronous. Awaitable[None] if asynchronous.

abstractmethod execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]

Executes the unit of work.

Returns:

Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.

abstract property is_async: bool

Determines if this executable requires asynchronous execution.

Decorators for wrapping user functions into library components.

vibeblocks.core.decorators.block(name: str | None = None, description: str | None = None, retry_policy: RetryPolicy | None = None, undo: Callable[[ExecutionContext[Any]], Any] | None = None, timeout: float | None = None, max_attempts: int = 1, delay: float = 1.0, backoff: BackoffStrategy = BackoffStrategy.FIXED, retry_on: List[Type[Exception]] | None = None, give_up_on: List[Type[Exception]] | None = None) Callable[[Callable[[ExecutionContext[Any]], Any]], Block[Any]][source]

Decorator to convert a standard function into a Block component.

Parameters:
  • name – Name of the block (defaults to function name).

  • description – Semantic description of the block for AI/LLM contexts.

  • retry_policy – Highly customizable policy object. Overrides quick config args when provided.

  • undo – A callable that reverts changes made by the block.

Quick Retry Config Args (only used if retry_policy is NOT provided):

max_attempts: Maximum total attempts before failing permanently (default: 1). delay: Base wait delay in seconds between retries (default: 1.0). backoff: Wait increment strategy (FIXED, LINEAR, or EXPONENTIAL) (default: FIXED). retry_on: List of Exception classes that trigger retries (default: [Exception]). give_up_on: List of Exception classes that explicitly skip retries (default: []).

VibeBlocks Components

class vibeblocks.components.block.Block(name: str, func: Callable[[ExecutionContext[T]], Any], description: str | None = None, retry_policy: RetryPolicy | None = None, undo: Callable[[ExecutionContext[T]], Any] | None = None, timeout: float | None = None)[source]

Bases: Executable[T]

Represents an atomic unit of work in a workflow. Executes a function with retry logic and supports compensation.

compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]

Rolls back the changes made by this unit.

Returns:

None if synchronous. Awaitable[None] if asynchronous.

execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]

Executes the unit of work.

Returns:

Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.

property is_async: bool

Determines if the block function is asynchronous.

Chain component handling linear execution of multiple blocks.

class vibeblocks.components.chain.Chain(name: str, steps: List[Executable[T]])[source]

Bases: Executable[T]

A linear collection of executables (blocks or sub-chains). Executes steps sequentially.

compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]

Rolls back the changes made by this unit.

Returns:

None if synchronous. Awaitable[None] if asynchronous.

execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]

Executes the unit of work.

Returns:

Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.

property is_async: bool

Determines if the chain requires asynchronous execution.

Flow orchestrator for executing trees of Blocks and Chaines. Handles high-level failure policies.

class vibeblocks.components.flow.Flow(name: str, steps: List[Executable[T]], description: str | None = None, strategy: FailureStrategy = FailureStrategy.ABORT)[source]

Bases: Executable[T]

The top-level orchestrator that manages a sequence of steps (Blocks or Chaines) and handles failures according to a strategy.

compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]

Rolls back the changes made by this unit.

Returns:

None if synchronous. Awaitable[None] if asynchronous.

execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]

Executes the unit of work.

Returns:

Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.

get_manifest() Dict[str, Any][source]

Returns a dictionary representation of the flow structure, including semantic descriptions.

property is_async: bool

Recursively checks if any step requires async execution.

VibeBlocks dynamic orchestration layer. Allows creating and executing flows from JSON descriptions on the fly.

class vibeblocks.vibeblocks.VibeBlocks[source]

Bases: object

Dynamic Orchestrator that builds and runs Flows from JSON definitions.

static run_from_json(json_request: Dict[str, Any], initial_data: Any, available_blocks: Dict[str, Block]) Outcome[Any] | Awaitable[Outcome[Any]][source]

Parses a JSON request, constructs a Flow, and executes it.

Parameters:
  • json_request – A dictionary defining the flow with keys like name, steps, and strategy.

  • initial_data – The data object to initialize ExecutionContext with.

  • available_blocks – A dictionary mapping step names (strings) to Block instances.

Returns:

The outcome of the execution.

VibeBlocks Policies

Retry logic and backoff strategies. Provides classes to handle transient errors in tasks and workflows.

class vibeblocks.policies.retry.BackoffStrategy(*values)[source]

Bases: Enum

Defines how the delay between retry intervals grows.

EXPONENTIAL = 3

Delays multiply using base 2 to the power of the attempt num. (delay = base_delay * (2^(attempt_num-1)))

FIXED = 1

Fixed delay across all attempts (delay = base_delay).

LINEAR = 2

Delays multiply linearly with the attempt’s loop iteration. (delay = base_delay * attempt_num)

class vibeblocks.policies.retry.RetryPolicy(max_attempts: int = 3, delay: float = 1.0, backoff: BackoffStrategy = BackoffStrategy.FIXED, max_delay: float = 60.0, jitter: bool = False, retry_on: Sequence[Type[Exception]] = (<class 'Exception'>, ), give_up_on: Sequence[Type[Exception]] = ())[source]

Bases: object

Configuration for retry logic.

MAX_ATTEMPTS_LIMIT = 100
MAX_DELAY_LIMIT = 3600.0
backoff: BackoffStrategy = 1
calculate_delay(attempt: int) float[source]

Calculates the delay before the next retry attempt.

delay: float = 1.0
give_up_on: Sequence[Type[Exception]] = ()
jitter: bool = False
max_attempts: int = 3
max_delay: float = 60.0
retry_on: Sequence[Type[Exception]] = (<class 'Exception'>,)
should_retry(attempt: int, exception: Exception) bool[source]

Determines if a retry should occur based on attempts and exception type.

class vibeblocks.policies.failure.FailureStrategy(*values)[source]

Bases: Enum

Defines the behavior when a workflow or process encounters an error.

ABORT = 1

Stop execution immediately. No compensation.

COMPENSATE = 3

Stop execution and run compensation logic (undo) on successful steps in reverse order.

CONTINUE = 2

Log the error and proceed to the next step. Context may be partial.

VibeBlocks Runtime

class vibeblocks.runtime.runner.AsyncRunner[source]

Bases: object

Executes flows asynchronously.

async run(executable: Executable[T], ctx: ExecutionContext[T]) Outcome[T][source]

Runs the executable asynchronously. Handles both sync (Outcome) and async (Awaitable[Outcome]) returns.

class vibeblocks.runtime.runner.SyncRunner[source]

Bases: object

Executes flows synchronously.

run(executable: Executable[T], ctx: ExecutionContext[T]) Outcome[T][source]

Runs the executable (Flow, Chain, or Block) synchronously. Raises RuntimeError if the executable returns an Awaitable (requires async execution).

VibeBlocks Utils

vibeblocks.utils.execution.execute_flow(flow: Executable[T], data: T, async_mode: bool = False) Outcome[T] | Awaitable[Outcome[T]][source]

Executes a flow, hiding the creation of context and runner.

Parameters:
  • flow – The flow or block to execute.

  • data – The input data object for the flow.

  • async_mode – If True, uses AsyncRunner; otherwise, uses SyncRunner.

Returns:

The outcome of the flow execution.

vibeblocks.utils.inspection.is_async_callable(obj: Any) bool[source]

Determines if an object is an async callable (coroutine function).

Warning

This only detects functions strictly declared as async def or wrappers around them. If a regular def function manually returns a coroutine (e.g., returning asyncio.sleep()), this function will return False, and it could fail at execution time in synchronous environments.

Handles: - async def functions - functools.partial wrapping async functions - Callable objects with async def __call__

Utilities for generating JSON Schemas from Flow manifests and data models. Designed to work with OpenAI Function Calling or similar LLM structured outputs.

vibeblocks.utils.schema.generate_function_schema(flow_manifest: Dict[str, Any], context_model: Type[Any]) Dict[str, Any][source]

Generates a JSON Schema for the Flow execution request.

Parameters:
  • flow_manifest – The dictionary returned by Flow.get_manifest().

  • context_model – The class (Dataclass or Pydantic Model) used for ExecutionContext.data.

Returns:

A JSON Schema compatible with OpenAI Function Calling.

vibeblocks.utils.serialization.from_json(json_str: str) Any[source]

Deserializes a JSON string to a dictionary/list structure.

vibeblocks.utils.serialization.to_json(obj: Any, indent: int | None = 2) str[source]

Serializes an object to a JSON string using custom encoders.