Source code for vibeblocks.components.block

import asyncio
import concurrent.futures
import inspect
import time
from typing import Any, Awaitable, Callable, Optional, TypeVar, Union

from vibeblocks.core.context import ExecutionContext
from vibeblocks.core.errors import BlockExecutionError, BlockTimeoutError
from vibeblocks.core.executable import Executable
from vibeblocks.core.outcome import Outcome
from vibeblocks.policies.retry import RetryPolicy
from vibeblocks.utils.inspection import is_async_callable

T = TypeVar("T")

# Shared executor for synchronous block timeouts to avoid overhead and thread leakage.
# Using a large enough number of workers to handle concurrent blocks.
_TASK_TIMEOUT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix="BlockTimeout")


[docs] class Block(Executable[T]): """ Represents an atomic unit of work in a workflow. Executes a function with retry logic and supports compensation. """ def __init__( self, name: str, func: Callable[[ExecutionContext[T]], Any], description: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None, undo: Optional[Callable[[ExecutionContext[T]], Any]] = None, timeout: Optional[float] = None, ): self.name = name self.func = func self.description = description or func.__doc__ self.retry_policy = retry_policy or RetryPolicy(max_attempts=1) self.undo = undo self.timeout = timeout self._is_async = is_async_callable(self.func) self._is_undo_async = is_async_callable( self.undo) if self.undo else False @property def is_async(self) -> bool: """Determines if the block function is asynchronous.""" # Optimized: Return pre-calculated value to avoid repeated inspection overhead return self._is_async
[docs] def execute(self, ctx: ExecutionContext[T]) -> Union[Outcome[T], Awaitable[Outcome[T]]]: if self.is_async: return self._execute_async(ctx) else: return self._execute_sync(ctx)
def _execute_sync(self, ctx: ExecutionContext[T]) -> Outcome[T]: ctx.log_event("INFO", self.name, "Block Started") start_time = time.time() attempt = 1 while True: try: if self.timeout: try: future = _TASK_TIMEOUT_EXECUTOR.submit(self.func, ctx) res = future.result(timeout=self.timeout) except concurrent.futures.TimeoutError: raise BlockTimeoutError( f"Block '{self.name}' timed out after {self.timeout}s" ) from None else: res = self.func(ctx) # Runtime check for false-negative async detection (e.g. lambdas) if inspect.isawaitable(res): if inspect.iscoroutine(res): res.close() # We cannot await it here because we are in sync mode. # We must warn the user that their block logic probably didn't run. # Or raise an error? Raising error is safer. msg = ( f"Block '{self.name}' returned an awaitable (coroutine) but " "was executed synchronously. Check if the function is " "defined correctly or if AsyncRunner should be used." ) raise RuntimeError(msg) duration = int((time.time() - start_time) * 1000) ctx.log_event("INFO", self.name, "Block Completed") ctx.completed_steps.add(self.name) return Outcome(status="SUCCESS", context=ctx, errors=[], duration_ms=duration) except Exception as e: duration = int((time.time() - start_time) * 1000) ctx.log_event("ERROR", self.name, f"Block Failed: {ctx.format_exception(e)}") if self.retry_policy.should_retry(attempt, e): delay = self.retry_policy.calculate_delay(attempt) msg = ( f"Retrying in {delay}s (Attempt {attempt}/" f"{self.retry_policy.max_attempts})" ) ctx.log_event("INFO", self.name, msg) time.sleep(delay) attempt += 1 continue else: msg = f"Block '{self.name}' failed after {attempt} attempts" error = BlockExecutionError(msg) error.__cause__ = e return Outcome( status="FAILED", context=ctx, errors=[error], duration_ms=duration ) async def _execute_async(self, ctx: ExecutionContext[T]) -> Outcome[T]: ctx.log_event("INFO", self.name, "Block Started (Async)") start_time = time.time() attempt = 1 while True: try: res = self.func(ctx) if inspect.isawaitable(res): if self.timeout: try: res = await asyncio.wait_for(res, timeout=self.timeout) except asyncio.TimeoutError: raise BlockTimeoutError( f"Block '{self.name}' timed out after {self.timeout}s" ) from None else: res = await res duration = int((time.time() - start_time) * 1000) ctx.log_event("INFO", self.name, "Block Completed") ctx.completed_steps.add(self.name) return Outcome(status="SUCCESS", context=ctx, errors=[], duration_ms=duration) except Exception as e: duration = int((time.time() - start_time) * 1000) ctx.log_event("ERROR", self.name, f"Block Failed: {ctx.format_exception(e)}") if self.retry_policy.should_retry(attempt, e): delay = self.retry_policy.calculate_delay(attempt) msg = ( f"Retrying in {delay}s (Attempt {attempt}/" f"{self.retry_policy.max_attempts})" ) ctx.log_event("INFO", self.name, msg) await asyncio.sleep(delay) attempt += 1 continue else: msg = f"Block '{self.name}' failed after {attempt} attempts" error = BlockExecutionError(msg) error.__cause__ = e return Outcome( status="FAILED", context=ctx, errors=[error], duration_ms=duration )
[docs] def compensate(self, ctx: ExecutionContext[T]) -> Union[None, Awaitable[None]]: if self.undo is None: return None ctx.log_event("INFO", self.name, "Compensating Block") if self._is_undo_async: return self._compensate_async(ctx) else: try: self.undo(ctx) except Exception as e: ctx.log_event("ERROR", self.name, f"Compensation Failed: {ctx.format_exception(e)}") raise return None
async def _compensate_async(self, ctx: ExecutionContext[T]) -> None: if self.undo is None: return try: res = self.undo(ctx) if inspect.isawaitable(res): await res except Exception as e: ctx.log_event("ERROR", self.name, f"Compensation Failed: {ctx.format_exception(e)}") raise