Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/en/concepts/files.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ result = flow.kickoff(
)
```

You can also define file types directly in your flow state for structured file handling:

```python
from pydantic import BaseModel
from crewai.flow.flow import Flow, start
from crewai_files import ImageFile, PDFFile

class DocumentState(BaseModel):
document: PDFFile
cover_image: ImageFile
title: str = ""

class DocumentFlow(Flow[DocumentState]):
@start()
def process(self):
content = self.state.document.read()
return {"processed": True}
```

<Note type="info" title="CrewAI Platform Integration">
When deploying flows to the CrewAI Platform (AMP), file fields in your state automatically render as file upload dropzones in the UI. For API usage, you can pass URL strings directly and Pydantic coerces them to file objects automatically. See [Flows - File Inputs](/en/concepts/flows#file-inputs) for details.
</Note>

### With Standalone Agents

Pass files directly to agent kickoff:
Expand Down
63 changes: 63 additions & 0 deletions docs/en/concepts/flows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,69 @@ flow.kickoff()

By providing both unstructured and structured state management options, CrewAI Flows empowers developers to build AI workflows that are both flexible and robust, catering to a wide range of application requirements.

## File Inputs

Flows support file inputs through the `crewai-files` package, enabling you to build workflows that process images, PDFs, and other file types. When you use file types like `ImageFile` or `PDFFile` in your flow state, they integrate seamlessly with both local development and the CrewAI Platform.

<Note type="info" title="Optional Dependency">
File support requires the optional `crewai-files` package. Install it with:

```bash
uv add 'crewai[file-processing]'
```
</Note>

### Using File Types in Flow State

You can include file types directly in your structured flow state:

```python
from pydantic import BaseModel
from crewai.flow.flow import Flow, start
from crewai_files import ImageFile, PDFFile

class DocumentProcessingState(BaseModel):
document: PDFFile # Renders as file upload in CrewAI Platform
cover_image: ImageFile # Renders as image upload
title: str = "" # Renders as text input

