API Reference
VibeBlocks Core
Core execution context structures.
This module provides the ExecutionContext class, which flows through all tasks and processes, accumulating data and tracking trace history.
- class vibeblocks.core.context.Event(timestamp: datetime.datetime, level: Literal['INFO', 'ERROR', 'DEBUG'], source: str, message: str)[source]
Bases:
object- level: Literal['INFO', 'ERROR', 'DEBUG']
- message: str
- source: str
- timestamp: datetime
- class vibeblocks.core.context.ExecutionContext(data: ~T, trace: List[vibeblocks.core.context.Event] = <factory>, metadata: Dict[str, Any] = <factory>, completed_steps: Set[str] = <factory>, exception_sanitizer: Callable[[Exception], str] = <class 'str'>)[source]
Bases:
Generic[T]- completed_steps: Set[str]
- data: T
- exception_sanitizer
alias of
str
- format_exception(e: Exception) str[source]
Formats an exception for logging using the configured sanitizer.
- classmethod from_json(raw: str, data_cls: Type[T] | None = None) ExecutionContext[T][source]
Deserializes a JSON string back to an ExecutionContext.
- Parameters:
raw – The JSON string.
data_cls – Optional class to cast the ‘data’ field into (e.g. a dataclass or Pydantic model). If not provided, ‘data’ remains a dictionary.
- log_event(level: Literal['INFO', 'ERROR', 'DEBUG'], source: str, message: str) None[source]
Logs an event to the trace.
- metadata: Dict[str, Any]
- class vibeblocks.core.outcome.Outcome(status: ~typing.Literal['SUCCESS', 'FAILED', 'ABORTED'], context: ~vibeblocks.core.context.ExecutionContext[~vibeblocks.core.outcome.T], errors: ~typing.List[Exception] = <factory>, duration_ms: int = 0)[source]
Bases:
Generic[T]Represents the final result of a workflow or task execution.
- context: ExecutionContext[T]
- duration_ms: int = 0
- errors: List[Exception]
- status: Literal['SUCCESS', 'FAILED', 'ABORTED']
- exception vibeblocks.core.errors.BlockExecutionError[source]
Bases:
VibeBlocksErrorRaised when a Block fails to execute.
- exception vibeblocks.core.errors.BlockTimeoutError[source]
Bases:
BlockExecutionErrorRaised when a Block exceeds its allocated execution time.
- exception vibeblocks.core.errors.ChainExecutionError[source]
Bases:
VibeBlocksErrorRaised when a Chain fails (bubbling up from a Block).
- exception vibeblocks.core.errors.VibeBlocksError[source]
Bases:
ExceptionBase exception for all VibeBlocks errors.
- class vibeblocks.core.executable.Executable[source]
Bases:
Generic[T],ABCAbstract base class for all executable units in VibeBlocks.
- abstractmethod compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]
Rolls back the changes made by this unit.
- Returns:
None if synchronous. Awaitable[None] if asynchronous.
- abstractmethod execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]
Executes the unit of work.
- Returns:
Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.
- abstract property is_async: bool
Determines if this executable requires asynchronous execution.
Decorators for wrapping user functions into library components.
- vibeblocks.core.decorators.block(name: str | None = None, description: str | None = None, retry_policy: RetryPolicy | None = None, undo: Callable[[ExecutionContext[Any]], Any] | None = None, timeout: float | None = None, max_attempts: int = 1, delay: float = 1.0, backoff: BackoffStrategy = BackoffStrategy.FIXED, retry_on: List[Type[Exception]] | None = None, give_up_on: List[Type[Exception]] | None = None) Callable[[Callable[[ExecutionContext[Any]], Any]], Block[Any]][source]
Decorator to convert a standard function into a Block component.
- Parameters:
name – Name of the block (defaults to function name).
description – Semantic description of the block for AI/LLM contexts.
retry_policy – Highly customizable policy object. Overrides quick config args when provided.
undo – A callable that reverts changes made by the block.
- Quick Retry Config Args (only used if retry_policy is NOT provided):
max_attempts: Maximum total attempts before failing permanently (default: 1). delay: Base wait delay in seconds between retries (default: 1.0). backoff: Wait increment strategy (FIXED, LINEAR, or EXPONENTIAL) (default: FIXED). retry_on: List of Exception classes that trigger retries (default: [Exception]). give_up_on: List of Exception classes that explicitly skip retries (default: []).
VibeBlocks Components
- class vibeblocks.components.block.Block(name: str, func: Callable[[ExecutionContext[T]], Any], description: str | None = None, retry_policy: RetryPolicy | None = None, undo: Callable[[ExecutionContext[T]], Any] | None = None, timeout: float | None = None)[source]
Bases:
Executable[T]Represents an atomic unit of work in a workflow. Executes a function with retry logic and supports compensation.
- compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]
Rolls back the changes made by this unit.
- Returns:
None if synchronous. Awaitable[None] if asynchronous.
- execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]
Executes the unit of work.
- Returns:
Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.
- property is_async: bool
Determines if the block function is asynchronous.
Chain component handling linear execution of multiple blocks.
- class vibeblocks.components.chain.Chain(name: str, steps: List[Executable[T]])[source]
Bases:
Executable[T]A linear collection of executables (blocks or sub-chains). Executes steps sequentially.
- compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]
Rolls back the changes made by this unit.
- Returns:
None if synchronous. Awaitable[None] if asynchronous.
- execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]
Executes the unit of work.
- Returns:
Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.
- property is_async: bool
Determines if the chain requires asynchronous execution.
Flow orchestrator for executing trees of Blocks and Chaines. Handles high-level failure policies.
- class vibeblocks.components.flow.Flow(name: str, steps: List[Executable[T]], description: str | None = None, strategy: FailureStrategy = FailureStrategy.ABORT)[source]
Bases:
Executable[T]The top-level orchestrator that manages a sequence of steps (Blocks or Chaines) and handles failures according to a strategy.
- compensate(ctx: ExecutionContext[T]) None | Awaitable[None][source]
Rolls back the changes made by this unit.
- Returns:
None if synchronous. Awaitable[None] if asynchronous.
- execute(ctx: ExecutionContext[T]) Outcome[T] | Awaitable[Outcome[T]][source]
Executes the unit of work.
- Returns:
Outcome[T] if synchronous. Awaitable[Outcome[T]] if asynchronous.
- get_manifest() Dict[str, Any][source]
Returns a dictionary representation of the flow structure, including semantic descriptions.
- property is_async: bool
Recursively checks if any step requires async execution.
VibeBlocks dynamic orchestration layer. Allows creating and executing flows from JSON descriptions on the fly.
- class vibeblocks.vibeblocks.VibeBlocks[source]
Bases:
objectDynamic Orchestrator that builds and runs Flows from JSON definitions.
- static run_from_json(json_request: Dict[str, Any], initial_data: Any, available_blocks: Dict[str, Block]) Outcome[Any] | Awaitable[Outcome[Any]][source]
Parses a JSON request, constructs a Flow, and executes it.
- Parameters:
json_request – A dictionary defining the flow with keys like
name,steps, andstrategy.initial_data – The data object to initialize ExecutionContext with.
available_blocks – A dictionary mapping step names (strings) to Block instances.
- Returns:
The outcome of the execution.
VibeBlocks Policies
Retry logic and backoff strategies. Provides classes to handle transient errors in tasks and workflows.
- class vibeblocks.policies.retry.BackoffStrategy(*values)[source]
Bases:
EnumDefines how the delay between retry intervals grows.
- EXPONENTIAL = 3
Delays multiply using base 2 to the power of the attempt num. (delay = base_delay * (2^(attempt_num-1)))
- FIXED = 1
Fixed delay across all attempts (delay = base_delay).
- LINEAR = 2
Delays multiply linearly with the attempt’s loop iteration. (delay = base_delay * attempt_num)
- class vibeblocks.policies.retry.RetryPolicy(max_attempts: int = 3, delay: float = 1.0, backoff: BackoffStrategy = BackoffStrategy.FIXED, max_delay: float = 60.0, jitter: bool = False, retry_on: Sequence[Type[Exception]] = (<class 'Exception'>, ), give_up_on: Sequence[Type[Exception]] = ())[source]
Bases:
objectConfiguration for retry logic.
- MAX_ATTEMPTS_LIMIT = 100
- MAX_DELAY_LIMIT = 3600.0
- backoff: BackoffStrategy = 1
- delay: float = 1.0
- give_up_on: Sequence[Type[Exception]] = ()
- jitter: bool = False
- max_attempts: int = 3
- max_delay: float = 60.0
- retry_on: Sequence[Type[Exception]] = (<class 'Exception'>,)
- class vibeblocks.policies.failure.FailureStrategy(*values)[source]
Bases:
EnumDefines the behavior when a workflow or process encounters an error.
- ABORT = 1
Stop execution immediately. No compensation.
- COMPENSATE = 3
Stop execution and run compensation logic (undo) on successful steps in reverse order.
- CONTINUE = 2
Log the error and proceed to the next step. Context may be partial.
VibeBlocks Runtime
- class vibeblocks.runtime.runner.AsyncRunner[source]
Bases:
objectExecutes flows asynchronously.
- async run(executable: Executable[T], ctx: ExecutionContext[T]) Outcome[T][source]
Runs the executable asynchronously. Handles both sync (Outcome) and async (Awaitable[Outcome]) returns.
- class vibeblocks.runtime.runner.SyncRunner[source]
Bases:
objectExecutes flows synchronously.
- run(executable: Executable[T], ctx: ExecutionContext[T]) Outcome[T][source]
Runs the executable (Flow, Chain, or Block) synchronously. Raises RuntimeError if the executable returns an Awaitable (requires async execution).
VibeBlocks Utils
- vibeblocks.utils.execution.execute_flow(flow: Executable[T], data: T, async_mode: bool = False) Outcome[T] | Awaitable[Outcome[T]][source]
Executes a flow, hiding the creation of context and runner.
- Parameters:
flow – The flow or block to execute.
data – The input data object for the flow.
async_mode – If True, uses AsyncRunner; otherwise, uses SyncRunner.
- Returns:
The outcome of the flow execution.
- vibeblocks.utils.inspection.is_async_callable(obj: Any) bool[source]
Determines if an object is an async callable (coroutine function).
Warning
This only detects functions strictly declared as async def or wrappers around them. If a regular def function manually returns a coroutine (e.g., returning asyncio.sleep()), this function will return False, and it could fail at execution time in synchronous environments.
Handles: - async def functions - functools.partial wrapping async functions - Callable objects with async def __call__
Utilities for generating JSON Schemas from Flow manifests and data models. Designed to work with OpenAI Function Calling or similar LLM structured outputs.
- vibeblocks.utils.schema.generate_function_schema(flow_manifest: Dict[str, Any], context_model: Type[Any]) Dict[str, Any][source]
Generates a JSON Schema for the Flow execution request.
- Parameters:
flow_manifest – The dictionary returned by Flow.get_manifest().
context_model – The class (Dataclass or Pydantic Model) used for ExecutionContext.data.
- Returns:
A JSON Schema compatible with OpenAI Function Calling.