diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..ea63cd5 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,162 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.3.0] - 2024 + +### ⚠️ BREAKING CHANGES + +This release introduces a complete redesign of the event system with a unified, consistent pattern. **This is a breaking change** that requires migration for existing code. + +#### Event System Redesign + +**Old Pattern (v0.2.x)**: +- 19 specific event types (`WorkflowStarted`, `AgentProcessing`, `ToolCallStarted`, etc.) +- Inconsistent naming and lifecycle patterns +- Limited extensibility + +**New Pattern (v0.3.0)**: +- Unified `Scope × Type × Status` model +- 6 event scopes: `Workflow`, `WorkflowStep`, `Agent`, `LlmRequest`, `Tool`, `System` +- 5 standard lifecycle types: `Started`, `Progress`, `Completed`, `Failed`, `Canceled` +- 5 component statuses: `Pending`, `Running`, `Completed`, `Failed`, `Canceled` + +### Added + +- **Event Scopes** (`EventScope` enum): + - `Workflow` - Entire workflow execution + - `WorkflowStep` - Individual workflow steps + - `Agent` - Agent execution + - `LlmRequest` - LLM/ChatClient requests (renamed from "Agent LLM") + - `Tool` - Tool execution + - `System` - System-level events + +- **Unified Event Types** (`EventType` enum): + - `Started` - Component begins execution + - `Progress` - Component reports progress/streaming data + - `Completed` - Component finished successfully + - `Failed` - Component encountered error + - **`Canceled`** - Component was canceled/interrupted (new!) + +- **Component Status Tracking** (`ComponentStatus` enum): + - Explicit status field on all events + - Tracks component state after event + +- **Component ID Validation**: + - Enforced format standards per scope + - `Workflow`: `workflow_name` + - `WorkflowStep`: `workflow_name:step:N` + - `Agent`: `agent_name` + - `LlmRequest`: `agent_name:llm:N` + - `Tool`: `tool_name` + - `System`: `system:subsystem` + +- **Helper Methods** on `EventStream`: + - `agent_started()`, `agent_completed()`, `agent_failed()` + - `llm_started()`, `llm_progress()`, `llm_completed()`, `llm_failed()` + - `tool_started()`, `tool_progress()`, `tool_completed()`, `tool_failed()` + - `workflow_started()`, `workflow_completed()`, `workflow_failed()` + - `step_started()`, `step_completed()`, `step_failed()` + +- **Event Fields**: + - `component_id` - Standardized component identifier + - `scope` - Event scope (which component type) + - `status` - Current component status + - `message` - Optional human-readable message + +### Changed + +- **Event struct**: + - Added: `scope`, `event_type`, `component_id`, `status`, `message` + - Renamed: `event_type` field is now the lifecycle stage, not the specific event + +- **EventStream::append() signature**: + ```rust + // Old + fn append(&self, event_type: EventType, workflow_id: String, data: JsonValue) + + // New + fn append( + &self, + scope: EventScope, + event_type: EventType, + component_id: String, + status: ComponentStatus, + workflow_id: String, + message: Option, + data: JsonValue + ) + ``` + +- **Tool loop detection** now emits `System::Progress` events instead of `AgentToolLoopDetected` + +- Version bumped from `0.1.0` → `0.3.0` (skip 0.2.0 due to breaking nature) + +### Removed + +- **Old EventType variants**: + - `WorkflowStarted`, `WorkflowStepStarted`, `WorkflowStepCompleted`, `WorkflowCompleted`, `WorkflowFailed` + - `AgentInitialized`, `AgentProcessing`, `AgentCompleted`, `AgentFailed` + - `AgentLlmRequestStarted`, `AgentLlmStreamChunk`, `AgentLlmRequestCompleted`, `AgentLlmRequestFailed` + - `ToolCallStarted`, `ToolCallCompleted`, `ToolCallFailed`, `AgentToolLoopDetected` + - `SystemError`, `StateSaved` + +### Migration Guide + +See [docs/MIGRATION_0.2_TO_0.3.md](docs/MIGRATION_0.2_TO_0.3.md) for detailed migration instructions. + +**Quick migration examples**: + +```rust +// Old: Pattern matching on specific event types +match event.event_type { + EventType::WorkflowStarted => { ... } + EventType::ToolCallCompleted => { ... } +} + +// New: Pattern matching on scope + type +match (event.scope, event.event_type) { + (EventScope::Workflow, EventType::Started) => { ... } + (EventScope::Tool, EventType::Completed) => { ... } +} + +// Old: Manual event emission +stream.append( + EventType::ToolCallStarted, + "wf_1", + json!({"tool": "my_tool"}) +); + +// New: Use helper methods +stream.tool_started( + "my_tool", + "wf_1".to_string(), + json!({}) +); +``` + +### Technical Details + +- All events now emit asynchronously via spawned tasks (from v0.2.x) +- Event validation ensures component IDs follow standardized formats +- Helper methods provide type-safe, ergonomic API for common event patterns +- Raw `append()` method still available for custom event scenarios + +### Tests + +- 97 tests passing (63 lib + 7 chat_history + 12 error + 3 integration + 12 load) +- All clippy warnings resolved +- All doctests passing + +--- + +## [0.2.0] - 2024 (Skipped) + +Skipped to avoid confusion with pre-release versions. + +## [0.1.0] - 2024 + +Initial release with basic agent runtime, workflow system, and event streaming. diff --git a/Cargo.lock b/Cargo.lock index f94d579..f42637a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -193,7 +193,7 @@ checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" [[package]] name = "agent-runtime" -version = "0.1.0" +version = "0.3.0" dependencies = [ "actix-web", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 87ad275..7d6b3f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ ] [workspace.package] -version = "0.1.0" +version = "0.3.0" edition = "2021" authors = ["Travis Sharp "] license = "MIT OR Apache-2.0" @@ -19,7 +19,7 @@ all = "warn" # Root package - the main MCP library [package] name = "agent-runtime" -version = "0.1.0" +version = "0.3.0" edition = "2021" authors = ["Travis Sharp "] license = "MIT OR Apache-2.0" diff --git a/README.md b/README.md index 8c4aab6..ad81717 100644 --- a/README.md +++ b/README.md @@ -25,12 +25,12 @@ A production-ready Rust framework for building AI agent workflows with native an - **Nested workflows** - SubWorkflows for complex orchestration - **Mermaid export** - Visualize workflows as diagrams -### 📡 Event System -- **Real-time events** - Complete visibility into execution -- **Fine-grained tracking** - Workflow, agent, LLM, and tool events -- **Streaming chunks** - Live LLM token streaming via events +### 📡 Event System (v0.3.0 - Unified) +- **Unified event model** - Consistent `Scope × Type × Status` pattern across all components +- **Complete lifecycle tracking** - Started → Progress → Completed/Failed for workflows, agents, LLM requests, tools +- **Real-time streaming** - Live LLM token streaming via Progress events - **Multi-subscriber** - Multiple event listeners per workflow -- **Event bubbling** - Events propagate from tools → agents → workflows +- **Type-safe component IDs** - Enforced formats with validation ### ⚙️ Configuration - **YAML and TOML support** - Human-readable config files @@ -39,11 +39,11 @@ A production-ready Rust framework for building AI agent workflows with native an - **Per-agent settings** - System prompts, tools, LLM clients, loop prevention ### 🔒 Production Ready -- **61 comprehensive tests** - All core functionality tested -- **Tool loop prevention** - Prevents LLM from calling same tool repeatedly -- **Microsecond timing** - Precise performance metrics -- **Structured logging** - FileLogger with timestamped output -- **Error handling** - Detailed error types with context +- **97 comprehensive tests** - All core functionality tested +- **Tool loop prevention** - Prevents LLM from calling same tool repeatedly with System::Progress events +- **Microsecond timing** - Precise performance metrics via event data +- **Async event emission** - Non-blocking event streaming with tokio::spawn +- **Error handling** - Detailed error types with context and human-readable messages ## Quick Start @@ -111,23 +111,36 @@ let workflow = Workflow::new("analysis") let result = workflow.execute(initial_input, &mut event_rx).await?; ``` -### Event Streaming +### Event Streaming (v0.3.0) ```rust +use agent_runtime::{EventScope, EventType}; + let (tx, mut rx) = mpsc::channel(100); // Subscribe to events tokio::spawn(async move { while let Some(event) = rx.recv().await { - match event.event_type { - EventType::AgentLlmStreamChunk => { - print!("{}", event.data.get("chunk").unwrap()); + match (event.scope, event.event_type) { + // Stream LLM responses in real-time + (EventScope::LlmRequest, EventType::Progress) => { + if let Some(chunk) = event.data["chunk"].as_str() { + print!("{}", chunk); + } } - EventType::ToolCallCompleted => { - println!("Tool {} returned: {}", - event.data["tool_name"], + // Track tool executions + (EventScope::Tool, EventType::Completed) => { + println!("✓ Tool {} returned: {}", + event.component_id, event.data["result"] ); } + // Handle failures + (_, EventType::Failed) => { + eprintln!("❌ {}: {}", + event.component_id, + event.message.unwrap_or_default() + ); + } _ => {} } } @@ -171,23 +184,42 @@ let config = RuntimeConfig::from_file("agent-runtime.yaml")?; - **`config`** - YAML/TOML configuration loading - **`tool_loop_detection`** - Intelligent duplicate tool call prevention -### Event Types -- **Workflow**: Started, StepStarted, StepCompleted, StepFailed, Completed, Failed -- **Agent**: Started, Completed, Failed, LlmStreamChunk -- **LLM**: RequestSent, ResponseReceived, StreamChunkReceived -- **Tool**: ToolCallStarted, ToolCallCompleted, ToolCallFailed, AgentToolLoopDetected +### Event System (v0.3.0) +**Unified Scope × Type × Status Pattern:** +- **Scopes**: Workflow, WorkflowStep, Agent, LlmRequest, Tool, System +- **Types**: Started, Progress, Completed, Failed, Canceled +- **Status**: Pending, Running, Completed, Failed, Canceled + +**Key Events:** +- `Workflow::Started/Completed/Failed` - Overall workflow execution +- `WorkflowStep::Started/Completed/Failed` - Individual step tracking +- `Agent::Started/Completed/Failed` - Agent processing lifecycle +- `LlmRequest::Started/Progress/Completed/Failed` - Real-time LLM streaming +- `Tool::Started/Progress/Completed/Failed` - Tool execution tracking +- `System::Progress` - Runtime behaviors (e.g., tool loop detection) + +**Component ID Formats:** +- Workflow: `workflow_name` +- WorkflowStep: `workflow:step:N` +- Agent: `agent_name` +- LlmRequest: `agent:llm:N` +- Tool: `tool_name` or `tool_name:N` +- System: `system:subsystem` ### Tool Loop Prevention Prevents LLMs from calling the same tool with identical arguments repeatedly: - **Automatic detection** - Tracks tool calls and arguments using MD5 hashing +- **System events** - Emits `System::Progress` event with `system:tool_loop_detection` component ID - **Configurable messages** - Custom messages with `{tool_name}` and `{previous_result}` placeholders -- **Event emission** - `AgentToolLoopDetected` event for observability - **Enabled by default** - Can be disabled per-agent if needed ## Examples Run any demo: ```bash +# Event System +cargo run --bin async_events_demo # NEW! Async event streaming demo with visible sequence + # Workflows cargo run --bin workflow_demo # 3-agent workflow with LLM cargo run --bin hello_workflow # Simple sequential workflow @@ -212,10 +244,12 @@ cargo run --bin complex_viz # Complex workflow diagram ## Documentation +- **[Event Streaming Guide](docs/EVENT_STREAMING.md)** - Complete event system documentation (v0.3.0) +- **[Migration Guide](docs/MIGRATION_0.2_TO_0.3.md)** - Upgrading from v0.2.x to v0.3.0 +- **[Changelog](CHANGELOG.md)** - Release notes for v0.3.0 - **[Specification](docs/spec.md)** - Complete system design - **[Tool Calling](docs/TOOL_CALLING.md)** - Native tool usage - **[MCP Integration](docs/MCP_INTEGRATION.md)** - External MCP tools -- **[Event Streaming](docs/EVENT_STREAMING.md)** - Event system guide - **[LLM Module](docs/LLM_MODULE.md)** - LLM provider integration - **[Workflow Composition](docs/WORKFLOW_COMPOSITION.md)** - Building workflows - **[Testing](docs/TESTING.md)** - Test suite documentation @@ -223,12 +257,39 @@ cargo run --bin complex_viz # Complex workflow diagram ## Testing ```bash -cargo test # All 61 tests +cargo test # All 97 tests cargo test --lib # Library tests only cargo test agent # Agent tests cargo test tool # Tool tests +cargo test event # Event system tests cargo clippy # Linting +cargo fmt --all # Format code +``` + +## What's New in v0.3.0 + +**🎉 Unified Event System** - Complete rewrite for consistency and extensibility + +- **Breaking Changes**: New event structure with `EventScope`, `ComponentStatus`, unified `EventType` +- **Helper Methods**: 19 ergonomic helper methods for common event patterns +- **Component IDs**: Enforced formats with validation for type safety +- **Async Events**: Non-blocking event emission via `tokio::spawn()` +- **Migration Guide**: See [docs/MIGRATION_0.2_TO_0.3.md](docs/MIGRATION_0.2_TO_0.3.md) + +**Upgrading from v0.2.x?** +```rust +// Old (v0.2.x) +match event.event_type { + EventType::AgentLlmStreamChunk => { ... } +} + +// New (v0.3.0) +match (event.scope, event.event_type) { + (EventScope::LlmRequest, EventType::Progress) => { ... } +} ``` +See [CHANGELOG.md](CHANGELOG.md) for complete details. + ## License Dual-licensed under MIT or Apache-2.0 at your option. diff --git a/benches/agent_benchmarks.rs b/benches/agent_benchmarks.rs index a17fccb..4cac17e 100644 --- a/benches/agent_benchmarks.rs +++ b/benches/agent_benchmarks.rs @@ -1,5 +1,5 @@ use agent_runtime::prelude::*; -use agent_runtime::{Agent, AgentConfig, AgentInput, Event, EventType, NativeTool, ToolRegistry}; +use agent_runtime::{Agent, AgentConfig, AgentInput, Event, NativeTool, ToolRegistry}; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; use serde_json::json; use std::sync::Arc; @@ -185,10 +185,15 @@ fn bench_event_emission(c: &mut Criterion) { // Emit event let event = Event::new( 0, - EventType::AgentProcessing, + agent_runtime::EventScope::Agent, + agent_runtime::EventType::Started, + "bench_agent".to_string(), + agent_runtime::ComponentStatus::Running, "bench_workflow".to_string(), - json!({"agent": "bench_agent", "step": 0}), - ); + None, + json!({"step": 0}), + ) + .unwrap(); let _ = tx.send(black_box(event)); diff --git a/docs/EVENT_STREAMING.md b/docs/EVENT_STREAMING.md index fcebe0f..5d91a51 100644 --- a/docs/EVENT_STREAMING.md +++ b/docs/EVENT_STREAMING.md @@ -1,235 +1,1116 @@ -# Event Streaming & Observability +# Event Streaming & Observability Guide -## What Was Added +**Version:** 0.3.0 +**Last Updated:** 2026-02-16 -### LLM-Specific Events -Added three new event types to track LLM interactions: -- `AgentLlmRequestStarted` - Emitted when agent sends request to LLM -- `AgentLlmRequestCompleted` - Emitted when LLM responds successfully -- `AgentLlmRequestFailed` - Emitted when LLM call fails +## Table of Contents +1. [Overview](#overview) +2. [Event System Architecture](#event-system-architecture) +3. [Core Concepts](#core-concepts) +4. [Event Types Reference](#event-types-reference) +5. [Usage Patterns](#usage-patterns) +6. [Advanced Topics](#advanced-topics) +7. [Best Practices](#best-practices) +8. [Examples](#examples) + +--- + +## Overview + +The agent-runtime event system provides **complete end-to-end observability** for AI agent workflows. Every component (workflows, agents, LLM requests, tools) emits structured lifecycle events that enable: + +- **Real-time monitoring**: Track execution as it happens +- **Streaming responses**: Live LLM token streaming to end users +- **Debugging**: Detailed execution traces with timing +- **Analytics**: Token usage, costs, performance metrics +- **Production observability**: Integration with monitoring systems + +### Key Features + +✅ **Unified event model**: Consistent `Scope × Type × Status` pattern +✅ **Component lifecycle tracking**: Started → Running → Completed/Failed +✅ **Streaming support**: Real-time LLM token streaming +✅ **Non-blocking**: Async event emission via `tokio::spawn()` +✅ **Type-safe**: Enforced component ID formats +✅ **Extensible**: Easy to add new component types + +--- + +## Event System Architecture + +### Unified Event Pattern + +All events follow the **Scope × Type × Status** pattern: -### Execution Context -Created `ExecutionContext` to pass runtime context (like event streams) to steps: -```rust -pub struct ExecutionContext<'a> { - pub event_stream: Option<&'a EventStream>, -} +``` +┌─────────────┐ ┌──────────────┐ ┌──────────────────┐ +│ EventScope │ × │ EventType │ × │ ComponentStatus │ +├─────────────┤ ├──────────────┤ ├──────────────────┤ +│ Workflow │ │ Started │ │ Pending │ +│ WorkflowStep│ │ Progress │ │ Running │ +│ Agent │ │ Completed │ │ Completed │ +│ LlmRequest │ │ Failed │ │ Failed │ +│ Tool │ │ Canceled │ │ Canceled │ +│ System │ │ │ │ │ +└─────────────┘ └──────────────┘ └──────────────────┘ ``` -### Agent Event Emission -Updated `Agent::execute_with_events()` to emit detailed events: -- Agent processing started -- LLM request with full message history -- LLM response with content and usage -- Agent completion or failure +### Event Structure -### Step Trait Enhancement -Added `execute_with_context()` method to Step trait: ```rust -#[async_trait] -pub trait Step { - async fn execute_with_context(&self, input: StepInput, ctx: ExecutionContext<'_>) -> StepResult; - async fn execute(&self, input: StepInput) -> StepResult { - self.execute_with_context(input, ExecutionContext::new()).await - } +pub struct Event { + pub event_id: Uuid, // Unique event identifier + pub scope: EventScope, // Which component type emitted this + pub event_type: EventType, // Lifecycle stage (Started, Progress, etc.) + pub component_id: String, // Specific component instance + pub status: ComponentStatus, // Component's current status + pub message: Option, // Human-readable message (errors, progress) + pub timestamp: DateTime, // When event occurred + pub workflow_id: Option, // Associated workflow + pub parent_workflow_id: Option, // For nested workflows + pub data: JsonValue, // Component-specific data } ``` -### Runtime Event Access -Exposed `event_stream()` method on Runtime for subscribing to events: +### EventStream + +The `EventStream` manages event emission and subscription: + ```rust -let mut receiver = runtime.event_stream().subscribe(); -while let Ok(event) = receiver.recv().await { - // Handle event +pub struct EventStream { + sender: broadcast::Sender, + events: Arc>>, +} + +impl EventStream { + // Subscribe to live events + pub fn subscribe(&self) -> broadcast::Receiver + + // Get historical events + pub async fn get_events(&self, offset: usize) -> Vec + + // Emit event (non-blocking) + pub async fn append(...) -> JoinHandle> } ``` -## Enhanced Workflow Demo +--- + +## Core Concepts + +### Event Scopes + +Six component types emit events: -The workflow demo now includes real-time event monitoring: +| Scope | Purpose | Component ID Format | Example | +|-------|---------|---------------------|---------| +| **Workflow** | Overall workflow execution | `workflow_name` | `"data_pipeline"` | +| **WorkflowStep** | Individual workflow steps | `workflow:step:N` | `"pipeline:step:0"` | +| **Agent** | Agent execution | `agent_name` | `"researcher"` | +| **LlmRequest** | LLM API calls | `agent:llm:N` | `"researcher:llm:0"` | +| **Tool** | Tool execution | `tool_name` or `tool:N` | `"calculator"` | +| **System** | Runtime behaviors | `system:subsystem` | `"system:tool_loop"` | -### Features -- **Live Event Stream**: Separate task subscribes to runtime events -- **Detailed LLM Visibility**: See every message sent to and from LLM -- **Token Usage**: Track token consumption per request -- **Execution Timing**: Monitor step and agent execution times -- **Error Handling**: See detailed error messages when things fail +### Event Types (Lifecycle Stages) -### Output Example +All components share the same lifecycle: + +``` +┌──────────┐ +│ Started │ ──┐ +└──────────┘ │ + ├──> ┌──────────┐ +┌──────────┐ │ │ Progress │ (optional, repeatable) +│ Pending │ ──┘ └──────────┘ +└──────────┘ │ + ├──> ┌───────────┐ + │ │ Completed │ + │ └───────────┘ + │ + ├──> ┌─────────┐ + │ │ Failed │ + │ └─────────┘ + │ + └──> ┌───────────┐ + │ Canceled │ + └───────────┘ ``` -=== Workflow Demo === -✓ LLM client configured (https://192.168.91.57 - insecure) -✓ Created 3 agents: greeter → analyzer → summarizer -✓ Workflow built with 3 sequential steps +- **Started**: Component begins execution (status: `Running`) +- **Progress**: Component reports intermediate progress (optional, status: `Running`) +- **Completed**: Component finishes successfully (status: `Completed`) +- **Failed**: Component encounters error (status: `Failed`) +- **Canceled**: Component execution canceled (status: `Canceled`) -📡 Event Stream Monitor Started -============================================================ +### Component Status -🚀 Workflow Started ------------------------------------------------------------- +Tracks component state across events: -▶ Step Started: greeter - 🤖 Agent 'greeter' processing... - 💬 LLM Request Started (greeter) - [system]: You are a friendly greeter. Say hello and introduce yourself warmly. - [user]: Hello! I'm interested in learning about AI agents. - ✅ LLM Response (greeter) - Hello! Welcome! I'm delighted to meet you... - Tokens: 245 - ⏱ Step Completed: greeter (1234ms) +- **Pending**: Not yet started +- **Running**: Currently executing +- **Completed**: Finished successfully +- **Failed**: Encountered error +- **Canceled**: Execution canceled -▶ Step Started: analyzer - 🤖 Agent 'analyzer' processing... - 💬 LLM Request Started (analyzer) - [system]: You are a thoughtful analyzer... - [user]: {"response": "Hello! Welcome..."} - ✅ LLM Response (analyzer) - This is a warm greeting that demonstrates... - Tokens: 312 - ⏱ Step Completed: analyzer (1567ms) +--- -▶ Step Started: summarizer - 🤖 Agent 'summarizer' processing... - 💬 LLM Request Started (summarizer) - [system]: You are a concise summarizer... - [user]: {"response": "This is a warm greeting..."} - ✅ LLM Response (summarizer) - The conversation consisted of... - Tokens: 89 - ⏱ Step Completed: summarizer (876ms) +## Event Types Reference -============================================================ -✅ Workflow Completed +### Workflow Events -============================================================ +```rust +use agent_runtime::prelude::*; -📊 Final Results +// Workflow execution begins +Event::workflow_started( + "data_pipeline", // component_id + Some("wf_123".into()), // workflow_id + None, // parent_workflow_id + json!({"input": "data.csv"}) +); -Output: -{ - "response": "The conversation consisted of..." -} +// Workflow completes +Event::workflow_completed( + "data_pipeline", + Some("wf_123".into()), + None, + json!({"processed_rows": 1000}) +); -Steps executed: 3 - 1. greeter (agent) - 1234ms - 2. analyzer (agent) - 1567ms - 3. summarizer (agent) - 876ms +// Workflow fails +Event::workflow_failed( + "data_pipeline", + "Database connection timeout".to_string(), // error message + Some("wf_123".into()), + None, + json!({"error_code": "DB_TIMEOUT"}) +); ``` -## Benefits - -### 1. **Complete Observability** -- See exactly what's happening inside agents -- Monitor LLM calls in real-time -- Track token usage per request +**Event Data Fields**: +- `input`: Workflow input (Started) +- `output`: Workflow result (Completed) +- `error`: Error details (Failed) -### 2. **Better Debugging** -- Detailed error messages with context -- Full conversation history visible -- Execution timing for performance tuning +### WorkflowStep Events -### 3. **Production Ready** -- Events can be streamed to monitoring systems -- Event history preserved for replay -- Offset-based replay for reconnection +```rust +// Step starts +Event::workflow_step_started( + "pipeline:step:0", + Some("wf_123".into()), + None, + json!({"step_name": "data_loader", "step_type": "agent"}) +); -### 4. **Streaming Potential** -- Foundation for HTTP/WebSocket streaming -- Can add event filtering by type -- Can add event persistence/storage +// Step completes +Event::workflow_step_completed( + "pipeline:step:0", + Some("wf_123".into()), + None, + json!({ + "step_name": "data_loader", + "output": {"rows_loaded": 500}, + "duration_ms": 1234 + }) +); -## Event Types Reference +// Step fails +Event::workflow_step_failed( + "pipeline:step:0", + "Agent execution timeout".to_string(), + Some("wf_123".into()), + None, + json!({"step_name": "data_loader", "timeout_ms": 30000}) +); +``` -### Workflow Events -- `WorkflowStarted` - Workflow execution begins -- `WorkflowStepStarted` - Step starts executing -- `WorkflowStepCompleted` - Step finishes successfully -- `WorkflowCompleted` - Workflow finishes successfully -- `WorkflowFailed` - Workflow encounters error +**Event Data Fields**: +- `step_name`: Step identifier +- `step_type`: "agent" | "transform" | "conditional" | "subworkflow" +- `input`: Step input (Started) +- `output`: Step result (Completed) +- `duration_ms`: Execution time (Completed) ### Agent Events -- `AgentProcessing` - Agent begins processing -- `AgentCompleted` - Agent finishes successfully -- `AgentFailed` - Agent encounters error -### LLM Events (New!) -- `AgentLlmRequestStarted` - LLM request sent (includes messages) -- `AgentLlmRequestCompleted` - LLM responds (includes response & usage) -- `AgentLlmRequestFailed` - LLM call fails (includes error) +```rust +// Agent starts processing +Event::agent_started( + "researcher", + Some("wf_123".into()), + None, + json!({"input": "Analyze Q4 sales data"}) +); + +// Agent reports progress (rare, usually via LlmRequest::Progress) +Event::agent_progress( + "researcher", + Some("Processing iteration 2 of 5".into()), + Some("wf_123".into()), + None, + json!({"iteration": 2, "max_iterations": 5}) +); + +// Agent completes +Event::agent_completed( + "researcher", + Some("wf_123".into()), + None, + json!({ + "output": "Q4 sales increased 15%...", + "llm_calls": 3, + "tool_calls": 5, + "total_tokens": 1240 + }) +); + +// Agent fails +Event::agent_failed( + "researcher", + "Max iterations exceeded".to_string(), + Some("wf_123".into()), + None, + json!({"iterations": 10, "max_allowed": 10}) +); +``` + +**Event Data Fields**: +- `input`: Agent input (Started) +- `output`: Agent result (Completed) +- `iteration`: Current iteration (Progress) +- `llm_calls`: Number of LLM requests (Completed) +- `tool_calls`: Number of tool executions (Completed) +- `total_tokens`: Cumulative token usage (Completed) + +### LlmRequest Events + +**These events enable real-time LLM streaming!** + +```rust +// LLM request sent +Event::llm_started( + "researcher:llm:0", + Some("wf_123".into()), + None, + json!({ + "messages": [ + {"role": "system", "content": "You are a researcher..."}, + {"role": "user", "content": "Analyze sales data"} + ], + "model": "gpt-4", + "stream": true + }) +); + +// LLM streaming chunks (emitted for each token/batch) +Event::llm_progress( + "researcher:llm:0", + Some("Q4".into()), // The streamed chunk + Some("wf_123".into()), + None, + json!({"chunk": "Q4"}) +); + +// LLM request completes +Event::llm_completed( + "researcher:llm:0", + Some("wf_123".into()), + None, + json!({ + "response": "Q4 sales increased 15% compared to Q3...", + "usage": { + "prompt_tokens": 120, + "completion_tokens": 85, + "total_tokens": 205 + }, + "finish_reason": "stop", + "tool_calls": [] // If LLM requested tool calls + }) +); + +// LLM request fails +Event::llm_failed( + "researcher:llm:0", + "API rate limit exceeded".to_string(), + Some("wf_123".into()), + None, + json!({"retry_after": 60, "error_code": "rate_limit_exceeded"}) +); +``` + +**Event Data Fields**: +- `messages`: Chat history sent to LLM (Started) +- `model`: LLM model name (Started) +- `stream`: Whether streaming enabled (Started) +- `chunk`: Token/text chunk (Progress) +- `response`: Complete LLM response (Completed) +- `usage`: Token usage stats (Completed) +- `tool_calls`: Tools requested by LLM (Completed) ### Tool Events -- `ToolCallStarted` - Tool execution starts -- `ToolCallCompleted` - Tool execution completes -- `ToolCallFailed` - Tool execution fails -## Usage +```rust +// Tool execution starts +Event::tool_started( + "calculator", + Some("wf_123".into()), + None, + json!({ + "tool_name": "calculator", + "arguments": {"operation": "multiply", "a": 42, "b": 137} + }) +); + +// Tool reports progress (for long-running tools) +Event::tool_progress( + "web_scraper:0", + Some("Fetched page 3 of 10".into()), + Some("wf_123".into()), + None, + json!({"pages_fetched": 3, "total_pages": 10}) +); + +// Tool completes +Event::tool_completed( + "calculator", + Some("wf_123".into()), + None, + json!({ + "tool_name": "calculator", + "result": 5754, + "duration_ms": 12 + }) +); + +// Tool fails +Event::tool_failed( + "calculator", + "Division by zero".to_string(), + Some("wf_123".into()), + None, + json!({"tool_name": "calculator", "error_code": "DIV_BY_ZERO"}) +); +``` + +**Event Data Fields**: +- `tool_name`: Tool identifier +- `arguments`: Tool input parameters (Started) +- `result`: Tool output (Completed) +- `duration_ms`: Execution time (Completed) + +### System Events + +Used for runtime behaviors and diagnostics: -### Subscribe to Events ```rust -let runtime = Runtime::new(); -let mut receiver = runtime.event_stream().subscribe(); +// Tool loop detection +Event::system_progress( + "system:tool_loop_detection", + Some("Tool 'calculator' called with identical arguments".into()), + Some("wf_123".into()), + None, + json!({ + "tool_name": "calculator", + "arguments": {"a": 5, "b": 3}, + "previous_result": 8, + "call_count": 2 + }) +); -// In a separate task +// Custom system events +Event::system_progress( + "system:rate_limiter", + Some("Rate limit approaching: 90% of quota used".into()), + Some("wf_123".into()), + None, + json!({"requests_used": 900, "requests_limit": 1000}) +); +``` + +--- + +## Usage Patterns + +### Pattern 1: Basic Event Monitoring + +```rust +use agent_runtime::prelude::*; +use tokio::sync::mpsc; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create event stream + let (tx, mut rx) = mpsc::channel(100); + + // Spawn monitor task + tokio::spawn(async move { + while let Some(event) = rx.recv().await { + println!("[{}] {} - {} ({})", + event.timestamp.format("%H:%M:%S"), + event.scope, + event.event_type, + event.component_id + ); + } + }); + + // Run workflow with events + let workflow = Workflow::new("example") + .add_step(AgentStep::new(agent)) + .build(); + + workflow.execute(input, &mut rx).await?; + Ok(()) +} +``` + +### Pattern 2: Filtering Specific Events + +```rust tokio::spawn(async move { - while let Ok(event) = receiver.recv().await { - println!("{:?}", event); + while let Some(event) = rx.recv().await { + match (event.scope, event.event_type) { + // Track all failures + (_, EventType::Failed) => { + eprintln!("❌ {} failed: {}", + event.component_id, + event.message.unwrap_or_default() + ); + } + + // Track LLM streaming + (EventScope::LlmRequest, EventType::Progress) => { + if let Some(chunk) = event.data["chunk"].as_str() { + print!("{}", chunk); + io::stdout().flush().unwrap(); + } + } + + // Track tool executions + (EventScope::Tool, EventType::Completed) => { + let result = &event.data["result"]; + println!("🔧 {} → {}", event.component_id, result); + } + + _ => {} + } } }); ``` -### Filter Events by Type +### Pattern 3: Real-time LLM Streaming + ```rust -while let Ok(event) = receiver.recv().await { +// Stream LLM responses to end user +tokio::spawn(async move { + let mut current_response = String::new(); + + while let Some(event) = rx.recv().await { + match (event.scope, event.event_type) { + (EventScope::LlmRequest, EventType::Started) => { + println!("\n🤖 Assistant is thinking...\n"); + current_response.clear(); + } + + (EventScope::LlmRequest, EventType::Progress) => { + if let Some(chunk) = event.data["chunk"].as_str() { + print!("{}", chunk); + io::stdout().flush().unwrap(); + current_response.push_str(chunk); + } + } + + (EventScope::LlmRequest, EventType::Completed) => { + println!("\n\n✅ Complete ({} tokens)", + event.data["usage"]["total_tokens"] + ); + } + + _ => {} + } + } +}); +``` + +### Pattern 4: Component Status Tracking + +```rust +use std::collections::HashMap; + +let mut statuses: HashMap = HashMap::new(); +let mut timings: HashMap = HashMap::new(); + +while let Some(event) = rx.recv().await { + // Update status + statuses.insert(event.component_id.clone(), event.status); + + // Track timing match event.event_type { - EventType::AgentLlmRequestCompleted => { - // Handle LLM responses + EventType::Started => { + timings.insert(event.component_id.clone(), Instant::now()); } - EventType::WorkflowFailed => { - // Handle errors + EventType::Completed | EventType::Failed => { + if let Some(start) = timings.remove(&event.component_id) { + let duration = start.elapsed(); + println!("{} took {:?}", event.component_id, duration); + } } _ => {} } + + // Show progress + let running: Vec<_> = statuses.iter() + .filter(|(_, s)| **s == ComponentStatus::Running) + .map(|(id, _)| id) + .collect(); + + if !running.is_empty() { + println!("Running: {:?}", running); + } } ``` -### Access Event Data +### Pattern 5: Token Usage Tracking + ```rust -if let Some(response) = event.data.get("response").and_then(|v| v.as_str()) { - println!("LLM said: {}", response); +let mut total_tokens = 0; +let mut total_cost = 0.0; + +// Pricing: $0.03 per 1K input tokens, $0.06 per 1K output tokens +const INPUT_COST: f64 = 0.03 / 1000.0; +const OUTPUT_COST: f64 = 0.06 / 1000.0; + +while let Some(event) = rx.recv().await { + if let (EventScope::LlmRequest, EventType::Completed) = (event.scope, event.event_type) { + let usage = &event.data["usage"]; + let prompt_tokens = usage["prompt_tokens"].as_u64().unwrap_or(0); + let completion_tokens = usage["completion_tokens"].as_u64().unwrap_or(0); + + total_tokens += prompt_tokens + completion_tokens; + total_cost += (prompt_tokens as f64 * INPUT_COST) + + (completion_tokens as f64 * OUTPUT_COST); + + println!("💰 Request cost: ${:.4} ({} tokens)", + (prompt_tokens as f64 * INPUT_COST) + (completion_tokens as f64 * OUTPUT_COST), + prompt_tokens + completion_tokens + ); + } +} + +println!("\n📊 Total: {} tokens, ${:.2}", total_tokens, total_cost); +``` + +### Pattern 6: Error Handling & Retries + +```rust +let mut retry_queue: Vec = Vec::new(); + +while let Some(event) = rx.recv().await { + match (event.scope, event.event_type) { + (EventScope::LlmRequest, EventType::Failed) => { + let error = event.message.as_ref().unwrap(); + + if error.contains("rate_limit") { + println!("⏸ Rate limited, will retry in 60s"); + retry_queue.push(event); + } else if error.contains("timeout") { + println!("⚠ Timeout, retrying with longer timeout"); + retry_queue.push(event); + } else { + eprintln!("❌ Permanent failure: {}", error); + } + } + + (EventScope::Tool, EventType::Failed) => { + println!("🔧 Tool {} failed: {}", + event.component_id, + event.message.unwrap_or_default() + ); + // Tools usually shouldn't be retried + } + + _ => {} + } } ``` -## Next Steps +--- + +## Advanced Topics + +### Event Stream Replay + +Retrieve historical events for replay/debugging: -### HTTP Event Streaming Endpoint -Add actix-web route to stream events: ```rust -#[get("/workflows/{id}/stream")] -async fn stream_events(runtime: Data) -> HttpResponse { - let stream = runtime.event_stream().subscribe(); - // Convert to SSE or chunked response +let runtime = Runtime::new(); + +// Get all events from start +let all_events = runtime.event_stream().get_events(0).await; + +// Get events from offset (for reconnection) +let recent_events = runtime.event_stream().get_events(100).await; + +// Replay events +for event in all_events { + println!("{:?}", event); +} +``` + +### Multi-Subscriber Pattern + +Multiple components can subscribe to the same event stream: + +```rust +let runtime = Runtime::new(); + +// Subscriber 1: UI updates +let mut ui_rx = runtime.event_stream().subscribe(); +tokio::spawn(async move { + while let Ok(event) = ui_rx.recv().await { + update_ui(event); + } +}); + +// Subscriber 2: Logging +let mut log_rx = runtime.event_stream().subscribe(); +tokio::spawn(async move { + while let Ok(event) = log_rx.recv().await { + log_event(event); + } +}); + +// Subscriber 3: Metrics +let mut metrics_rx = runtime.event_stream().subscribe(); +tokio::spawn(async move { + while let Ok(event) = metrics_rx.recv().await { + record_metrics(event); + } +}); +``` + +### Custom Event Data + +Add custom fields to event data: + +```rust +Event::agent_completed( + "researcher", + Some("wf_123".into()), + None, + json!({ + // Standard fields + "output": "Analysis complete", + "total_tokens": 500, + + // Custom fields + "custom_metric": 42, + "tags": ["production", "high-priority"], + "user_id": "user_123", + "request_id": "req_abc" + }) +); +``` + +### Nested Workflows + +Track parent-child workflow relationships: + +```rust +// Parent workflow +Event::workflow_started( + "parent_workflow", + Some("wf_parent".into()), + None, // no parent + json!({}) +); + +// Child workflow (subworkflow) +Event::workflow_started( + "child_workflow", + Some("wf_child".into()), + Some("wf_parent".into()), // parent_workflow_id + json!({"parent": "wf_parent"}) +); + +// Filter by parent +while let Some(event) = rx.recv().await { + if event.parent_workflow_id == Some("wf_parent".to_string()) { + println!("Child workflow event: {:?}", event); + } +} +``` + +--- + +## Best Practices + +### 1. Event Monitoring + +✅ **DO**: Use separate async tasks for event monitoring +✅ **DO**: Handle `Err` from broadcast receiver (indicates lag/overflow) +✅ **DO**: Use pattern matching on `(scope, type)` tuples +❌ **DON'T**: Block agent execution waiting for events +❌ **DON'T**: Store unbounded event history in memory + +```rust +// ✅ Good: Separate task +tokio::spawn(async move { + while let Ok(event) = rx.recv().await { + // Process event + } +}); + +// ❌ Bad: Blocking main thread +for event in rx.recv().await { + // Blocks workflow execution +} +``` + +### 2. Error Handling + +✅ **DO**: Always check `message` field for Failed events +✅ **DO**: Log error details from `event.data` +✅ **DO**: Track error rates and patterns +❌ **DON'T**: Ignore Failed events silently + +```rust +// ✅ Good +match (event.scope, event.event_type) { + (_, EventType::Failed) => { + let error_msg = event.message.unwrap_or_else(|| "Unknown error".into()); + let error_data = &event.data; + eprintln!("[{}] {} failed: {}\nData: {}", + event.component_id, event.scope, error_msg, error_data); + } + _ => {} +} +``` + +### 3. Streaming Performance + +✅ **DO**: Batch LLM chunks for UI updates (Future: automatic batching in v0.4.0) +✅ **DO**: Use buffered channels (e.g., `mpsc::channel(100)`) +✅ **DO**: Flush output after streaming chunks +❌ **DON'T**: Send each token individually to network (causes flicker) + +```rust +// ✅ Good: Batch chunks (manual for now) +let mut buffer = String::new(); +let mut last_flush = Instant::now(); + +while let Some(event) = rx.recv().await { + if let (EventScope::LlmRequest, EventType::Progress) = (event.scope, event.event_type) { + buffer.push_str(event.data["chunk"].as_str().unwrap()); + + // Flush every 50ms + if last_flush.elapsed() > Duration::from_millis(50) { + print!("{}", buffer); + io::stdout().flush().unwrap(); + buffer.clear(); + last_flush = Instant::now(); + } + } +} +``` + +### 4. Component IDs + +✅ **DO**: Follow enforced formats for each scope +✅ **DO**: Use descriptive, unique component IDs +✅ **DO**: Include context in IDs (e.g., `agent:llm:0` not just `llm`) +❌ **DON'T**: Use empty or generic IDs like `"agent"` for everything + +```rust +// ✅ Good +Event::tool_started("web_scraper:batch_1", ...); +Event::llm_started("researcher:llm:2", ...); + +// ❌ Bad +Event::tool_started("tool", ...); // Not descriptive +Event::llm_started("llm", ...); // Missing agent context +``` + +### 5. Message Fields + +✅ **DO**: Provide human-readable messages for Progress/Failed events +✅ **DO**: Keep messages concise but informative +❌ **DON'T**: Put large data in message field (use `data` instead) + +```rust +// ✅ Good +Event::tool_progress( + "data_loader", + Some("Loaded 500/1000 rows".into()), + ... +); + +// ❌ Bad +Event::tool_progress( + "data_loader", + Some(format!("{:?}", all_loaded_data)), // Too large! + ... +); +``` + +--- + +## Examples + +### Example 1: Complete Workflow Monitor + +```rust +use agent_runtime::prelude::*; +use std::io::{self, Write}; +use std::time::Instant; + +async fn monitor_workflow() { + let (tx, mut rx) = mpsc::channel(100); + let start_time = Instant::now(); + + tokio::spawn(async move { + while let Some(event) = rx.recv().await { + let elapsed = start_time.elapsed().as_secs_f64(); + + match (event.scope, event.event_type) { + (EventScope::Workflow, EventType::Started) => { + println!("\n🚀 [{:.2}s] Workflow '{}' started", + elapsed, event.component_id); + } + + (EventScope::WorkflowStep, EventType::Started) => { + println!(" ▶ [{:.2}s] Step '{}'", + elapsed, event.component_id); + } + + (EventScope::Agent, EventType::Started) => { + println!(" 🤖 [{:.2}s] Agent '{}' processing", + elapsed, event.component_id); + } + + (EventScope::LlmRequest, EventType::Started) => { + println!(" 💬 [{:.2}s] LLM request", elapsed); + } + + (EventScope::LlmRequest, EventType::Progress) => { + if let Some(chunk) = event.data["chunk"].as_str() { + print!("{}", chunk); + io::stdout().flush().unwrap(); + } + } + + (EventScope::LlmRequest, EventType::Completed) => { + let tokens = event.data["usage"]["total_tokens"] + .as_u64().unwrap_or(0); + println!("\n ✅ [{:.2}s] LLM complete ({} tokens)", + elapsed, tokens); + } + + (EventScope::Tool, EventType::Started) => { + let args = &event.data["arguments"]; + println!(" 🔧 [{:.2}s] Tool '{}' args: {}", + elapsed, event.component_id, args); + } + + (EventScope::Tool, EventType::Completed) => { + let result = &event.data["result"]; + println!(" ✓ [{:.2}s] Tool result: {}", + elapsed, result); + } + + (EventScope::WorkflowStep, EventType::Completed) => { + let duration = event.data["duration_ms"] + .as_u64().unwrap_or(0); + println!(" ✅ [{:.2}s] Step complete ({}ms)", + elapsed, duration); + } + + (EventScope::Workflow, EventType::Completed) => { + println!("\n✅ [{:.2}s] Workflow complete\n", elapsed); + } + + (_, EventType::Failed) => { + let msg = event.message.unwrap_or_default(); + eprintln!("\n❌ [{:.2}s] {} failed: {}", + elapsed, event.component_id, msg); + } + + _ => {} + } + } + }); + + // Run workflow... +} +``` + +### Example 2: Production Metrics Collection + +```rust +use std::collections::HashMap; + +struct Metrics { + llm_calls: usize, + total_tokens: u64, + tool_calls: HashMap, + errors: Vec, + execution_times: HashMap, +} + +async fn collect_metrics(mut rx: mpsc::Receiver) -> Metrics { + let mut metrics = Metrics { + llm_calls: 0, + total_tokens: 0, + tool_calls: HashMap::new(), + errors: Vec::new(), + execution_times: HashMap::new(), + }; + + let mut start_times: HashMap = HashMap::new(); + + while let Some(event) = rx.recv().await { + match (event.scope, event.event_type) { + (EventScope::LlmRequest, EventType::Completed) => { + metrics.llm_calls += 1; + if let Some(tokens) = event.data["usage"]["total_tokens"].as_u64() { + metrics.total_tokens += tokens; + } + } + + (EventScope::Tool, EventType::Started) => { + let tool = event.component_id.clone(); + *metrics.tool_calls.entry(tool.clone()).or_insert(0) += 1; + start_times.insert(tool, Instant::now()); + } + + (EventScope::Tool, EventType::Completed) => { + if let Some(start) = start_times.remove(&event.component_id) { + let duration = start.elapsed().as_millis(); + metrics.execution_times.insert(event.component_id.clone(), duration); + } + } + + (_, EventType::Failed) => { + let error = format!("{}: {}", + event.component_id, + event.message.unwrap_or_default() + ); + metrics.errors.push(error); + } + + _ => {} + } + } + + metrics +} +``` + +### Example 3: WebSocket Event Streaming + +```rust +// For actix-web or similar +use actix_web::{web, HttpRequest, HttpResponse, Error}; +use actix_web_actors::ws; + +async fn ws_events( + req: HttpRequest, + stream: web::Payload, + runtime: web::Data, +) -> Result { + let mut rx = runtime.event_stream().subscribe(); + + ws::start( + EventWebSocket { rx }, + &req, + stream, + ) +} + +struct EventWebSocket { + rx: broadcast::Receiver, +} + +impl Actor for EventWebSocket { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + let rx = self.rx.resubscribe(); + + ctx.spawn( + async move { + while let Ok(event) = rx.recv().await { + // Send event to client + let json = serde_json::to_string(&event).unwrap(); + ctx.text(json); + } + } + .into_actor(self), + ); + } +} +``` + +--- + +## Future Enhancements (v0.4.0+) + +### Automatic Streaming Batching + +Future versions will automatically batch high-frequency events: + +- **LLM chunks**: Batched over 50ms windows +- **Tool progress**: Batched over 100ms windows +- Reduces network overhead for real-time UIs + +### Tool Progress Callbacks + +Tools will support streaming progress: + +```rust +// Future API +#[async_trait] +impl Tool for LongRunningTool { + async fn execute_with_progress( + &self, + args: JsonValue, + progress: ProgressCallback, + ) -> Result { + for i in 0..100 { + process_item(i); + progress(format!("Processed {}/100", i), Some(i)); + } + Ok(json!({"status": "done"})) + } } ``` ### Event Persistence -Store events in database for: -- Historical analysis -- Debugging past runs -- Audit trail -- Cost tracking -### Event Filtering -Add subscription filters: +Planned support for persisting events to storage: + +- Database integration (Postgres, SQLite) +- S3/object storage for archival +- Event replay from storage + +### Advanced Filtering + +Subscription-time event filtering: + ```rust +// Future API runtime.event_stream() .subscribe() - .filter(|e| matches!(e.event_type, EventType::AgentLlm*)) + .filter(EventScope::LlmRequest) + .filter_status(ComponentStatus::Failed) ``` -### Metrics & Analytics -Aggregate events for: -- Token usage per workflow/agent -- Average execution times -- Error rates -- Cost calculations +--- + +## See Also + +- [MIGRATION_0.2_TO_0.3.md](MIGRATION_0.2_TO_0.3.md) - Upgrade guide from v0.2.x +- [CHANGELOG.md](../CHANGELOG.md) - Release notes for v0.3.0 +- [docs/vnext/ASYNC_EVENTS.md](vnext/ASYNC_EVENTS.md) - Future async architecture +- Examples: `src/bin/workflow_demo.rs`, `src/bin/multi_subscriber.rs` diff --git a/docs/MIGRATION_0.2_TO_0.3.md b/docs/MIGRATION_0.2_TO_0.3.md new file mode 100644 index 0000000..ae78b03 --- /dev/null +++ b/docs/MIGRATION_0.2_TO_0.3.md @@ -0,0 +1,735 @@ +# Migration Guide: v0.2.x → v0.3.0 + +This guide helps you migrate from agent-runtime v0.2.x to v0.3.0, which introduces a **unified event system** with breaking changes to the event API. + +## Overview of Changes + +### What Changed +- **Event structure redesigned**: Introduced `EventScope`, `EventType`, and `ComponentStatus` for unified lifecycle tracking +- **19 old event types replaced**: All old specific event types (e.g., `AgentLlmRequestStarted`, `WorkflowStepCompleted`) consolidated into 5 lifecycle events +- **Component ID format enforcement**: Standardized component identification across all event scopes +- **Helper methods added**: Ergonomic API for common event patterns +- **Pattern matching updates**: Event handling now uses `(scope, type)` tuple matching + +### Why This Change? +The new unified event system provides: +- **Consistency**: All components (workflows, agents, tools) share the same lifecycle events +- **Extensibility**: Easy to add new component types without proliferating event variants +- **Observability**: Complete visibility into execution with predictable event patterns +- **Type safety**: Structured component IDs with validation + +--- + +## Breaking Changes + +### 1. Event Structure + +#### Before (v0.2.x) +```rust +pub struct Event { + pub event_id: Uuid, + pub event_type: EventType, // 19 different variants + pub timestamp: DateTime, + pub agent_name: Option, + pub workflow_id: Option, + pub parent_workflow_id: Option, + pub data: JsonValue, +} +``` + +#### After (v0.3.0) +```rust +pub struct Event { + pub event_id: Uuid, + pub scope: EventScope, // NEW: Which component emitted + pub event_type: EventType, // CHANGED: Only 5 lifecycle types + pub component_id: String, // NEW: Identifies specific component + pub status: ComponentStatus, // NEW: Component's current status + pub message: Option, // NEW: Optional human-readable message + pub timestamp: DateTime, + pub workflow_id: Option, + pub parent_workflow_id: Option, + pub data: JsonValue, +} +``` + +**Migration Action**: Update all code that accesses event fields. + +--- + +### 2. EventType Enum + +#### Before (v0.2.x) +```rust +pub enum EventType { + // Workflow events (6) + WorkflowStarted, + WorkflowStepStarted, + WorkflowStepCompleted, + WorkflowStepFailed, + WorkflowCompleted, + WorkflowFailed, + + // Agent events (7) + AgentProcessing, + AgentCompleted, + AgentFailed, + AgentLlmRequestStarted, + AgentLlmRequestCompleted, + AgentLlmRequestFailed, + AgentLlmStreamChunk, + + // Tool events (4) + ToolCallStarted, + ToolCallCompleted, + ToolCallFailed, + AgentToolLoopDetected, + + // Other (2) + TransformStepCompleted, + CustomEvent, +} +``` + +#### After (v0.3.0) +```rust +pub enum EventScope { + Workflow, + WorkflowStep, + Agent, + LlmRequest, + Tool, + System, +} + +pub enum EventType { + Started, + Progress, + Completed, + Failed, + Canceled, +} + +pub enum ComponentStatus { + Pending, + Running, + Completed, + Failed, + Canceled, +} +``` + +**Migration Action**: Replace specific event types with `(scope, type)` combinations. + +--- + +### 3. Event Matching Patterns + +#### Before (v0.2.x) +```rust +match event.event_type { + EventType::WorkflowStarted => { + println!("Workflow started: {}", event.workflow_id.unwrap()); + } + EventType::AgentLlmStreamChunk => { + let chunk = event.data.get("chunk").unwrap(); + print!("{}", chunk); + } + EventType::ToolCallCompleted => { + println!("Tool completed: {}", event.data["tool_name"]); + } + _ => {} +} +``` + +#### After (v0.3.0) +```rust +match (event.scope, event.event_type) { + (EventScope::Workflow, EventType::Started) => { + println!("Workflow started: {} ({})", + event.component_id, event.workflow_id.unwrap()); + } + (EventScope::LlmRequest, EventType::Progress) => { + let chunk = event.data.get("chunk").unwrap(); + print!("{}", chunk); + } + (EventScope::Tool, EventType::Completed) => { + println!("Tool completed: {}", event.component_id); + } + _ => {} +} +``` + +**Migration Action**: Update all event pattern matching to use `(scope, type)` tuples. + +--- + +### 4. Event Creation + +#### Before (v0.2.x) +```rust +// Manual event creation (rare) +let event = Event { + event_id: Uuid::new_v4(), + event_type: EventType::ToolCallStarted, + timestamp: Utc::now(), + agent_name: Some("my_agent".to_string()), + workflow_id: None, + parent_workflow_id: None, + data: json!({ + "tool_name": "calculator", + "arguments": {"a": 5, "b": 3} + }), +}; +``` + +#### After (v0.3.0) +```rust +// Use helper methods (recommended) +let event = Event::tool_started( + "calculator", // component_id + None, // workflow_id + None, // parent_workflow_id + json!({"a": 5, "b": 3}) // data +); + +// Or raw construction (advanced) +let event = Event::new( + EventScope::Tool, + EventType::Started, + "calculator", + ComponentStatus::Running, + None, // message + None, // workflow_id + None, // parent_workflow_id + json!({"a": 5, "b": 3}), +); +``` + +**Migration Action**: Use helper methods for event creation. + +--- + +### 5. EventStream::append() Signature + +#### Before (v0.2.x) +```rust +pub async fn append( + &self, + event_type: EventType, + workflow_id: Option, + data: JsonValue, +) -> Result +``` + +#### After (v0.3.0) +```rust +pub async fn append( + &self, + scope: EventScope, + event_type: EventType, + component_id: String, + status: ComponentStatus, + message: Option, + workflow_id: Option, + parent_workflow_id: Option, + data: JsonValue, +) -> JoinHandle> +``` + +**Migration Actions**: +1. Add new required parameters (`scope`, `component_id`, `status`, `message`, `parent_workflow_id`) +2. Handle returned `JoinHandle` (or ignore if fire-and-forget) +3. **Recommended**: Use helper methods instead of raw `append()` + +--- + +## Event Type Migration Table + +| Old Event (v0.2.x) | New Pattern (v0.3.0) | Helper Method | +|--------------------|----------------------|---------------| +| `WorkflowStarted` | `(Workflow, Started)` | `Event::workflow_started()` | +| `WorkflowStepStarted` | `(WorkflowStep, Started)` | `Event::workflow_step_started()` | +| `WorkflowStepCompleted` | `(WorkflowStep, Completed)` | `Event::workflow_step_completed()` | +| `WorkflowStepFailed` | `(WorkflowStep, Failed)` | `Event::workflow_step_failed()` | +| `WorkflowCompleted` | `(Workflow, Completed)` | `Event::workflow_completed()` | +| `WorkflowFailed` | `(Workflow, Failed)` | `Event::workflow_failed()` | +| `AgentProcessing` | `(Agent, Started)` | `Event::agent_started()` | +| `AgentCompleted` | `(Agent, Completed)` | `Event::agent_completed()` | +| `AgentFailed` | `(Agent, Failed)` | `Event::agent_failed()` | +| `AgentLlmRequestStarted` | `(LlmRequest, Started)` | `Event::llm_started()` | +| `AgentLlmStreamChunk` | `(LlmRequest, Progress)` | `Event::llm_progress()` | +| `AgentLlmRequestCompleted` | `(LlmRequest, Completed)` | `Event::llm_completed()` | +| `AgentLlmRequestFailed` | `(LlmRequest, Failed)` | `Event::llm_failed()` | +| `ToolCallStarted` | `(Tool, Started)` | `Event::tool_started()` | +| `ToolCallCompleted` | `(Tool, Completed)` | `Event::tool_completed()` | +| `ToolCallFailed` | `(Tool, Failed)` | `Event::tool_failed()` | +| `AgentToolLoopDetected` | `(System, Progress)` | `Event::system_progress()` | +| `TransformStepCompleted` | `(WorkflowStep, Completed)` | `Event::workflow_step_completed()` | +| `CustomEvent` | Any scope + type combo | `Event::new()` | + +--- + +## Component ID Formats + +Component IDs now follow enforced formats validated at event creation: + +| Scope | Format | Examples | +|-------|--------|----------| +| **Workflow** | `workflow_name` | `"analysis"`, `"data_pipeline"` | +| **WorkflowStep** | `workflow:step:N` | `"analysis:step:0"`, `"pipeline:step:2"` | +| **Agent** | `agent_name` | `"researcher"`, `"summarizer"` | +| **LlmRequest** | `agent:llm:N` | `"researcher:llm:0"`, `"summarizer:llm:3"` | +| **Tool** | `tool_name` or `tool_name:N` | `"calculator"`, `"web_search:2"` | +| **System** | `system:subsystem` | `"system:tool_loop_detection"` | + +**Invalid component IDs will be rejected** with a detailed error message. + +--- + +## Step-by-Step Migration + +### Step 1: Update Dependencies +```toml +[dependencies] +agent-runtime = "0.3.0" +``` + +### Step 2: Update Imports +```rust +// Add new types to imports +use agent_runtime::prelude::*; +use agent_runtime::{EventScope, EventType, ComponentStatus}; // NEW +``` + +### Step 3: Update Event Matching + +**Before:** +```rust +while let Some(event) = rx.recv().await { + match event.event_type { + EventType::AgentLlmStreamChunk => { + print!("{}", event.data["chunk"]); + } + EventType::ToolCallCompleted => { + println!("Tool finished"); + } + _ => {} + } +} +``` + +**After:** +```rust +while let Some(event) = rx.recv().await { + match (event.scope, event.event_type) { + (EventScope::LlmRequest, EventType::Progress) => { + print!("{}", event.data["chunk"]); + } + (EventScope::Tool, EventType::Completed) => { + println!("Tool finished: {}", event.component_id); + } + _ => {} + } +} +``` + +### Step 4: Update Event Creation (if applicable) + +**Before:** +```rust +stream.append( + EventType::ToolCallStarted, + Some("my_workflow".to_string()), + json!({"tool": "calculator"}), +).await?; +``` + +**After:** +```rust +// Option A: Helper method (recommended) +stream.tool_started( + "calculator", + Some("my_workflow".to_string()), + None, + json!({"args": {"a": 5}}), +).await; + +// Option B: Raw append +stream.append( + EventScope::Tool, + EventType::Started, + "calculator".to_string(), + ComponentStatus::Running, + None, + Some("my_workflow".to_string()), + None, + json!({"args": {"a": 5}}), +).await; +``` + +### Step 5: Update Custom Event Handling + +If you created custom events, map them to appropriate scopes: + +**Before:** +```rust +Event { + event_type: EventType::CustomEvent, + data: json!({"custom_field": "value"}), + .. +} +``` + +**After:** +```rust +// Choose appropriate scope based on context +Event::system_progress( + "system:custom_subsystem", + Some("Custom operation".to_string()), + None, + None, + json!({"custom_field": "value"}), +) +``` + +--- + +## Common Migration Patterns + +### Pattern 1: Workflow Event Listener + +**Before:** +```rust +tokio::spawn(async move { + while let Some(event) = rx.recv().await { + match event.event_type { + EventType::WorkflowStarted => println!("▶ Workflow started"), + EventType::WorkflowStepStarted => { + println!(" Step: {}", event.data["step_name"]); + } + EventType::WorkflowCompleted => println!("✓ Workflow done"), + _ => {} + } + } +}); +``` + +**After:** +```rust +tokio::spawn(async move { + while let Some(event) = rx.recv().await { + match (event.scope, event.event_type) { + (EventScope::Workflow, EventType::Started) => { + println!("▶ Workflow started: {}", event.component_id); + } + (EventScope::WorkflowStep, EventType::Started) => { + println!(" Step: {}", event.component_id); + } + (EventScope::Workflow, EventType::Completed) => { + println!("✓ Workflow done"); + } + _ => {} + } + } +}); +``` + +### Pattern 2: LLM Streaming + +**Before:** +```rust +match event.event_type { + EventType::AgentLlmStreamChunk => { + let chunk = event.data["chunk"].as_str().unwrap(); + print!("{}", chunk); + io::stdout().flush().unwrap(); + } + _ => {} +} +``` + +**After:** +```rust +match (event.scope, event.event_type) { + (EventScope::LlmRequest, EventType::Progress) => { + if let Some(chunk) = event.data["chunk"].as_str() { + print!("{}", chunk); + io::stdout().flush().unwrap(); + } + } + _ => {} +} +``` + +### Pattern 3: Tool Execution Tracking + +**Before:** +```rust +match event.event_type { + EventType::ToolCallStarted => { + let tool = &event.data["tool_name"]; + let args = &event.data["arguments"]; + println!("🔧 Calling {}: {:?}", tool, args); + } + EventType::ToolCallCompleted => { + let result = &event.data["result"]; + println!("✓ Tool result: {}", result); + } + EventType::ToolCallFailed => { + let error = &event.data["error"]; + eprintln!("✗ Tool failed: {}", error); + } + _ => {} +} +``` + +**After:** +```rust +match (event.scope, event.event_type) { + (EventScope::Tool, EventType::Started) => { + let args = &event.data["arguments"]; + println!("🔧 Calling {}: {:?}", event.component_id, args); + } + (EventScope::Tool, EventType::Completed) => { + let result = &event.data["result"]; + println!("✓ {} result: {}", event.component_id, result); + } + (EventScope::Tool, EventType::Failed) => { + let error = event.message.as_deref().unwrap_or("Unknown error"); + eprintln!("✗ {} failed: {}", event.component_id, error); + } + _ => {} +} +``` + +### Pattern 4: Component Status Tracking + +**New in v0.3.0** - Track component status across events: + +```rust +use std::collections::HashMap; + +let mut statuses = HashMap::new(); + +while let Some(event) = rx.recv().await { + // Update status tracking + statuses.insert(event.component_id.clone(), event.status); + + // Handle state transitions + match event.status { + ComponentStatus::Running => { + println!("▶ {} running", event.component_id); + } + ComponentStatus::Completed => { + println!("✓ {} completed", event.component_id); + } + ComponentStatus::Failed => { + let msg = event.message.as_deref().unwrap_or("Unknown"); + eprintln!("✗ {} failed: {}", event.component_id, msg); + } + _ => {} + } +} +``` + +--- + +## Helper Method Reference + +All helper methods follow a consistent signature pattern: + +```rust +// Basic lifecycle events (no message) +Event::agent_started(component_id, workflow_id, parent_workflow_id, data) +Event::agent_completed(component_id, workflow_id, parent_workflow_id, data) +Event::agent_failed(component_id, error_message, workflow_id, parent_workflow_id, data) + +// Progress events (with message) +Event::llm_progress(component_id, message, workflow_id, parent_workflow_id, data) +Event::tool_progress(component_id, message, workflow_id, parent_workflow_id, data) +Event::system_progress(component_id, message, workflow_id, parent_workflow_id, data) + +// Complete list +Event::workflow_started() +Event::workflow_completed() +Event::workflow_failed() +Event::workflow_step_started() +Event::workflow_step_completed() +Event::workflow_step_failed() +Event::agent_started() +Event::agent_progress() +Event::agent_completed() +Event::agent_failed() +Event::llm_started() +Event::llm_progress() +Event::llm_completed() +Event::llm_failed() +Event::tool_started() +Event::tool_progress() +Event::tool_completed() +Event::tool_failed() +Event::system_progress() +``` + +--- + +## Validation and Error Handling + +### Component ID Validation + +v0.3.0 enforces component ID formats. Invalid IDs are rejected: + +```rust +// ✓ Valid +Event::tool_started("calculator", None, None, json!({})).await; +Event::llm_started("agent:llm:0", None, None, json!({})).await; + +// ✗ Invalid - will return error +Event::tool_started("", None, None, json!({})).await; +// Error: "Component ID cannot be empty" + +Event::llm_started("invalid-format", None, None, json!({})).await; +// Error: "LlmRequest component_id must match 'agent_name:llm:N'" + +Event::workflow_step_started("workflow:step:invalid", None, None, json!({})).await; +// Error: "WorkflowStep component_id must match 'workflow_name:step:N' where N is a number" +``` + +### Error Messages + +The new `message` field provides human-readable context: + +```rust +// Recommended: Set message for failed events +Event::agent_failed( + "researcher", + "API rate limit exceeded".to_string(), // error message + Some("workflow_1".to_string()), + None, + json!({"retry_after": 60}), +).await; + +// Access message in handler +match (event.scope, event.event_type) { + (_, EventType::Failed) => { + if let Some(msg) = event.message { + eprintln!("Error: {}", msg); + } + } + _ => {} +} +``` + +--- + +## Testing Migration + +Update your test assertions: + +**Before:** +```rust +#[tokio::test] +async fn test_workflow_events() { + let event = rx.recv().await.unwrap(); + assert_eq!(event.event_type, EventType::WorkflowStarted); +} +``` + +**After:** +```rust +#[tokio::test] +async fn test_workflow_events() { + let event = rx.recv().await.unwrap(); + assert_eq!(event.scope, EventScope::Workflow); + assert_eq!(event.event_type, EventType::Started); + assert_eq!(event.status, ComponentStatus::Running); +} +``` + +--- + +## Performance Considerations + +### Event Emission is Async (Non-Blocking) + +v0.3.0 emits events asynchronously via `tokio::spawn()`: + +```rust +// Returns immediately, event emitted in background +let handle = stream.append(...).await; + +// Optional: await completion if needed +let event = handle.await.unwrap()?; +``` + +**Recommendation**: Ignore return value for fire-and-forget (most cases). + +### Streaming Batching (Future v0.4.0) + +Component ID formats enable future streaming optimizations: +- LLM chunks batched over 50ms windows +- Tool progress batched over 100ms windows +- Reduces event overhead for high-frequency updates + +--- + +## Troubleshooting + +### Issue: "Component ID cannot be empty" +**Cause**: Passing empty string to component_id +**Fix**: Provide valid component identifier + +### Issue: "component_id must match 'X:Y:N'" +**Cause**: Component ID doesn't follow required format for scope +**Fix**: See [Component ID Formats](#component-id-formats) table + +### Issue: Pattern match not exhaustive +**Cause**: Using old single-field match on `event.event_type` +**Fix**: Switch to `(event.scope, event.event_type)` tuple matching + +### Issue: Missing field errors +**Cause**: Accessing removed fields like `agent_name` +**Fix**: Use `component_id` instead. For scope-specific info, check `event.scope` + +### Issue: Helper method not found +**Cause**: Trying to use old event creation patterns +**Fix**: Import new types: `use agent_runtime::{EventScope, EventType, ComponentStatus}` + +--- + +## Backwards Compatibility + +**None.** v0.3.0 is a breaking release. All event-related code must be updated. + +To ease migration: +1. Update dependencies to `0.3.0` +2. Fix compilation errors (missing fields, changed enums) +3. Update pattern matching to `(scope, type)` tuples +4. Run tests to catch runtime issues +5. Use helper methods to simplify event creation + +--- + +## Need Help? + +- See [EVENT_STREAMING.md](EVENT_STREAMING.md) for comprehensive event system guide +- Check [CHANGELOG.md](../CHANGELOG.md) for complete list of changes +- Review updated examples in `src/bin/` directory +- File issues at project repository + +--- + +## Summary Checklist + +- [ ] Updated `agent-runtime` dependency to `0.3.0` +- [ ] Added imports for `EventScope`, `EventType`, `ComponentStatus` +- [ ] Changed event matching from `event.event_type` to `(event.scope, event.event_type)` +- [ ] Updated event field access (`agent_name` → `component_id`, added `status`, `message`) +- [ ] Replaced custom event creation with helper methods +- [ ] Updated component IDs to follow enforced formats +- [ ] Updated tests and assertions +- [ ] Verified all compilation errors resolved +- [ ] Ran test suite successfully + +**Welcome to v0.3.0!** 🎉 diff --git a/docs/UI_INTEGRATION.md b/docs/UI_INTEGRATION.md new file mode 100644 index 0000000..d0dd017 --- /dev/null +++ b/docs/UI_INTEGRATION.md @@ -0,0 +1,607 @@ +# Building UIs with the Async Event System + +The v0.3.0 unified event system is designed specifically to support real-time UI updates. This guide shows how to integrate various UI frameworks. + +## Core Concept + +The `EventStream` uses Tokio's `broadcast` channel, which supports **multiple subscribers**. Any UI can subscribe and receive real-time events as workflows execute. + +```rust +use agent_runtime::{Runtime, Event}; + +// Create runtime +let runtime = Runtime::new(); + +// Subscribe to events (can have multiple subscribers) +let mut ui_rx = runtime.event_stream().subscribe(); + +// Listen for events in UI thread/task +tokio::spawn(async move { + while let Ok(event) = ui_rx.recv().await { + // Update UI with event + update_ui(event); + } +}); +``` + +## Architecture Patterns + +### Pattern 1: WebSocket Server (Web UI) + +Stream events to web browsers via WebSocket: + +```rust +use axum::{ + extract::ws::{WebSocket, WebSocketUpgrade}, + response::Response, + routing::get, + Router, +}; +use agent_runtime::Runtime; +use std::sync::Arc; + +async fn ws_events( + ws: WebSocketUpgrade, + runtime: Arc, +) -> Response { + ws.on_upgrade(|socket| handle_socket(socket, runtime)) +} + +async fn handle_socket(mut socket: WebSocket, runtime: Arc) { + let mut rx = runtime.event_stream().subscribe(); + + while let Ok(event) = rx.recv().await { + // Serialize event to JSON and send to browser + let json = serde_json::to_string(&event).unwrap(); + if socket.send(Message::Text(json)).await.is_err() { + break; // Client disconnected + } + } +} + +#[tokio::main] +async fn main() { + let runtime = Arc::new(Runtime::new()); + + let app = Router::new() + .route("/events", get(ws_events)) + .with_state(runtime.clone()); + + // Start server + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} +``` + +### Pattern 2: Server-Sent Events (SSE) + +For one-way streaming to browsers: + +```rust +use axum::{ + response::sse::{Event as SseEvent, Sse}, + routing::get, + Router, +}; +use futures::stream::{Stream, StreamExt}; +use agent_runtime::Runtime; +use std::sync::Arc; + +async fn sse_events( + runtime: Arc, +) -> Sse>> { + let rx = runtime.event_stream().subscribe(); + + let stream = async_stream::stream! { + let mut rx = rx; + while let Ok(event) = rx.recv().await { + let json = serde_json::to_string(&event).unwrap(); + yield Ok(SseEvent::default().data(json)); + } + }; + + Sse::new(stream) +} + +#[tokio::main] +async fn main() { + let runtime = Arc::new(Runtime::new()); + + let app = Router::new() + .route("/events", get(sse_events)) + .with_state(runtime); + + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} +``` + +### Pattern 3: Desktop UI (egui, iced, etc.) + +For native desktop applications: + +```rust +use agent_runtime::{Runtime, Event, EventScope, EventType}; +use tokio::sync::mpsc; +use std::sync::Arc; + +struct AppState { + runtime: Arc, + events: Vec, + current_status: String, +} + +impl AppState { + fn new() -> Self { + let runtime = Arc::new(Runtime::new()); + + // Spawn event listener + let runtime_clone = runtime.clone(); + let (tx, mut rx) = mpsc::channel(100); + + tokio::spawn(async move { + let mut event_rx = runtime_clone.event_stream().subscribe(); + while let Ok(event) = event_rx.recv().await { + let _ = tx.send(event).await; + } + }); + + Self { + runtime, + events: Vec::new(), + current_status: "Ready".to_string(), + } + } + + fn update(&mut self, event: Event) { + // Update UI state based on event + match (event.scope, event.event_type) { + (EventScope::Workflow, EventType::Started) => { + self.current_status = format!("Workflow {} started", event.component_id); + } + (EventScope::Agent, EventType::Started) => { + self.current_status = format!("Agent {} processing", event.component_id); + } + (EventScope::LlmRequest, EventType::Progress) => { + // Streaming LLM response + if let Some(chunk) = event.data.get("chunk") { + self.current_status.push_str(chunk.as_str().unwrap_or("")); + } + } + _ => {} + } + + self.events.push(event); + } +} +``` + +### Pattern 4: Terminal UI (ratatui) + +For rich terminal interfaces: + +```rust +use ratatui::{ + backend::CrosstermBackend, + layout::{Constraint, Direction, Layout}, + widgets::{Block, Borders, List, ListItem, Paragraph}, + Terminal, +}; +use agent_runtime::{Event, Runtime, EventScope, EventType}; +use std::sync::Arc; +use tokio::sync::mpsc; + +async fn run_tui(runtime: Arc) -> Result<(), Box> { + let mut terminal = Terminal::new(CrosstermBackend::new(std::io::stdout()))?; + + let (event_tx, mut event_rx) = mpsc::channel(100); + + // Spawn event listener + let runtime_clone = runtime.clone(); + tokio::spawn(async move { + let mut rx = runtime_clone.event_stream().subscribe(); + while let Ok(event) = rx.recv().await { + let _ = event_tx.send(event).await; + } + }); + + let mut events = Vec::new(); + + loop { + // Try to receive new events + while let Ok(event) = event_rx.try_recv() { + events.push(format!( + "[{:?}] {:?}::{}", + event.timestamp.format("%H:%M:%S"), + event.scope, + event.event_type + )); + } + + // Render UI + terminal.draw(|f| { + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([Constraint::Percentage(80), Constraint::Percentage(20)]) + .split(f.size()); + + let items: Vec = events.iter().map(|e| ListItem::new(e.as_str())).collect(); + let list = List::new(items).block(Block::default().title("Events").borders(Borders::ALL)); + + f.render_widget(list, chunks[0]); + })?; + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } +} +``` + +## Real-World Example: React Frontend + +### Backend (Axum + WebSocket) + +```rust +// src/main.rs +use axum::{ + extract::{ws::WebSocket, WebSocketUpgrade, State}, + response::Response, + routing::{get, post}, + Json, Router, +}; +use agent_runtime::{Runtime, Workflow, WorkflowBuilder}; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Clone)] +struct AppState { + runtime: Arc, + workflows: Arc>>, +} + +// WebSocket endpoint for event streaming +async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State, +) -> Response { + ws.on_upgrade(move |socket| handle_ws_connection(socket, state)) +} + +async fn handle_ws_connection(mut socket: WebSocket, state: AppState) { + let mut rx = state.runtime.event_stream().subscribe(); + + while let Ok(event) = rx.recv().await { + let json = match serde_json::to_string(&event) { + Ok(j) => j, + Err(_) => continue, + }; + + if socket.send(axum::extract::ws::Message::Text(json)).await.is_err() { + break; + } + } +} + +// REST endpoint to start workflow +async fn start_workflow( + State(state): State, + Json(input): Json, +) -> Json { + // Start workflow execution (returns immediately, events stream via WebSocket) + let workflow = state.workflows.read().await[0].clone(); + + tokio::spawn(async move { + let _ = state.runtime.execute(workflow).await; + }); + + Json(serde_json::json!({"status": "started"})) +} + +#[tokio::main] +async fn main() { + let runtime = Arc::new(Runtime::new()); + + let state = AppState { + runtime, + workflows: Arc::new(RwLock::new(Vec::new())), + }; + + let app = Router::new() + .route("/ws", get(ws_handler)) + .route("/workflows/start", post(start_workflow)) + .with_state(state); + + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} +``` + +### Frontend (React + TypeScript) + +```typescript +// EventStream.tsx +import { useEffect, useState } from 'react'; + +interface Event { + event_id: string; + scope: 'Workflow' | 'WorkflowStep' | 'Agent' | 'LlmRequest' | 'Tool' | 'System'; + event_type: 'Started' | 'Progress' | 'Completed' | 'Failed' | 'Canceled'; + component_id: string; + status: 'Pending' | 'Running' | 'Completed' | 'Failed' | 'Canceled'; + message?: string; + timestamp: string; + data: any; +} + +export const useEventStream = () => { + const [events, setEvents] = useState([]); + const [status, setStatus] = useState<'connecting' | 'connected' | 'disconnected'>('disconnected'); + + useEffect(() => { + const ws = new WebSocket('ws://localhost:3000/ws'); + + ws.onopen = () => setStatus('connected'); + ws.onclose = () => setStatus('disconnected'); + + ws.onmessage = (msg) => { + const event: Event = JSON.parse(msg.data); + setEvents(prev => [...prev, event]); + }; + + return () => ws.close(); + }, []); + + return { events, status }; +}; + +// WorkflowMonitor.tsx +export const WorkflowMonitor = () => { + const { events, status } = useEventStream(); + const [llmResponse, setLlmResponse] = useState(''); + + useEffect(() => { + // Update LLM response from streaming chunks + const latestEvent = events[events.length - 1]; + if (latestEvent?.scope === 'LlmRequest' && latestEvent?.event_type === 'Progress') { + const chunk = latestEvent.data.chunk || ''; + setLlmResponse(prev => prev + chunk); + } + }, [events]); + + return ( +
+
Connection: {status}
+ +
+