class DocumentFlow(Flow[DocumentProcessingState]):
@start()
def process_document(self):
# Access the file - works with URLs, paths, or uploaded files
content = self.state.document.read()
# Or pass to an agent with VisionTool, etc.
return {"processed": True}
```

### CrewAI Platform Integration

When you deploy a flow to the CrewAI Platform (AMP), file fields in your state automatically render as file upload dropzones in the UI. This makes it easy to build user-facing applications that accept file uploads without any additional frontend work.

| State Field Type | Platform UI Rendering |
|:-----------------|:----------------------|
| `ImageFile` | Image upload dropzone |
| `PDFFile` | PDF upload dropzone |
| `AudioFile` | Audio upload dropzone |
| `VideoFile` | Video upload dropzone |
| `TextFile` | Text file upload dropzone |
| `str`, `int`, etc. | Standard form inputs |

### API Usage

When calling your flow via API, you can pass URL strings directly for file fields. Pydantic automatically coerces URLs into the appropriate file type:

```python
# API request body - URLs are automatically converted to file objects
{
"document": "https://example.com/report.pdf",
"cover_image": "https://example.com/cover.png",
"title": "Q4 Report"
}
```

For more details on file types, sources, and provider support, see the [Files documentation](/en/concepts/files).

## Flow Persistence

The @persist decorator enables automatic state persistence in CrewAI Flows, allowing you to maintain flow state across restarts or different workflow executions. This decorator can be applied at either the class level or method level, providing flexibility in how you manage state persistence.
Expand Down
10 changes: 10 additions & 0 deletions lib/crewai/src/crewai/flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ def __init__(
self.human_feedback_history: list[HumanFeedbackResult] = []
self.last_human_feedback: HumanFeedbackResult | None = None
self._pending_feedback_context: PendingFeedbackContext | None = None
self._human_feedback_method_output: Any = None # Stashed real output from @human_feedback with emit
self.suppress_flow_events: bool = suppress_flow_events

# User input history (for self.ask())
Expand Down Expand Up @@ -2290,6 +2291,15 @@ async def _execute_method(
result = await result

self._method_outputs.append(result)

# For @human_feedback methods with emit, the result is the collapsed outcome
# (e.g., "approved") used for routing. But we want the actual method output
# to be the stored result (for final flow output). Replace the last entry
# if a stashed output exists.
if self._human_feedback_method_output is not None:
self._method_outputs[-1] = self._human_feedback_method_output
self._human_feedback_method_output = None

self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
Expand Down
12 changes: 12 additions & 0 deletions lib/crewai/src/crewai/flow/human_feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,12 @@ async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
):
_distill_and_store_lessons(self, method_output, raw_feedback)

# Stash the real method output for final flow result when emit is set
# (result is the collapsed outcome string for routing, but we want to
# preserve the actual method output as the flow's final result)
if emit:
self._human_feedback_method_output = method_output

return result

wrapper: Any = async_wrapper
Expand All @@ -615,6 +621,12 @@ def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
):
_distill_and_store_lessons(self, method_output, raw_feedback)

# Stash the real method output for final flow result when emit is set
# (result is the collapsed outcome string for routing, but we want to
# preserve the actual method output as the flow's final result)
if emit:
self._human_feedback_method_output = method_output

return result

wrapper = sync_wrapper
Expand Down
143 changes: 139 additions & 4 deletions lib/crewai/tests/test_human_feedback_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def generate(self):
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_empty_feedback_with_default_outcome(self, mock_print, mock_input):
"""Test empty feedback uses default_outcome."""
"""Test empty feedback uses default_outcome for routing, but flow returns method output."""

class TestFlow(Flow):
@start()
Expand All @@ -264,14 +264,16 @@ def review(self):
with patch.object(flow, "_request_human_feedback", return_value=""):
result = flow.kickoff()

assert result == "needs_work"
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "Content"
assert flow.last_human_feedback is not None
# But the outcome is still correctly set for routing purposes
assert flow.last_human_feedback.outcome == "needs_work"

@patch("builtins.input", return_value="Approved!")
@patch("builtins.print")
def test_feedback_collapsing(self, mock_print, mock_input):
"""Test that feedback is collapsed to an outcome."""
"""Test that feedback is collapsed to an outcome for routing, but flow returns method output."""

class TestFlow(Flow):
@start()
Expand All @@ -291,8 +293,10 @@ def review(self):
):
result = flow.kickoff()

assert result == "approved"
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "Content"
assert flow.last_human_feedback is not None
# But the outcome is still correctly set for routing purposes
assert flow.last_human_feedback.outcome == "approved"


Expand Down Expand Up @@ -591,3 +595,134 @@ def test_method(self):
assert config.learn is True
# llm defaults to "gpt-4o-mini" at the function level
assert config.llm == "gpt-4o-mini"


class TestHumanFeedbackFinalOutputPreservation:
"""Tests for preserving method return value as flow's final output when @human_feedback with emit is terminal.

This addresses the bug where the flow's final output was the collapsed outcome string (e.g., 'approved')
instead of the method's actual return value when a @human_feedback method with emit is the final method.
"""

@patch("builtins.input", return_value="Looks good!")
@patch("builtins.print")
def test_final_output_is_method_return_not_collapsed_outcome(
self, mock_print, mock_input
):
"""When @human_feedback with emit is the final method, flow output is the method's return value."""

class FinalHumanFeedbackFlow(Flow):
@start()
@human_feedback(
message="Review this content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def generate_and_review(self):
# This dict should be the final output, NOT the string 'approved'
return {"title": "My Article", "content": "Article content here", "status": "ready"}

flow = FinalHumanFeedbackFlow()

with (
patch.object(flow, "_request_human_feedback", return_value="Looks great, approved!"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()

# The final output should be the actual method return value, not the collapsed outcome
assert isinstance(result, dict), f"Expected dict, got {type(result).__name__}: {result}"
assert result == {"title": "My Article", "content": "Article content here", "status": "ready"}
# But the outcome should still be tracked in last_human_feedback
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "approved"

@patch("builtins.input", return_value="approved")
@patch("builtins.print")
def test_routing_still_works_with_downstream_listener(self, mock_print, mock_input):
"""When @human_feedback has a downstream listener, routing still triggers the listener."""
publish_called = []

class RoutingFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return {"content": "original content"}

@listen("approved")
def publish(self):
publish_called.append(True)
return {"published": True, "timestamp": "2024-01-01"}

flow = RoutingFlow()

with (
patch.object(flow, "_request_human_feedback", return_value="LGTM"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
result = flow.kickoff()

# The downstream listener should have been triggered
assert len(publish_called) == 1, "publish() should have been called"
# The final output should be from the listener, not the human_feedback method
assert result == {"published": True, "timestamp": "2024-01-01"}

@patch("builtins.input", return_value="")
@patch("builtins.print")
@pytest.mark.asyncio
async def test_async_human_feedback_final_output_preserved(self, mock_print, mock_input):
"""Async @human_feedback methods also preserve the real return value."""

class AsyncFinalFlow(Flow):
@start()
@human_feedback(
message="Review async content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
default_outcome="approved",
)
async def async_generate(self):
return {"async_data": "value", "computed": 42}

flow = AsyncFinalFlow()

with (
patch.object(flow, "_request_human_feedback", return_value=""),
):
result = await flow.kickoff_async()

# The final output should be the dict, not "approved"
assert isinstance(result, dict), f"Expected dict, got {type(result).__name__}: {result}"
assert result == {"async_data": "value", "computed": 42}
assert flow.last_human_feedback.outcome == "approved"

@patch("builtins.input", return_value="feedback")
@patch("builtins.print")
def test_method_outputs_contains_real_output(self, mock_print, mock_input):
"""The _method_outputs list should contain the real method output, not the collapsed outcome."""

class OutputTrackingFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def generate(self):
return {"data": "real output"}

flow = OutputTrackingFlow()

with (
patch.object(flow, "_request_human_feedback", return_value="approved"),
patch.object(flow, "_collapse_to_outcome", return_value="approved"),
):
flow.kickoff()

# _method_outputs should contain the real output
assert len(flow._method_outputs) == 1
assert flow._method_outputs[0] == {"data": "real output"}
14 changes: 10 additions & 4 deletions lib/crewai/tests/test_human_feedback_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ class TestEdgeCases:
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_empty_feedback_first_outcome_fallback(self, mock_print, mock_input):
"""Test that empty feedback without default uses first outcome."""
"""Test that empty feedback without default uses first outcome for routing, but returns method output."""

class FallbackFlow(Flow):
@start()
Expand All @@ -726,12 +726,15 @@ def review(self):
with patch.object(flow, "_request_human_feedback", return_value=""):
result = flow.kickoff()

assert result == "first" # Falls back to first outcome
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "content"
# But outcome is still set to first for routing purposes
assert flow.last_human_feedback.outcome == "first"

@patch("builtins.input", return_value="whitespace only ")
@patch("builtins.print")
def test_whitespace_only_feedback_treated_as_empty(self, mock_print, mock_input):
"""Test that whitespace-only feedback is treated as empty."""
"""Test that whitespace-only feedback is treated as empty for routing, but returns method output."""

class WhitespaceFlow(Flow):
@start()
Expand All @@ -749,7 +752,10 @@ def review(self):
with patch.object(flow, "_request_human_feedback", return_value=" "):
result = flow.kickoff()

assert result == "reject" # Uses default because feedback is empty after strip
# Flow result is the method's return value, NOT the collapsed outcome
assert result == "content"
# But outcome is set to default because feedback is empty after strip
assert flow.last_human_feedback.outcome == "reject"

@patch("builtins.input", return_value="feedback")
@patch("builtins.print")
Expand Down
Loading