Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit — hold Shift + click to select a range
169602b
refactor: convert Flow to Pydantic BaseModel
greysonlalonde Mar 31, 2026
f4ec171
Merge branch 'main' into refactor/flow-to-basemodel
greysonlalonde Mar 31, 2026
cbba8f0
fix: guard _flow_post_init against double-init, log model_rebuild fai…
greysonlalonde Mar 31, 2026
dd9540e
Merge branch 'refactor/flow-to-basemodel' of https://github.com/crewA…
greysonlalonde Mar 31, 2026
f7d8fb7
fix: use typing_extensions.TypedDict in prompts for Python < 3.12
greysonlalonde Mar 31, 2026
7695c56
fix: restore init_state is None guard in _create_initial_state
greysonlalonde Mar 31, 2026
f379334
Merge branch 'main' into refactor/flow-to-basemodel
greysonlalonde Mar 31, 2026
c537cdd
fix: add type annotations for initial_state in memory flows
greysonlalonde Mar 31, 2026
2090beb
Merge branch 'refactor/flow-to-basemodel' of https://github.com/crewA…
greysonlalonde Mar 31, 2026
d060e94
fix: add id field to AgentExecutorState for flow_id and tracing
greysonlalonde Mar 31, 2026
ab99fba
fix: operator precedence bug in stop words set expression
greysonlalonde Mar 31, 2026
daa9354
fix: narrow model_rebuild exception catch to ImportError and Pydantic…
greysonlalonde Mar 31, 2026
80da812
fix: remove duplicate private attrs for persistence and max_method_calls
greysonlalonde Mar 31, 2026
d48d309
Merge branch 'main' into refactor/flow-to-basemodel
greysonlalonde Mar 31, 2026
c5e71f8
fix: update test to use public persistence field
greysonlalonde Mar 31, 2026
c0ced12
fix: skip auto-memory creation in AgentExecutor
greysonlalonde Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions lib/crewai/src/crewai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import urllib.request
import warnings

from pydantic import PydanticUserError

from crewai.agent.core import Agent
from crewai.agent.planning_config import PlanningConfig
from crewai.crew import Crew
Expand Down Expand Up @@ -93,6 +95,38 @@ def __getattr__(name: str) -> Any:
raise AttributeError(f"module 'crewai' has no attribute {name!r}")


try:
    from crewai.agents.tools_handler import ToolsHandler as _ToolsHandler
    from crewai.experimental.agent_executor import AgentExecutor as _AgentExecutor
    from crewai.hooks.llm_hooks import LLMCallHookContext as _LLMCallHookContext
    from crewai.tools.tool_types import ToolResult as _ToolResult
    from crewai.utilities.prompts import (
        StandardPromptResult as _StandardPromptResult,
        SystemPromptResult as _SystemPromptResult,
    )

    # Namespace for resolving AgentExecutor's forward references; these
    # names are quoted in its annotations to avoid circular imports.
    _executor_types = {
        "Agent": Agent,
        "ToolsHandler": _ToolsHandler,
        "Crew": Crew,
        "BaseLLM": BaseLLM,
        "Task": Task,
        "StandardPromptResult": _StandardPromptResult,
        "SystemPromptResult": _SystemPromptResult,
        "LLMCallHookContext": _LLMCallHookContext,
        "ToolResult": _ToolResult,
    }
    _AgentExecutor.model_rebuild(force=True, _types_namespace=_executor_types)
except (ImportError, PydanticUserError):
    # Best-effort: the package must still import even if the rebuild
    # cannot complete; log with traceback instead of raising.
    import logging as _logging

    _logging.getLogger(__name__).warning(
        "AgentExecutor.model_rebuild() failed; forward refs may be unresolved.",
        exc_info=True,
    )