LLM Response

+

{llmResponse}

+
+ +
+

Event Log

+ {events.map(event => ( +
+ {event.scope} + {event.event_type} + {event.component_id} + {event.message && {event.message}} +
+ ))} +
+
+ ); +}; +``` + +## Multi-Subscriber Pattern + +The event system supports **unlimited subscribers**, each receiving all events: + +```rust +let runtime = Runtime::new(); + +// UI subscriber +let mut ui_rx = runtime.event_stream().subscribe(); +tokio::spawn(async move { + while let Ok(event) = ui_rx.recv().await { + update_ui(event); + } +}); + +// Logging subscriber +let mut log_rx = runtime.event_stream().subscribe(); +tokio::spawn(async move { + while let Ok(event) = log_rx.recv().await { + log_to_file(event); + } +}); + +// Metrics subscriber +let mut metrics_rx = runtime.event_stream().subscribe(); +tokio::spawn(async move { + while let Ok(event) = metrics_rx.recv().await { + record_metrics(event); + } +}); + +// Database subscriber +let mut db_rx = runtime.event_stream().subscribe(); +tokio::spawn(async move { + while let Ok(event) = db_rx.recv().await { + save_to_database(event); + } +}); +``` + +## Event Filtering in UI + +Filter events by scope, type, or component: + +```rust +while let Ok(event) = rx.recv().await { + match (event.scope, event.event_type) { + // Only show workflow-level events in top status bar + (EventScope::Workflow, _) => { + update_status_bar(event); + } + + // Show LLM streaming in response area + (EventScope::LlmRequest, EventType::Progress) => { + append_to_response_area(event.data["chunk"]); + } + + // Show all failures in error panel + (_, EventType::Failed) => { + show_error_notification(event); + } + + // Log everything to debug panel + _ => { + append_to_debug_log(event); + } + } +} +``` + +## Performance Considerations + +### Buffering High-Frequency Events + +For UIs, you may want to batch rapid events: + +```rust +use tokio::time::{interval, Duration}; + +let mut rx = runtime.event_stream().subscribe(); +let mut buffer = Vec::new(); +let mut tick = interval(Duration::from_millis(100)); + +loop { + tokio::select! { + Ok(event) = rx.recv() => { + buffer.push(event); + } + _ = tick.tick() => { + if !buffer.is_empty() { + update_ui_batch(&buffer); + buffer.clear(); + } + } + } +} +``` + +### Limiting Event History + +Prevent memory growth in long-running UIs: + +```rust +const MAX_EVENTS: usize = 1000; + +let mut events = Vec::new(); + +while let Ok(event) = rx.recv().await { + events.push(event); + + // Keep only recent events + if events.len() > MAX_EVENTS { + events.drain(0..events.len() - MAX_EVENTS); + } + + update_ui(&events); +} +``` + +## Complete Example: Progress Bar UI + +```rust +use indicatif::{ProgressBar, ProgressStyle}; +use agent_runtime::{Runtime, Event, EventScope, EventType}; +use std::sync::Arc; + +async fn run_workflow_with_progress(runtime: Arc) { + let mut rx = runtime.event_stream().subscribe(); + + let pb = ProgressBar::new(100); + pb.set_style( + ProgressStyle::default_bar() + .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}") + .unwrap() + ); + + let mut total_steps = 0; + let mut completed_steps = 0; + + while let Ok(event) = rx.recv().await { + match (event.scope, event.event_type) { + (EventScope::Workflow, EventType::Started) => { + if let Some(steps) = event.data.get("num_steps") { + total_steps = steps.as_u64().unwrap_or(0); + pb.set_length(total_steps); + } + pb.set_message("Workflow started"); + } + + (EventScope::WorkflowStep, EventType::Completed) => { + completed_steps += 1; + pb.set_position(completed_steps); + pb.set_message(format!("Step {} complete", event.component_id)); + } + + (EventScope::LlmRequest, EventType::Progress) => { + if let Some(chunk) = event.data.get("chunk") { + pb.println(chunk.as_str().unwrap_or("")); + } + } + + (EventScope::Workflow, EventType::Completed) => { + pb.finish_with_message("✓ Workflow complete"); + break; + } + + (_, EventType::Failed) => { + pb.abandon_with_message(format!("✗ Failed: {}", event.message.unwrap_or_default())); + break; + } + + _ => {} + } + } +} +``` + +## Benefits for UI Development + +### 1. Real-Time Updates +- No polling required +- Events arrive immediately as they happen +- Perfect for progress indicators, live logs, streaming responses + +### 2. Separation of Concerns +- Backend focuses on workflow execution +- Frontend focuses on presentation +- Events provide clean interface between layers + +### 3. Multiple Views +- Same event stream can power multiple UI components +- Dashboard, debug panel, notifications all from one source + +### 4. Historical Replay +- `event_stream().get_events(offset)` allows UI reconnection without losing history +- Perfect for page refreshes or mobile app backgrounds + +### 5. Testable +- Easy to mock event streams for UI testing +- Replay recorded events for UI development + +## See Also + +- [EVENT_STREAMING.md](EVENT_STREAMING.md) - Complete event system guide +- [MIGRATION_0.2_TO_0.3.md](MIGRATION_0.2_TO_0.3.md) - Migration from v0.2.x +- Example: `src/bin/async_events_demo.rs` - Basic event monitoring +- Example: `src/bin/workflow_demo.rs` - Real workflow with event display diff --git a/src/agent.rs b/src/agent.rs index d3170a4..8ebc261 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -1,4 +1,4 @@ -use crate::event::{EventStream, EventType}; +use crate::event::EventStream; use crate::llm::types::ToolCall; use crate::llm::{ChatClient, ChatMessage, ChatRequest}; use crate::tool::ToolRegistry; @@ -144,17 +144,18 @@ impl Agent { ) -> AgentResult { let start = std::time::Instant::now(); - // Emit agent processing event + let workflow_id = input + .metadata + .previous_agent + .clone() + .unwrap_or_else(|| "workflow".to_string()); + + // Emit Agent::Started event if let Some(stream) = event_stream { - stream.append( - EventType::AgentProcessing, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()), + stream.agent_started( + &self.config.name, + workflow_id.clone(), serde_json::json!({ - "agent": self.config.name, "input": input.data, }), ); @@ -220,18 +221,14 @@ impl Agent { request = request.with_tools(schemas.clone()); } - // Emit LLM request started event + // Emit LlmRequest::Started event if let Some(stream) = event_stream { - stream.append( - EventType::AgentLlmRequestStarted, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()), + stream.llm_started( + &self.config.name, + iteration, + workflow_id.clone(), serde_json::json!({ - "agent": self.config.name, - "iteration": iteration, + "messages": request.messages.len(), }), ); } @@ -239,11 +236,7 @@ impl Agent { // Call LLM with streaming + full response (for tool calls) let event_stream_for_streaming = event_stream.cloned(); let agent_name = self.config.name.clone(); - let previous_agent = input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()); + let workflow_id_for_streaming = workflow_id.clone(); // Create channel for streaming chunks let (chunk_tx, mut chunk_rx) = tokio::sync::mpsc::channel(100); @@ -252,13 +245,11 @@ impl Agent { let _chunk_event_task = tokio::spawn(async move { while let Some(chunk) = chunk_rx.recv().await { if let Some(stream) = &event_stream_for_streaming { - stream.append( - EventType::AgentLlmStreamChunk, - previous_agent.clone(), - serde_json::json!({ - "agent": &agent_name, - "chunk": chunk, - }), + stream.llm_progress( + &agent_name, + iteration, + workflow_id_for_streaming.clone(), + chunk, ); } } @@ -266,17 +257,15 @@ impl Agent { match client.chat_stream(request.clone(), chunk_tx).await { Ok(response) => { - // Emit LLM request completed event + // Emit LlmRequest::Completed event if let Some(stream) = event_stream { - stream.append( - EventType::AgentLlmRequestCompleted, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()), + stream.llm_completed( + &self.config.name, + iteration, + workflow_id.clone(), serde_json::json!({ - "agent": self.config.name, + "content": response.content.chars().take(100).collect::(), + "has_tool_calls": response.tool_calls.is_some(), }), ); } @@ -327,17 +316,18 @@ impl Agent { &previous_result, ); - // Emit loop detected event + // Emit tool loop detected event (System scope) if let Some(stream) = event_stream { stream.append( - EventType::AgentToolLoopDetected, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| { - "workflow".to_string() - }), + crate::event::EventScope::System, + crate::event::EventType::Progress, + "system:tool_loop_detection".to_string(), + crate::event::ComponentStatus::Running, + workflow_id.clone(), + Some(format!( + "Tool loop detected: {}", + tool_call.function.name + )), serde_json::json!({ "agent": self.config.name, "tool": tool_call.function.name, @@ -426,18 +416,19 @@ impl Agent { // Add final assistant response to chat history request.messages.push(ChatMessage::assistant(response_text)); - // Emit agent completed event + // Emit Agent::Completed event if let Some(stream) = event_stream { - stream.append( - EventType::AgentCompleted, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()), + stream.agent_completed( + &self.config.name, + workflow_id.clone(), + Some(format!( + "Agent completed in {}ms", + start.elapsed().as_millis() + )), serde_json::json!({ - "agent": self.config.name, "execution_time_ms": start.elapsed().as_millis() as u64, + "tool_calls": total_tool_calls, + "iterations": iteration, }), ); } @@ -453,35 +444,23 @@ impl Agent { }); } Err(e) => { - // Emit LLM request failed event + // Emit LlmRequest::Failed event if let Some(stream) = event_stream { - stream.append( - EventType::AgentLlmRequestFailed, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()), - serde_json::json!({ - "agent": self.config.name, - "error": e.to_string(), - }), + stream.llm_failed( + &self.config.name, + iteration, + workflow_id.clone(), + &e.to_string(), ); } - // Emit agent failed event + // Emit Agent::Failed event if let Some(stream) = event_stream { - stream.append( - EventType::AgentFailed, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()), - serde_json::json!({ - "agent": self.config.name, - "error": e.to_string(), - }), + stream.agent_failed( + &self.config.name, + workflow_id.clone(), + &e.to_string(), + serde_json::json!({}), ); } @@ -502,15 +481,11 @@ impl Agent { }); if let Some(stream) = event_stream { - stream.append( - EventType::AgentCompleted, - input - .metadata - .previous_agent - .clone() - .unwrap_or_else(|| "workflow".to_string()), + stream.agent_completed( + &self.config.name, + workflow_id.clone(), + Some("Agent completed (no LLM)".to_string()), serde_json::json!({ - "agent": self.config.name, "execution_time_ms": start.elapsed().as_millis() as u64, "mock": true, }), @@ -538,14 +513,13 @@ impl Agent { ) -> String { let tool_name = &tool_call.function.name; - // Emit tool call started event + // Emit Tool::Started event if let Some(stream) = event_stream { - stream.append( - EventType::ToolCallStarted, + stream.tool_started( + tool_name, previous_agent.to_string(), serde_json::json!({ "agent": self.config.name, - "tool": tool_name, "tool_call_id": tool_call.id, "arguments": tool_call.function.arguments, }), @@ -558,15 +532,13 @@ impl Agent { None => { let error_msg = "No tool registry configured".to_string(); if let Some(stream) = event_stream { - stream.append( - EventType::ToolCallFailed, + stream.tool_failed( + tool_name, previous_agent.to_string(), + &error_msg, serde_json::json!({ "agent": self.config.name, - "tool": tool_name, "tool_call_id": tool_call.id, - "arguments": tool_call.function.arguments, - "error": error_msg, "duration_ms": 0, }), ); @@ -582,15 +554,14 @@ impl Agent { Err(e) => { let error_msg = format!("Failed to parse tool arguments: {}", e); if let Some(stream) = event_stream { - stream.append( - EventType::ToolCallFailed, + stream.tool_failed( + tool_name, previous_agent.to_string(), + &error_msg, serde_json::json!({ "agent": self.config.name, - "tool": tool_name, "tool_call_id": tool_call.id, - "arguments": tool_call.function.arguments, - "error": error_msg, + "duration_ms": 0, "duration_ms": 0, }), ); @@ -603,16 +574,14 @@ impl Agent { let start_time = std::time::Instant::now(); match registry.call_tool(tool_name, params.clone()).await { Ok(result) => { - // Emit tool call completed event + // Emit Tool::Completed event if let Some(stream) = event_stream { - stream.append( - EventType::ToolCallCompleted, + stream.tool_completed( + tool_name, previous_agent.to_string(), serde_json::json!({ "agent": self.config.name, - "tool": tool_name, "tool_call_id": tool_call.id, - "arguments": params, "result": result.output, "duration_ms": (result.duration_ms * 1000.0).round() / 1000.0, }), @@ -625,15 +594,13 @@ impl Agent { Err(e) => { let error_msg = format!("Tool execution failed: {}", e); if let Some(stream) = event_stream { - stream.append( - EventType::ToolCallFailed, + stream.tool_failed( + tool_name, previous_agent.to_string(), + &error_msg, serde_json::json!({ "agent": self.config.name, - "tool": tool_name, "tool_call_id": tool_call.id, - "arguments": params, - "error": error_msg, "duration_ms": start_time.elapsed().as_secs_f64() * 1000.0, }), ); diff --git a/src/bin/agent_with_tools_demo.rs b/src/bin/agent_with_tools_demo.rs index 4330369..d35743a 100644 --- a/src/bin/agent_with_tools_demo.rs +++ b/src/bin/agent_with_tools_demo.rs @@ -1,7 +1,7 @@ use agent_runtime::llm::LlamaClient; use agent_runtime::tool::{NativeTool, ToolRegistry}; use agent_runtime::types::{AgentInputMetadata, ToolError, ToolResult}; -use agent_runtime::{Agent, AgentConfig, AgentInput, EventType, FileLogger, Runtime}; +use agent_runtime::{Agent, AgentConfig, AgentInput, FileLogger, Runtime}; use serde_json::json; use std::fs; use std::sync::Arc; @@ -144,17 +144,22 @@ async fn main() -> Result<(), Box> { ); // Print tool call events to console - match event.event_type { - EventType::ToolCallStarted => { - if let Some(tool) = event.data.get("tool_name").and_then(|v| v.as_str()) { - println!(" 🔧 Calling tool: {}", tool); - } + match (event.scope.clone(), event.event_type.clone()) { + ( + agent_runtime::event::EventScope::Tool, + agent_runtime::event::EventType::Started, + ) => { + println!(" 🔧 Calling tool: {}", event.component_id); } - EventType::ToolCallCompleted => { - if let Some(tool) = event.data.get("tool_name").and_then(|v| v.as_str()) { - if let Some(duration) = event.data.get("duration_ms") { - println!(" ✓ Tool {} completed in {}ms", tool, duration); - } + ( + agent_runtime::event::EventScope::Tool, + agent_runtime::event::EventType::Completed, + ) => { + if let Some(duration) = event.data.get("duration_ms") { + println!( + " ✓ Tool {} completed in {}ms", + event.component_id, duration + ); } } _ => {} diff --git a/src/bin/async_events_demo.rs b/src/bin/async_events_demo.rs new file mode 100644 index 0000000..c9de83e --- /dev/null +++ b/src/bin/async_events_demo.rs @@ -0,0 +1,251 @@ +/// Async Event Streaming Demonstration +/// +/// This demo shows the v0.3.0 unified event system with artificial delays +/// to make the async event sequence clearly visible. +/// +/// Features demonstrated: +/// - Workflow lifecycle events (Started, Completed) +/// - WorkflowStep events for each step +/// - Complete event timestamps showing async behavior +/// - 500ms artificial delays to make sequence observable +/// +/// Run with: cargo run --bin async_events_demo + +use agent_runtime::event::{Event, EventScope, EventType}; +use agent_runtime::EventStream; +use std::io::{self, Write}; +use std::time::Duration; +use tokio::time::sleep; + +/// Event monitor that displays events in real-time with formatting +async fn monitor_events(mut rx: tokio::sync::broadcast::Receiver) { + println!("\n╔═══════════════════════════════════════════════════════════════╗"); + println!("║ EVENT STREAM MONITOR ║"); + println!("╚═══════════════════════════════════════════════════════════════╝\n"); + + let start_time = std::time::Instant::now(); + + while let Ok(event) = rx.recv().await { + let elapsed = start_time.elapsed().as_secs_f64(); + let scope = event.scope.clone(); + let event_type = event.event_type.clone(); + + match (scope, event_type) { + (EventScope::Workflow, EventType::Started) => { + println!( + "🚀 [{:>6.2}s] Workflow Started: {}", + elapsed, event.component_id + ); + println!(" └─ Status: {:?}", event.status); + } + (EventScope::Workflow, EventType::Completed) => { + println!( + "\n✅ [{:>6.2}s] Workflow Completed: {}", + elapsed, event.component_id + ); + if let Some(duration) = event.data.get("duration_ms") { + println!(" └─ Total Duration: {}ms", duration); + } + println!("\n{}", "═".repeat(65)); + break; // Exit after workflow completes + } + + (EventScope::WorkflowStep, EventType::Started) => { + println!( + "\n▶ [{:>6.2}s] Step Started: {}", + elapsed, event.component_id + ); + if let Some(step_type) = event.data.get("step_type") { + println!(" └─ Type: {}", step_type); + } + } + (EventScope::WorkflowStep, EventType::Completed) => { + println!( + "⏹ [{:>6.2}s] Step Completed: {}", + elapsed, event.component_id + ); + if let Some(duration) = event.data.get("duration_ms") { + println!(" └─ Duration: {}ms", duration); + } + } + + (EventScope::System, EventType::Progress) => { + println!( + " ⚙ [{:>6.2}s] System: {}", + elapsed, + event.message.as_deref().unwrap_or("Progress") + ); + } + + (_, EventType::Failed) => { + eprintln!( + "❌ [{:>6.2}s] {:?} Failed: {}", + elapsed, event.scope, event.component_id + ); + if let Some(msg) = &event.message { + eprintln!(" └─ Error: {}", msg); + } + break; // Exit on failure + } + + _ => { + // Other events + println!( + " · [{:>6.2}s] {:?}::{:?} ({})", + elapsed, event.scope, event.event_type, event.component_id + ); + } + } + + io::stdout().flush().unwrap(); + } +} + +/// Simulates a workflow step with artificial delay +async fn simulate_step( + stream: &EventStream, + workflow_id: &str, + step_num: usize, + delay_ms: u64, +) { + use agent_runtime::event::{ComponentStatus, EventScope, EventType}; + + let component_id = format!("demo_workflow:step:{}", step_num); + + // Emit step started event + stream + .append( + EventScope::WorkflowStep, + EventType::Started, + component_id.clone(), + ComponentStatus::Running, + workflow_id.to_string(), + None, + serde_json::json!({ + "step_type": "transform", + "step_number": step_num + }), + ) + .await; + + // Simulate work with delay + sleep(Duration::from_millis(delay_ms)).await; + + // Emit step completed event + stream + .append( + EventScope::WorkflowStep, + EventType::Completed, + component_id, + ComponentStatus::Completed, + workflow_id.to_string(), + None, + serde_json::json!({ + "step_type": "transform", + "duration_ms": delay_ms, + "output": format!("Step {} result", step_num) + }), + ) + .await; +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("\n╔═══════════════════════════════════════════════════════════════╗"); + println!("║ ASYNC EVENT STREAMING DEMONSTRATION (v0.3.0) ║"); + println!("║ ║"); + println!("║ This demo shows the unified event system with artificial ║"); + println!("║ delays to make the event sequence clearly visible. ║"); + println!("║ ║"); + println!("║ • 10 workflow steps ║"); + println!("║ • 500ms delay per step ║"); + println!("║ • Real-time async event emission ║"); + println!("║ • Complete lifecycle tracking ║"); + println!("║ • Unified Scope × Type × Status pattern ║"); + println!("╚═══════════════════════════════════════════════════════════════╝\n"); + + const NUM_STEPS: usize = 10; + const STEP_DELAY_MS: u64 = 500; + + println!("✓ Creating workflow with {} steps", NUM_STEPS); + println!("✓ Each step has {}ms artificial delay", STEP_DELAY_MS); + println!( + "✓ Total expected runtime: ~{:.1}s", + (NUM_STEPS as f64 * STEP_DELAY_MS as f64) / 1000.0 + ); + + // Create event stream + let stream = EventStream::new(); + let rx = stream.subscribe(); + + // Spawn event monitor in background + tokio::spawn(monitor_events(rx)); + + // Small delay to let monitor start + sleep(Duration::from_millis(100)).await; + + println!("\n⏳ Starting workflow execution...\n"); + + let workflow_id = "demo_workflow"; + let start_time = std::time::Instant::now(); + + // Emit workflow started event + stream + .append( + EventScope::Workflow, + EventType::Started, + workflow_id.to_string(), + agent_runtime::event::ComponentStatus::Running, + workflow_id.to_string(), + None, + serde_json::json!({ + "num_steps": NUM_STEPS, + "input": "Demonstration input data" + }), + ) + .await; + + // Execute each step sequentially + for step_num in 0..NUM_STEPS { + simulate_step(&stream, workflow_id, step_num, STEP_DELAY_MS).await; + } + + let total_duration_ms = start_time.elapsed().as_millis() as u64; + + // Emit workflow completed event + stream + .append( + EventScope::Workflow, + EventType::Completed, + workflow_id.to_string(), + agent_runtime::event::ComponentStatus::Completed, + workflow_id.to_string(), + None, + serde_json::json!({ + "steps_completed": NUM_STEPS, + "duration_ms": total_duration_ms, + "output": "All steps completed successfully" + }), + ) + .await; + + // Wait for final events to be displayed + sleep(Duration::from_millis(500)).await; + + println!("\n╔═══════════════════════════════════════════════════════════════╗"); + println!("║ DEMONSTRATION COMPLETE ║"); + println!("╚═══════════════════════════════════════════════════════════════╝\n"); + println!("✓ {} steps executed", NUM_STEPS); + println!("✓ Total time: {:.2}s", total_duration_ms as f64 / 1000.0); + println!("✓ All events emitted asynchronously"); + println!("\nKey observations:"); + println!(" • Events appear in real-time as work progresses"); + println!(" • Timestamps show async execution (no blocking)"); + println!(" • Unified event pattern (Scope × Type × Status)"); + println!(" • Component IDs follow enforced format (workflow:step:N)"); + println!("\nTry this with a real workflow:"); + println!(" cargo run --bin workflow_demo"); + println!(); + + Ok(()) +} diff --git a/src/bin/multi_subscriber.rs b/src/bin/multi_subscriber.rs index e208802..a0110dd 100644 --- a/src/bin/multi_subscriber.rs +++ b/src/bin/multi_subscriber.rs @@ -1,5 +1,5 @@ use agent_runtime::{ - event::EventType, + event::{EventScope, EventType}, tool::{EchoTool, ToolRegistry}, AgentConfig, AgentStep, Runtime, Workflow, }; @@ -36,7 +36,10 @@ async fn main() { let logger = tokio::spawn(async move { println!("[Logger] Started"); while let Ok(event) = subscriber1.recv().await { - println!("[Logger] {:?} @ offset {}", event.event_type, event.offset); + println!( + "[Logger] {:?}::{:?} @ offset {}", + event.scope, event.event_type, event.offset + ); } }); @@ -44,14 +47,10 @@ async fn main() { let workflow_monitor = tokio::spawn(async move { println!("[Workflow Monitor] Started"); while let Ok(event) = subscriber2.recv().await { - if matches!( - event.event_type, - EventType::WorkflowStarted - | EventType::WorkflowCompleted - | EventType::WorkflowFailed - ) { + if event.scope == EventScope::Workflow { println!( - "[Workflow Monitor] 🔔 {:?} - {}", + "[Workflow Monitor] 🔔 {:?}::{:?} - {}", + event.scope, event.event_type, serde_json::to_string(&event.data).unwrap() ); @@ -68,15 +67,12 @@ async fn main() { while let Ok(event) = subscriber3.recv().await { total_events += 1; - if matches!( - event.event_type, - EventType::AgentInitialized | EventType::AgentCompleted | EventType::AgentFailed - ) { + if event.scope == EventScope::Agent { agent_events += 1; } // Print periodic summary - if event.event_type == EventType::WorkflowCompleted { + if event.scope == EventScope::Workflow && event.event_type == EventType::Completed { println!( "[Metrics] 📊 Total: {}, Agent-related: {}", total_events, agent_events diff --git a/src/bin/workflow_demo.rs b/src/bin/workflow_demo.rs index 61b189c..90583f1 100644 --- a/src/bin/workflow_demo.rs +++ b/src/bin/workflow_demo.rs @@ -1,6 +1,6 @@ use agent_runtime::{ llm::{ChatClient, LlamaClient}, - Agent, AgentConfig, AgentStep, EventType, FileLogger, Runtime, Workflow, + Agent, AgentConfig, AgentStep, FileLogger, Runtime, Workflow, }; use std::fs; use std::sync::Arc; @@ -94,35 +94,51 @@ async fn main() { serde_json::to_string(&event.data).unwrap_or_default(), ); - match event.event_type { - EventType::AgentProcessing => { + match (event.scope.clone(), event.event_type.clone()) { + ( + agent_runtime::event::EventScope::Agent, + agent_runtime::event::EventType::Started, + ) => { if let Some(agent) = event.data.get("agent").and_then(|v| v.as_str()) { println!("\n🤖 {} >", agent); std::io::Write::flush(&mut std::io::stdout()).ok(); } } - EventType::AgentLlmStreamChunk => { + ( + agent_runtime::event::EventScope::LlmRequest, + agent_runtime::event::EventType::Progress, + ) => { if let Some(chunk) = event.data.get("chunk").and_then(|v| v.as_str()) { print!("{}", chunk); std::io::Write::flush(&mut std::io::stdout()).ok(); } } - EventType::AgentLlmRequestCompleted => { + ( + agent_runtime::event::EventScope::LlmRequest, + agent_runtime::event::EventType::Completed, + ) => { println!(); // New line after streaming completes } - EventType::AgentLlmRequestFailed => { - if let Some(_agent) = event.data.get("agent").and_then(|v| v.as_str()) { - if let Some(error) = event.data.get("error").and_then(|v| v.as_str()) { - println!("\n ❌ Error: {}", error); - } + ( + agent_runtime::event::EventScope::LlmRequest, + agent_runtime::event::EventType::Failed, + ) => { + if let Some(error) = event.message.as_ref() { + println!("\n ❌ Error: {}", error); } } - EventType::WorkflowCompleted => { + ( + agent_runtime::event::EventScope::Workflow, + agent_runtime::event::EventType::Completed, + ) => { println!("\n{}", "=".repeat(60)); println!("✅ Workflow Completed"); break; } - EventType::WorkflowFailed => { + ( + agent_runtime::event::EventScope::Workflow, + agent_runtime::event::EventType::Failed, + ) => { println!("\n{}", "=".repeat(60)); println!("❌ Workflow Failed"); break; diff --git a/src/event.rs b/src/event.rs index 07bd491..f26e192 100644 --- a/src/event.rs +++ b/src/event.rs @@ -8,38 +8,38 @@ use tokio::sync::broadcast; #[path = "event_test.rs"] mod event_test; -/// Event types that can occur in the system +/// Event scope - which component is emitting the event +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum EventScope { + Workflow, + WorkflowStep, + Agent, + LlmRequest, + Tool, + System, +} + +/// Event type - standard lifecycle events #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum EventType { - // Workflow events - WorkflowStarted, - WorkflowStepStarted, - WorkflowStepCompleted, - WorkflowCompleted, - WorkflowFailed, - - // Agent events - AgentInitialized, - AgentProcessing, - AgentCompleted, - AgentFailed, - - // LLM events - AgentLlmRequestStarted, - AgentLlmStreamChunk, - AgentLlmRequestCompleted, - AgentLlmRequestFailed, - - // Tool events - ToolCallStarted, - ToolCallCompleted, - ToolCallFailed, - AgentToolLoopDetected, - - // System events - SystemError, - StateSaved, + Started, + Progress, + Completed, + Failed, + Canceled, +} + +/// Component status after event +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ComponentStatus { + Pending, + Running, + Completed, + Failed, + Canceled, } /// An immutable event record @@ -48,50 +48,161 @@ pub struct Event { pub id: EventId, pub offset: EventOffset, pub timestamp: DateTime, + + /// Event scope (component type) + pub scope: EventScope, + + /// Event type (lifecycle stage) #[serde(rename = "type")] pub event_type: EventType, + + /// Component identifier (follows standardized format per scope) + pub component_id: String, + + /// Current status of the component + pub status: ComponentStatus, + + /// Workflow context pub workflow_id: WorkflowId, /// Optional parent workflow ID for nested workflows #[serde(skip_serializing_if = "Option::is_none")] pub parent_workflow_id: Option, + /// Optional human-readable message + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, + + /// Flexible payload for component-specific data pub data: JsonValue, } impl Event { + #[allow(clippy::too_many_arguments)] pub fn new( offset: EventOffset, + scope: EventScope, event_type: EventType, + component_id: String, + status: ComponentStatus, workflow_id: WorkflowId, + message: Option, data: JsonValue, - ) -> Self { - Self { + ) -> Result { + // Validate component_id format + Self::validate_component_id(&scope, &component_id)?; + + Ok(Self { id: format!("evt_{}", uuid::Uuid::new_v4()), offset, timestamp: Utc::now(), + scope, event_type, + component_id, + status, workflow_id, parent_workflow_id: None, + message, data, - } + }) } + #[allow(clippy::too_many_arguments)] pub fn with_parent( offset: EventOffset, + scope: EventScope, event_type: EventType, + component_id: String, + status: ComponentStatus, workflow_id: WorkflowId, parent_workflow_id: Option, + message: Option, data: JsonValue, - ) -> Self { - Self { + ) -> Result { + // Validate component_id format + Self::validate_component_id(&scope, &component_id)?; + + Ok(Self { id: format!("evt_{}", uuid::Uuid::new_v4()), offset, timestamp: Utc::now(), + scope, event_type, + component_id, + status, workflow_id, parent_workflow_id, + message, data, + }) + } + + /// Validate component_id follows the required format for the scope + fn validate_component_id(scope: &EventScope, component_id: &str) -> Result<(), String> { + if component_id.is_empty() { + return Err(format!("{:?} component_id cannot be empty", scope)); + } + + match scope { + EventScope::Workflow => { + // Simple name, no special format required + Ok(()) + } + EventScope::WorkflowStep => { + // Must match: name:step:N + let parts: Vec<&str> = component_id.split(':').collect(); + if parts.len() != 3 || parts[1] != "step" { + return Err(format!( + "WorkflowStep component_id must be 'workflow_name:step:N', got '{}'", + component_id + )); + } + // Validate N is a number + if parts[2].parse::().is_err() { + return Err(format!( + "WorkflowStep index must be a number, got '{}'", + parts[2] + )); + } + Ok(()) + } + EventScope::Agent => { + // Simple name, no special format required + Ok(()) + } + EventScope::LlmRequest => { + // Must match: agent_name:llm:N + let parts: Vec<&str> = component_id.split(':').collect(); + if parts.len() != 3 || parts[1] != "llm" { + return Err(format!( + "LlmRequest component_id must be 'agent_name:llm:N', got '{}'", + component_id + )); + } + // Validate N is a number + if parts[2].parse::().is_err() { + return Err(format!( + "LlmRequest iteration must be a number, got '{}'", + parts[2] + )); + } + Ok(()) + } + EventScope::Tool => { + // tool_name or tool_name:N + // No strict format required, but validate not empty + Ok(()) + } + EventScope::System => { + // Must start with 'system:' + if !component_id.starts_with("system:") { + return Err(format!( + "System component_id must start with 'system:', got '{}'", + component_id + )); + } + Ok(()) + } } } } @@ -133,41 +244,74 @@ impl EventStream { /// /// # Examples /// ```no_run - /// use agent_runtime::event::{EventStream, EventType}; + /// use agent_runtime::event::{EventStream, EventScope, EventType, ComponentStatus}; /// use serde_json::json; /// /// # async fn example() { /// let stream = EventStream::new(); /// /// // Fire and forget (most common) - /// stream.append(EventType::AgentInitialized, "workflow_1".to_string(), json!({})); + /// stream.append( + /// EventScope::Agent, + /// EventType::Started, + /// "my_agent".to_string(), + /// ComponentStatus::Running, + /// "workflow_1".to_string(), + /// None, + /// json!({}) + /// ); /// /// // Wait for event if needed - /// let event = stream.append(EventType::AgentCompleted, "workflow_1".to_string(), json!({})) - /// .await - /// .unwrap(); + /// let event = stream.append( + /// EventScope::Agent, + /// EventType::Completed, + /// "my_agent".to_string(), + /// ComponentStatus::Completed, + /// "workflow_1".to_string(), + /// Some("Agent completed successfully".to_string()), + /// json!({}) + /// ).await.unwrap(); /// # } /// ``` + #[allow(clippy::too_many_arguments)] pub fn append( &self, + scope: EventScope, event_type: EventType, + component_id: String, + status: ComponentStatus, workflow_id: WorkflowId, + message: Option, data: JsonValue, - ) -> tokio::task::JoinHandle { - self.append_with_parent(event_type, workflow_id, None, data) + ) -> tokio::task::JoinHandle> { + self.append_with_parent( + scope, + event_type, + component_id, + status, + workflow_id, + None, + message, + data, + ) } /// Append event with optional parent workflow ID /// /// Events are emitted asynchronously to avoid blocking execution. /// Returns a JoinHandle that resolves to the created Event. + #[allow(clippy::too_many_arguments)] pub fn append_with_parent( &self, + scope: EventScope, event_type: EventType, + component_id: String, + status: ComponentStatus, workflow_id: WorkflowId, parent_workflow_id: Option, + message: Option, data: JsonValue, - ) -> tokio::task::JoinHandle { + ) -> tokio::task::JoinHandle> { let sender = self.sender.clone(); let history = self.history.clone(); let next_offset = self.next_offset.clone(); @@ -182,8 +326,17 @@ impl EventStream { current }; - let event = - Event::with_parent(offset, event_type, workflow_id, parent_workflow_id, data); + let event = Event::with_parent( + offset, + scope, + event_type, + component_id, + status, + workflow_id, + parent_workflow_id, + message, + data, + )?; // Store in history history.write().unwrap().push(event.clone()); @@ -191,10 +344,325 @@ impl EventStream { // Broadcast to subscribers (ignore if no active receivers) let _ = sender.send(event.clone()); - event + Ok(event) }) } + // Helper methods for common event patterns + + /// Emit Agent::Started event + pub fn agent_started( + &self, + agent_name: &str, + workflow_id: WorkflowId, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Agent, + EventType::Started, + agent_name.to_string(), + ComponentStatus::Running, + workflow_id, + None, + data, + ) + } + + /// Emit Agent::Completed event + pub fn agent_completed( + &self, + agent_name: &str, + workflow_id: WorkflowId, + message: Option, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Agent, + EventType::Completed, + agent_name.to_string(), + ComponentStatus::Completed, + workflow_id, + message, + data, + ) + } + + /// Emit Agent::Failed event + pub fn agent_failed( + &self, + agent_name: &str, + workflow_id: WorkflowId, + error: &str, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Agent, + EventType::Failed, + agent_name.to_string(), + ComponentStatus::Failed, + workflow_id, + Some(error.to_string()), + data, + ) + } + + /// Emit LlmRequest::Started event + pub fn llm_started( + &self, + agent_name: &str, + iteration: usize, + workflow_id: WorkflowId, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::LlmRequest, + EventType::Started, + format!("{}:llm:{}", agent_name, iteration), + ComponentStatus::Running, + workflow_id, + None, + data, + ) + } + + /// Emit LlmRequest::Progress event (streaming chunk) + pub fn llm_progress( + &self, + agent_name: &str, + iteration: usize, + workflow_id: WorkflowId, + chunk: String, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::LlmRequest, + EventType::Progress, + format!("{}:llm:{}", agent_name, iteration), + ComponentStatus::Running, + workflow_id, + None, + serde_json::json!({ "chunk": chunk }), + ) + } + + /// Emit LlmRequest::Completed event + pub fn llm_completed( + &self, + agent_name: &str, + iteration: usize, + workflow_id: WorkflowId, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::LlmRequest, + EventType::Completed, + format!("{}:llm:{}", agent_name, iteration), + ComponentStatus::Completed, + workflow_id, + None, + data, + ) + } + + /// Emit LlmRequest::Failed event + pub fn llm_failed( + &self, + agent_name: &str, + iteration: usize, + workflow_id: WorkflowId, + error: &str, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::LlmRequest, + EventType::Failed, + format!("{}:llm:{}", agent_name, iteration), + ComponentStatus::Failed, + workflow_id, + Some(error.to_string()), + serde_json::json!({}), + ) + } + + /// Emit Tool::Started event + pub fn tool_started( + &self, + tool_name: &str, + workflow_id: WorkflowId, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Tool, + EventType::Started, + tool_name.to_string(), + ComponentStatus::Running, + workflow_id, + None, + data, + ) + } + + /// Emit Tool::Progress event + pub fn tool_progress( + &self, + tool_name: &str, + workflow_id: WorkflowId, + message: &str, + percent: Option, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Tool, + EventType::Progress, + tool_name.to_string(), + ComponentStatus::Running, + workflow_id, + Some(message.to_string()), + serde_json::json!({ "percent": percent }), + ) + } + + /// Emit Tool::Completed event + pub fn tool_completed( + &self, + tool_name: &str, + workflow_id: WorkflowId, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Tool, + EventType::Completed, + tool_name.to_string(), + ComponentStatus::Completed, + workflow_id, + None, + data, + ) + } + + /// Emit Tool::Failed event + pub fn tool_failed( + &self, + tool_name: &str, + workflow_id: WorkflowId, + error: &str, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Tool, + EventType::Failed, + tool_name.to_string(), + ComponentStatus::Failed, + workflow_id, + Some(error.to_string()), + data, + ) + } + + /// Emit Workflow::Started event + pub fn workflow_started( + &self, + workflow_name: &str, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Workflow, + EventType::Started, + workflow_name.to_string(), + ComponentStatus::Running, + workflow_name.to_string(), + None, + data, + ) + } + + /// Emit Workflow::Completed event + pub fn workflow_completed( + &self, + workflow_name: &str, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Workflow, + EventType::Completed, + workflow_name.to_string(), + ComponentStatus::Completed, + workflow_name.to_string(), + None, + data, + ) + } + + /// Emit Workflow::Failed event + pub fn workflow_failed( + &self, + workflow_name: &str, + error: &str, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::Workflow, + EventType::Failed, + workflow_name.to_string(), + ComponentStatus::Failed, + workflow_name.to_string(), + Some(error.to_string()), + data, + ) + } + + /// Emit WorkflowStep::Started event + pub fn step_started( + &self, + workflow_name: &str, + step_index: usize, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::WorkflowStep, + EventType::Started, + format!("{}:step:{}", workflow_name, step_index), + ComponentStatus::Running, + workflow_name.to_string(), + None, + data, + ) + } + + /// Emit WorkflowStep::Completed event + pub fn step_completed( + &self, + workflow_name: &str, + step_index: usize, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::WorkflowStep, + EventType::Completed, + format!("{}:step:{}", workflow_name, step_index), + ComponentStatus::Completed, + workflow_name.to_string(), + None, + data, + ) + } + + /// Emit WorkflowStep::Failed event + pub fn step_failed( + &self, + workflow_name: &str, + step_index: usize, + error: &str, + data: JsonValue, + ) -> tokio::task::JoinHandle> { + self.append( + EventScope::WorkflowStep, + EventType::Failed, + format!("{}:step:{}", workflow_name, step_index), + ComponentStatus::Failed, + workflow_name.to_string(), + Some(error.to_string()), + data, + ) + } + /// Subscribe to real-time event stream /// Returns a receiver that will get all future events pub fn subscribe(&self) -> broadcast::Receiver { diff --git a/src/event_test.rs b/src/event_test.rs index ba21d98..a44542a 100644 --- a/src/event_test.rs +++ b/src/event_test.rs @@ -1,6 +1,6 @@ #[cfg(test)] mod tests { - use crate::event::{EventStream, EventType}; + use crate::event::{ComponentStatus, EventScope, EventStream, EventType}; use serde_json::json; #[test] @@ -16,16 +16,16 @@ mod tests { let stream = EventStream::new(); let event = stream - .append( - EventType::WorkflowStarted, - "wf_123".to_string(), - json!({"step_count": 3}), - ) + .workflow_started("wf_123", json!({"step_count": 3})) .await + .unwrap() .unwrap(); assert_eq!(event.offset, 0); - assert_eq!(event.event_type, EventType::WorkflowStarted); + assert_eq!(event.scope, EventScope::Workflow); + assert_eq!(event.event_type, EventType::Started); + assert_eq!(event.component_id, "wf_123"); + assert_eq!(event.status, ComponentStatus::Running); assert_eq!(event.workflow_id, "wf_123"); // Give async task time to complete @@ -38,19 +38,9 @@ mod tests { async fn test_event_stream_multiple_events() { let stream = EventStream::new(); - stream.append(EventType::WorkflowStarted, "wf_123".to_string(), json!({})); - - stream.append( - EventType::WorkflowStepStarted, - "wf_123".to_string(), - json!({"step_index": 0}), - ); - - stream.append( - EventType::WorkflowStepCompleted, - "wf_123".to_string(), - json!({"step_index": 0}), - ); + stream.workflow_started("wf_123", json!({})); + stream.step_started("wf_123", 0, json!({"step_name": "first"})); + stream.step_completed("wf_123", 0, json!({})); // Give async tasks time to complete tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; @@ -58,41 +48,12 @@ mod tests { assert_eq!(stream.current_offset(), 3); } - #[tokio::test] - async fn test_event_stream_from_offset() { - let stream = EventStream::new(); - - stream.append(EventType::WorkflowStarted, "wf_123".to_string(), json!({})); - stream.append( - EventType::WorkflowStepStarted, - "wf_123".to_string(), - json!({}), - ); - stream.append( - EventType::WorkflowStepCompleted, - "wf_123".to_string(), - json!({}), - ); - - // Give async tasks time to complete - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - - let events = stream.from_offset(1); - assert_eq!(events.len(), 2); - assert_eq!(events[0].offset, 1); - assert_eq!(events[1].offset, 2); - } - #[tokio::test] async fn test_event_stream_all() { let stream = EventStream::new(); - stream.append(EventType::WorkflowStarted, "wf_123".to_string(), json!({})); - stream.append( - EventType::WorkflowCompleted, - "wf_123".to_string(), - json!({}), - ); + stream.workflow_started("wf_123", json!({})); + stream.workflow_completed("wf_123", json!({})); // Give async tasks time to complete tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; @@ -101,53 +62,18 @@ mod tests { assert_eq!(all_events.len(), 2); } - #[tokio::test] - async fn test_event_stream_subscribe() { - let stream = EventStream::new(); - let mut receiver = stream.subscribe(); - - // Spawn a task to send an event - let stream_clone = stream.clone(); - tokio::spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - stream_clone.append(EventType::WorkflowStarted, "wf_test".to_string(), json!({})); - }); - - // Receive the event - let event = - tokio::time::timeout(tokio::time::Duration::from_secs(1), receiver.recv()).await; - - assert!(event.is_ok()); - let event = event.unwrap().unwrap(); - assert_eq!(event.event_type, EventType::WorkflowStarted); - assert_eq!(event.workflow_id, "wf_test"); - } - - #[tokio::test] - async fn test_event_with_parent() { - let stream = EventStream::new(); - - let event = stream - .append_with_parent( - EventType::WorkflowStarted, - "wf_child".to_string(), - Some("wf_parent".to_string()), - json!({}), - ) - .await - .unwrap(); - - assert_eq!(event.parent_workflow_id, Some("wf_parent".to_string())); - } - #[test] fn test_event_type_serialization() { - let event_type = EventType::WorkflowStarted; + let event_type = EventType::Started; let json = serde_json::to_string(&event_type).unwrap(); - assert_eq!(json, "\"workflow_started\""); + assert_eq!(json, "\"started\""); - let event_type = EventType::AgentLlmStreamChunk; - let json = serde_json::to_string(&event_type).unwrap(); - assert_eq!(json, "\"agent_llm_stream_chunk\""); + let scope = EventScope::LlmRequest; + let json = serde_json::to_string(&scope).unwrap(); + assert_eq!(json, "\"llm_request\""); + + let status = ComponentStatus::Running; + let json = serde_json::to_string(&status).unwrap(); + assert_eq!(json, "\"running\""); } -} +} diff --git a/src/lib.rs b/src/lib.rs index 9f9128b..b771530 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ pub use error::{ AgentError, AgentErrorCode, ConfigError, ConfigErrorCode, LlmError, LlmErrorCode, RuntimeError, ToolError, ToolErrorCode, WorkflowError, WorkflowErrorCode, }; -pub use event::{Event, EventStream, EventType}; +pub use event::{ComponentStatus, Event, EventScope, EventStream, EventType}; pub use llm::{ChatClient, ChatMessage, ChatRequest, ChatResponse, Role}; pub use logging::FileLogger; pub use retry::RetryPolicy; @@ -43,7 +43,7 @@ pub use workflow::{Workflow, WorkflowBuilder, WorkflowState}; // Prelude module for convenient imports in tests and examples pub mod prelude { pub use crate::agent::{Agent, AgentConfig}; - pub use crate::event::{Event, EventStream, EventType}; + pub use crate::event::{ComponentStatus, Event, EventScope, EventStream, EventType}; pub use crate::llm::{ChatClient, ChatMessage, ChatRequest, ChatResponse, Role}; pub use crate::step_impls::{AgentStep, ConditionalStep, SubWorkflowStep, TransformStep}; pub use crate::tool::{NativeTool, Tool, ToolRegistry}; diff --git a/src/runtime.rs b/src/runtime.rs index 29e1ecc..bbd00e3 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,5 +1,5 @@ use crate::{ - event::{Event, EventStream, EventType}, + event::{Event, EventStream}, step::{StepInput, StepInputMetadata, StepType}, step_impls::SubWorkflowStep, workflow::{Workflow, WorkflowRun, WorkflowState, WorkflowStepRecord}, @@ -35,11 +35,9 @@ impl Runtime { ) -> WorkflowRun { let workflow_id = workflow.id.clone(); - // Emit workflow started event - self.event_stream.append_with_parent( - EventType::WorkflowStarted, - workflow_id.clone(), - parent_workflow_id.clone(), + // Emit Workflow::Started event + self.event_stream.workflow_started( + &workflow_id, serde_json::json!({ "step_count": workflow.steps.len(), "parent_workflow_id": parent_workflow_id, @@ -64,13 +62,11 @@ impl Runtime { let step_type_enum = step.step_type(); let step_type = format!("{:?}", step_type_enum); - // Emit step started event - self.event_stream.append_with_parent( - EventType::WorkflowStepStarted, - workflow_id.clone(), - parent_workflow_id.clone(), + // Emit WorkflowStep::Started event + self.event_stream.step_started( + &workflow_id, + step_index, serde_json::json!({ - "step_index": step_index, "step_name": &step_name, "step_type": &step_type, }), @@ -109,13 +105,11 @@ impl Runtime { match result { Ok(output) => { - // Emit step completed - self.event_stream.append_with_parent( - EventType::WorkflowStepCompleted, - workflow_id.clone(), - parent_workflow_id.clone(), + // Emit WorkflowStep::Completed event + self.event_stream.step_completed( + &workflow_id, + step_index, serde_json::json!({ - "step_index": step_index, "step_name": &step_name, "execution_time_ms": output.metadata.execution_time_ms, }), @@ -135,25 +129,23 @@ impl Runtime { current_data = output.data; } Err(e) => { - // Emit step failed - self.event_stream.append_with_parent( - EventType::AgentFailed, // TODO: Add StepFailed event type - workflow_id.clone(), - parent_workflow_id.clone(), + // Emit WorkflowStep::Failed event + self.event_stream.step_failed( + &workflow_id, + step_index, + &e.to_string(), serde_json::json!({ "step_name": &step_name, - "error": e.to_string(), }), ); - // Emit workflow failed - self.event_stream.append_with_parent( - EventType::WorkflowFailed, - workflow_id.clone(), - parent_workflow_id.clone(), + // Emit Workflow::Failed event + self.event_stream.workflow_failed( + &workflow_id, + &e.to_string(), serde_json::json!({ - "error": e.to_string(), "failed_step": step_index, + "failed_step_name": &step_name, }), ); @@ -169,10 +161,8 @@ impl Runtime { run.state = WorkflowState::Completed; workflow.state = WorkflowState::Completed; - self.event_stream.append_with_parent( - EventType::WorkflowCompleted, - workflow_id.clone(), - parent_workflow_id.clone(), + self.event_stream.workflow_completed( + &workflow_id, serde_json::json!({ "steps_completed": run.steps.len(), }), diff --git a/tests/load_tests.rs b/tests/load_tests.rs index 7921112..9a16197 100644 --- a/tests/load_tests.rs +++ b/tests/load_tests.rs @@ -163,10 +163,14 @@ async fn test_event_broadcast_to_multiple_subscribers() { id: i.to_string(), offset: i as u64, timestamp: chrono::Utc::now(), - event_type: agent_runtime::EventType::AgentProcessing, + scope: agent_runtime::EventScope::Agent, + event_type: agent_runtime::EventType::Started, + component_id: format!("agent_{}", i), + status: agent_runtime::ComponentStatus::Running, workflow_id: format!("workflow_{}", i), parent_workflow_id: None, - data: json!({"agent_name": format!("agent_{}", i), "step_index": i}), + message: None, + data: json!({"step_index": i}), }; let _ = tx.send(event); } @@ -374,10 +378,14 @@ async fn test_event_throughput() { id: i.to_string(), offset: i as u64, timestamp: chrono::Utc::now(), - event_type: agent_runtime::EventType::AgentCompleted, + scope: agent_runtime::EventScope::Agent, + event_type: agent_runtime::EventType::Completed, + component_id: format!("agent_{}", i), + status: agent_runtime::ComponentStatus::Completed, workflow_id: format!("workflow_{}", i), parent_workflow_id: None, - data: json!({"agent_name": format!("agent_{}", i), "step_index": i}), + message: None, + data: json!({"step_index": i}), }; let _ = tx.send(event); }