A comprehensive Rust implementation of LangGraph, a library for building stateful, multi-actor applications with LLMs. This implementation provides the same core functionality as the original Python version while adding Rust's performance, safety, and concurrency benefits.
- Stateful Graph Orchestration: Build complex, stateful applications using a graph-based approach
- Async/Await Support: Full async ecosystem integration with Tokio
- Checkpointing: Persistent state management with multiple storage backends
- Streaming: Real-time execution with event streaming
- Human-in-the-Loop: Built-in support for human approval workflows
- Type Safety: Leverage Rust's type system for reliable graph execution
- Concurrent Execution: Efficient parallel node execution
- Flexible Serialization: Multiple serialization protocols (JSON, MessagePack, compression)
- Observability: Comprehensive monitoring, tracing, and debugging toolkit similar to LangSmith
- StateGraph: The main graph building interface
- Pregel Engine: Distributed graph computation engine inspired by Google's Pregel
- Channels: Communication system between nodes
- Checkpointing: State persistence and recovery
- Streaming: Real-time event emission and processing
- GraphState: Trait for defining application state
- NodeFunction: Async trait for node implementations
- ExecutionContext: Runtime context with configuration and metadata
- StreamEvent: Event types for real-time updates
Add to your Cargo.toml:
[dependencies]
rust-langgraph = { path = "path/to/rust-langgraph" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

use rust_langgraph::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct State {
    messages: Vec<String>,
    count: u32,
}
async fn my_node(state: State, _ctx: ExecutionContext) -> GraphResult<State> {
    Ok(State {
        messages: {
            let mut msgs = state.messages;
            msgs.push("Hello from node!".to_string());
            msgs
        },
        count: state.count + 1,
    })
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build graph
    let mut graph = StateGraph::<State>::new();
    graph.add_node("my_node", my_node)?;
    graph.add_edge(START, "my_node")?;
    graph.add_edge("my_node", END)?;

    // Compile and run
    let app = graph.compile().await?;
    let initial_state = State { messages: vec![], count: 0 };
    let result = app.invoke(initial_state).await?;
    println!("Final state: {:?}", result);
    Ok(())
}

Demonstrates:
- Simple state management
- Node creation and connection
- Conditional routing
- Basic streaming
Demonstrates:
- Human-in-the-loop patterns
- Checkpointing
- Complex conditional logic
- Multi-step workflows
Demonstrates:
- Real-time event streaming
- Different streaming modes
- Token-by-token processing
- Progress monitoring
Demonstrates (Comprehensive observability and debugging toolkit):
- LangSmith-like monitoring and debugging
- Real-time dashboard at http://localhost:3000
- Distributed tracing with OpenTelemetry
- Prometheus metrics collection
- Prompt analysis and optimization
- WebSocket-based live event streaming
Demonstrates:
- Per-key channel configuration with set_channel_type
- BinaryOp(add) + Accumulator behavior
- BinaryOp(max) + BinaryOp(concat) behavior
- Parallel superstep merge semantics
Run examples:
cargo run --example basic_agent
cargo run --example advanced_workflow
cargo run --example streaming
cargo run --example channels_api
# Run the observability demo
cargo run --example observability_demo
# OR use the convenience script
./run_observability_demo.sh

LangGraph Rust includes a comprehensive observability toolkit that provides LangSmith-like functionality for monitoring and debugging your graph applications:
- Real-time Dashboard: Web UI at http://localhost:3000 for monitoring runs
- Distributed Tracing: OpenTelemetry integration with Jaeger/OTLP support
- Metrics Collection: Prometheus-compatible metrics for performance monitoring
- Prompt Analysis: Automatic analysis of LLM interactions with optimization suggestions
- Event Streaming: WebSocket-based real-time event streaming
- Multiple Storage: In-memory, SQLite, and PostgreSQL backends
use langgraph_observability::{Observability, ObservabilityConfig};
let observability = Observability::new(
    ObservabilityConfig::builder()
        .with_tracing(true)
        .with_metrics(true)
        .with_dashboard(true)
        .build()
).await?;

observability.start_dashboard("127.0.0.1:3000").await?;

let observer = observability.create_graph_observer();
// Use observer with your graphs...

See crates/langgraph-observability/README.md for detailed documentation.
The main interface for building graphs:
let mut graph = StateGraph::<MyState>::new();
// Add nodes
graph.add_node("node_name", node_function)?;
// Add edges
graph.add_edge(START, "node_name")?;
graph.add_edge("node_name", END)?;
graph.add_conditional_edge("node_name", condition_fn, targets)?;
// Compile
let app = graph.compile().await?;

Execute graphs in different ways:
// Simple execution
let result = app.invoke(initial_state).await?;
// Streaming execution
let mut stream = app.stream(initial_state).await?;
while let Some(event) = stream.next().await {
    // Process streaming events
}
// With configuration
let config = ExecutionConfig {
    thread_id: Some("thread-1".to_string()),
    recursion_limit: 100,
    stream_mode: StreamMode::Values,
    ..Default::default()
};
let result = app.invoke_with_config(initial_state, config).await?;

Persist and restore state:
use rust_langgraph::checkpoint::InMemoryCheckpointer;
let checkpointer = InMemoryCheckpointer::new();
// Use with graphs for automatic state persistence

- Values: Stream complete state after each node
- Updates: Stream only state changes from each node
- Debug: Stream detailed execution information
Run tests for all crates:
# Run all tests
cargo test
# Run tests for specific crate
cargo test -p langgraph-core
# Run with output
cargo test -- --nocapture

- Core graph building and execution
- Pregel-based execution engine
- State management and type system
- Basic streaming support
- In-memory checkpointing
- Channel system for communication
- Error handling and validation
- Comprehensive examples
- Observability Toolkit - Complete LangSmith-like monitoring system
- Real-time web dashboard
- Distributed tracing with OpenTelemetry
- Prometheus metrics collection
- Prompt analysis and optimization
- WebSocket event streaming
- Additional checkpoint backends (SQLite, PostgreSQL, Redis)
- Advanced streaming features
- Performance optimizations
- Integration with LLM libraries
- More prebuilt agent types
- Plugin system for extensions
- Advanced graph optimization features
- Integration with more LLM providers
This project is licensed under the MIT License - see the LICENSE file for details.
- Original LangGraph Python implementation
- Google's Pregel paper for the distributed graph computation model