__all__ = [
"LLM",
"Agent",
Expand Down
2 changes: 1 addition & 1 deletion lib/crewai/src/crewai/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ def _update_executor_parameters(
self.agent_executor.tools = tools
self.agent_executor.original_tools = raw_tools
self.agent_executor.prompt = prompt
self.agent_executor.stop = stop_words
self.agent_executor.stop_words = stop_words
self.agent_executor.tools_names = get_tool_names(tools)
self.agent_executor.tools_description = render_text_description_and_args(tools)
self.agent_executor.response_model = (
Expand Down
223 changes: 74 additions & 149 deletions lib/crewai/src/crewai/experimental/agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast
from uuid import uuid4

from pydantic import BaseModel, Field, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
from pydantic import (
BaseModel,
Field,
PrivateAttr,
model_validator,
)
from rich.console import Console
from rich.text import Text
from typing_extensions import Self

from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.parser import (
Expand Down Expand Up @@ -119,6 +124,7 @@ class AgentExecutorState(BaseModel):
(todos, observations, replan tracking) in a single validated model.
"""

id: str = Field(default_factory=lambda: str(uuid4()))
messages: list[LLMMessage] = Field(default_factory=list)
iterations: int = Field(default=0)
current_answer: AgentAction | AgentFinish | None = Field(default=None)
Expand Down Expand Up @@ -152,143 +158,84 @@ class AgentExecutorState(BaseModel):
class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin):
"""Agent Executor for both standalone agents and crew-bound agents.

_skip_auto_memory prevents Flow from eagerly allocating a Memory
instance — the executor uses agent/crew memory, not its own.

Inherits from:
- Flow[AgentExecutorState]: Provides flow orchestration capabilities
- CrewAgentExecutorMixin: Provides memory methods (short/long/external term)

This executor can operate in two modes:
- Standalone mode: When crew and task are None (used by Agent.kickoff())
- Crew mode: When crew and task are provided (used by Agent.execute_task())

Note: Multiple instances may be created during agent initialization
(cache setup, RPM controller setup, etc.) but only the final instance
should execute tasks via invoke().
"""

def __init__(
self,
llm: BaseLLM,
agent: Agent,
prompt: SystemPromptResult | StandardPromptResult,
max_iter: int,
tools: list[CrewStructuredTool],
tools_names: str,
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
task: Task | None = None,
crew: Crew | None = None,
step_callback: Any = None,
original_tools: list[BaseTool] | None = None,
function_calling_llm: BaseLLM | Any | None = None,
respect_context_window: bool = False,
request_within_rpm_limit: Callable[[], bool] | None = None,
callbacks: list[Any] | None = None,
response_model: type[BaseModel] | None = None,
i18n: I18N | None = None,
) -> None:
"""Initialize the flow-based agent executor.
_skip_auto_memory: bool = True

suppress_flow_events: bool = True # always suppress for executor
llm: BaseLLM = Field(exclude=True)
agent: Agent = Field(exclude=True)
prompt: SystemPromptResult | StandardPromptResult = Field(exclude=True)
max_iter: int = Field(default=25, exclude=True)
tools: list[CrewStructuredTool] = Field(default_factory=list, exclude=True)
tools_names: str = Field(default="", exclude=True)
stop_words: list[str] = Field(default_factory=list, exclude=True)
tools_description: str = Field(default="", exclude=True)
tools_handler: ToolsHandler | None = Field(default=None, exclude=True)
task: Task | None = Field(default=None, exclude=True)
crew: Crew | None = Field(default=None, exclude=True)
step_callback: Any = Field(default=None, exclude=True)
original_tools: list[BaseTool] = Field(default_factory=list, exclude=True)
function_calling_llm: BaseLLM | None = Field(default=None, exclude=True)
respect_context_window: bool = Field(default=False, exclude=True)
request_within_rpm_limit: Callable[[], bool] | None = Field(
default=None, exclude=True
)
callbacks: list[Any] = Field(default_factory=list, exclude=True)
response_model: type[BaseModel] | None = Field(default=None, exclude=True)
i18n: I18N | None = Field(default=None, exclude=True)
log_error_after: int = Field(default=3, exclude=True)
before_llm_call_hooks: list[BeforeLLMCallHookType | BeforeLLMCallHookCallable] = (
Field(default_factory=list, exclude=True)
)
after_llm_call_hooks: list[AfterLLMCallHookType | AfterLLMCallHookCallable] = Field(
default_factory=list, exclude=True
)

Args:
llm: Language model instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
tools: Available tools.
tools_names: Tool names string.
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
task: Optional task to execute (None for standalone agent execution).
crew: Optional crew instance (None for standalone agent execution).
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
respect_context_window: Respect context limits.
request_within_rpm_limit: RPM limit check function.
callbacks: Optional callbacks list.
response_model: Optional Pydantic model for structured outputs.
"""
self._i18n: I18N = i18n or get_i18n()
self.llm = llm
self.task: Task | None = task
self.agent = agent
self.crew: Crew | None = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
self.stop = stop_words
self.max_iter = max_iter
self.callbacks = callbacks or []
self._printer: Printer = Printer()
self.tools_handler = tools_handler
self.original_tools = original_tools or []
self.step_callback = step_callback
self.tools_description = tools_description
self.function_calling_llm = function_calling_llm
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.response_model = response_model
self.log_error_after = 3
self._console: Console = Console()

# Error context storage for recovery
self._last_parser_error: OutputParserError | None = None
self._last_context_error: Exception | None = None

# Execution guard to prevent concurrent/duplicate executions
self._execution_lock = threading.Lock()
self._finalize_lock = threading.Lock()
self._finalize_called: bool = False
self._is_executing: bool = False
self._has_been_invoked: bool = False
self._flow_initialized: bool = False

self._instance_id = str(uuid4())[:8]

self.before_llm_call_hooks: list[
BeforeLLMCallHookType | BeforeLLMCallHookCallable
] = []
self.after_llm_call_hooks: list[
AfterLLMCallHookType | AfterLLMCallHookCallable
] = []
_i18n: I18N = PrivateAttr(default_factory=get_i18n)
_printer: Printer = PrivateAttr(default_factory=Printer)
_console: Console = PrivateAttr(default_factory=Console)
_last_parser_error: OutputParserError | None = PrivateAttr(default=None)
_last_context_error: Exception | None = PrivateAttr(default=None)
_execution_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_finalize_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
_finalize_called: bool = PrivateAttr(default=False)
_is_executing: bool = PrivateAttr(default=False)
_has_been_invoked: bool = PrivateAttr(default=False)
_instance_id: str = PrivateAttr(default_factory=lambda: str(uuid4())[:8])
_step_executor: Any = PrivateAttr(default=None)
_planner_observer: Any = PrivateAttr(default=None)

@model_validator(mode="after")
def _setup_executor(self) -> Self:
"""Configure executor after Pydantic field initialization."""
self._i18n = self.i18n or get_i18n()
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())

if self.llm:
existing_stop = getattr(self.llm, "stop", [])
self.llm.stop = list(
set(
existing_stop + self.stop
if isinstance(existing_stop, list)
else self.stop
)
)
self._state = AgentExecutorState()

# Plan-and-Execute components (Phase 2)
# Lazy-imported to avoid circular imports during module load
self._step_executor: Any = None
self._planner_observer: Any = None
if not isinstance(existing_stop, list):
existing_stop = []
self.llm.stop = list(set(existing_stop + self.stop_words))

def _ensure_flow_initialized(self) -> None:
"""Ensure Flow.__init__() has been called.
self._state = AgentExecutorState()
self.max_method_calls = self.max_iter * 10

This is deferred from __init__ to prevent FlowCreatedEvent emission
during agent setup when multiple executor instances are created.
Only the instance that actually executes via invoke() will emit events.
"""
if not self._flow_initialized:
current_tracing = is_tracing_enabled_in_context()
# Now call Flow's __init__ which will replace self._state
# with Flow's managed state. Suppress flow events since this is
# an agent executor, not a user-facing flow.
super().__init__(
suppress_flow_events=True,
tracing=current_tracing if current_tracing else None,
max_method_calls=self.max_iter * 10,
)
self._flow_initialized = True
current_tracing = is_tracing_enabled_in_context()
self.tracing = current_tracing if current_tracing else None
self._flow_post_init()
return self

def _check_native_tool_support(self) -> bool:
"""Check if LLM supports native function calling."""
Expand Down Expand Up @@ -318,19 +265,13 @@ def use_stop_words(self) -> bool:

@property
def state(self) -> AgentExecutorState:
"""Get state - returns temporary state if Flow not yet initialized.

Flow initialization is deferred to prevent event emission during agent setup.
Returns the temporary state until invoke() is called.
"""
if self._flow_initialized and hasattr(self, "_state_lock"):
return StateProxy(self._state, self._state_lock) # type: ignore[return-value]
return self._state
"""Get thread-safe state proxy."""
return StateProxy(self._state, self._state_lock) # type: ignore[return-value]

@property
def iterations(self) -> int:
"""Compatibility property for mixin - returns state iterations."""
return self._state.iterations
return self._state.iterations # type: ignore[no-any-return]

@iterations.setter
def iterations(self, value: int) -> None:
Expand All @@ -340,7 +281,7 @@ def iterations(self, value: int) -> None:
@property
def messages(self) -> list[LLMMessage]:
"""Compatibility property - returns state messages."""
return self._state.messages
return self._state.messages # type: ignore[no-any-return]

@messages.setter
def messages(self, value: list[LLMMessage]) -> None:
Expand Down Expand Up @@ -1969,8 +1910,7 @@ def _execute_single_native_tool_call(self, tool_call: Any) -> dict[str, Any]:
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
"""Bridge listener that connects iteration loop back to iteration check."""
if self._flow_initialized:
self._discard_or_listener(FlowMethodName("continue_iteration"))
self._discard_or_listener(FlowMethodName("continue_iteration"))
return "check_iteration"

@router(or_(initialize_reasoning, continue_iteration))
Expand Down Expand Up @@ -2598,8 +2538,6 @@ def invoke(
if is_inside_event_loop():
return self.invoke_async(inputs)

self._ensure_flow_initialized()

with self._execution_lock:
if self._is_executing:
raise RuntimeError(
Expand Down Expand Up @@ -2690,8 +2628,6 @@ async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
Returns:
Dictionary with agent output.
"""
self._ensure_flow_initialized()

with self._execution_lock:
if self._is_executing:
raise RuntimeError(
Expand Down Expand Up @@ -3007,17 +2943,6 @@ def _is_training_mode(self) -> bool:
"""
return bool(self.crew and self.crew._train)

    @classmethod
    def __get_pydantic_core_schema__(
        cls, _source_type: Any, _handler: GetCoreSchemaHandler
    ) -> CoreSchema:
        """Generate Pydantic core schema for Protocol compatibility.

        Allows the executor to be used in Pydantic models without
        requiring arbitrary_types_allowed=True.

        Args:
            _source_type: The annotated source type (unused).
            _handler: Pydantic's schema-generation handler (unused).

        Returns:
            A permissive ``any`` schema, so Pydantic performs no
            validation or serialization logic on executor instances.
        """
        # any_schema() accepts any value unchanged; the executor manages
        # its own internal consistency.
        return core_schema.any_schema()


# Backward compatibility alias (deprecated): earlier releases exposed this
# class as CrewAgentExecutorFlow; new code should import AgentExecutor.
CrewAgentExecutorFlow = AgentExecutor
Loading
Loading