Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,29 @@ parallel = MARS::Workflows::Parallel.new(

### Gates

Create conditional branching in your workflows:
Gates act as guards that either let the workflow continue or divert to a fallback path:

```ruby
gate = MARS::Gate.new(
"Decision Gate",
condition: ->(input) { input[:score] > 0.5 ? :success : :failure },
branches: {
success: success_workflow,
"Validation Gate",
check: ->(input) { :failure unless input[:score] > 0.5 },
fallbacks: {
failure: failure_workflow
}
)
```

Control halt scope — `:local` (default) stops only the parent workflow, `:global` propagates to the root:

```ruby
gate = MARS::Gate.new(
"Critical Gate",
check: ->(input) { :error unless input[:valid] },
fallbacks: { error: error_workflow },
halt_scope: :global
)
```

### Visualization

Generate Mermaid diagrams to visualize your workflows:
Expand Down
4 changes: 2 additions & 2 deletions examples/complex_llm_workflow/generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def tools
)

gate = MARS::Gate.new(
condition: ->(input) { input.split.length < 10 ? :success : :failure },
branches: {
check: ->(input) { :failure unless input.split.length < 10 },
fallbacks: {
failure: error_workflow
}
)
Expand Down
4 changes: 2 additions & 2 deletions examples/complex_workflow/generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class Agent5 < MARS::Agent

# Create the gate that decides between exit or continue
gate = MARS::Gate.new(
condition: ->(input) { input[:result] },
branches: {
check: ->(input) { input[:result] },
fallbacks: {
warning: sequential_workflow,
error: parallel_workflow
}
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_workflow/generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class Agent3 < MARS::Agent

# Create the gate that decides between exit or continue
gate = MARS::Gate.new(
condition: ->(input) { input[:result] },
branches: {
check: ->(input) { input[:result] },
fallbacks: {
success: success_workflow
}
)
Expand Down
42 changes: 36 additions & 6 deletions lib/mars/gate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,51 @@

module MARS
class Gate < Runnable
def initialize(name = "Gate", condition:, branches:, **kwargs)
class << self
def check(&block)
@check_block = block
end

attr_reader :check_block

def fallback(key, runnable)
fallbacks_map[key] = runnable
end

def fallbacks_map
@fallbacks_map ||= {}
end

def halt_scope(scope = nil)
scope ? @halt_scope = scope : @halt_scope
end
end

def initialize(name = "Gate", check: nil, fallbacks: nil, halt_scope: nil, **kwargs)
super(name: name, **kwargs)

@condition = condition
@branches = branches
@check = check || self.class.check_block
@fallbacks = fallbacks || self.class.fallbacks_map
@halt_scope = halt_scope || self.class.halt_scope || :local
end

def run(input)
result = condition.call(input)
result = check.call(input)

return input unless result

branch = fallbacks[result]
raise ArgumentError, "No fallback registered for #{result.inspect}" unless branch

branches[result] || input
Halt.new(resolve_branch(branch).run(input), scope: @halt_scope)
end

private

attr_reader :condition, :branches
attr_reader :check, :fallbacks

def resolve_branch(branch)
branch.is_a?(Class) ? branch.new : branch
end
end
end
15 changes: 15 additions & 0 deletions lib/mars/halt.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module MARS
class Halt
attr_reader :result, :scope

def initialize(result, scope: :local)
@result = result
@scope = scope
end

def local? = scope == :local
def global? = scope == :global
end
end
4 changes: 2 additions & 2 deletions lib/mars/rendering/graph/gate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ def to_graph(builder, parent_id: nil, value: nil)
builder.add_node(node_id, name, Node::GATE)
builder.add_edge(parent_id, node_id, value)

sink_nodes = branches.map do |condition_result, branch|
branch.to_graph(builder, parent_id: node_id, value: condition_result)
sink_nodes = fallbacks.map do |fallback_key, branch|
branch.to_graph(builder, parent_id: node_id, value: fallback_key)
end

sink_nodes.flatten
Expand Down
27 changes: 17 additions & 10 deletions lib/mars/workflows/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,23 @@ def initialize(name, steps:, aggregator: nil, **kwargs)

def run(input)
errors = []
results = Async do |workflow|
tasks = @steps.map do |step|
results = execute_steps(input, errors)

raise AggregateError, errors if errors.any?

has_global_halt = results.any? { |r| r.is_a?(Halt) && r.global? }
unwrapped = results.map { |r| r.is_a?(Halt) ? r.result : r }
result = aggregator.run(unwrapped)
has_global_halt ? Halt.new(result, scope: :global) : result
end

private

attr_reader :steps, :aggregator

def execute_steps(input, errors)
Async do |workflow|
tasks = steps.map do |step|
workflow.async do
step.run(input)
rescue StandardError => e
Expand All @@ -23,15 +38,7 @@ def run(input)

tasks.map(&:wait)
end.result

raise AggregateError, errors if errors.any?

aggregator.run(results)
end

private

attr_reader :steps, :aggregator
end
end
end
16 changes: 8 additions & 8 deletions lib/mars/workflows/sequential.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ def initialize(name, steps:, **kwargs)

def run(input)
@steps.each do |step|
result = step.run(input)

if result.is_a?(Runnable)
input = result.run(input)
break
else
input = result
end
input = step.run(input)

next unless input.is_a?(Halt)

return input if input.global?

input = input.result
break
end

input
Expand Down
6 changes: 3 additions & 3 deletions spec/mars/aggregator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

RSpec.describe MARS::Aggregator do
describe "#run" do
context "when called without a block" do
context "when called without an operation" do
let(:aggregator) { described_class.new }

it "returns the input as is" do
Expand All @@ -11,10 +11,10 @@
end
end

context "when initialized with a block operation" do
context "when initialized with an operation" do
let(:aggregator) { described_class.new("Aggregator", operation: lambda(&:join)) }

it "executes the block and returns its value" do
it "executes the operation and returns its value" do
result = aggregator.run(%w[a b c])
expect(result).to eq("abc")
end
Expand Down
Loading