diff --git a/lib/mars/execution_context.rb b/lib/mars/execution_context.rb index 381de13..2e4b7be 100644 --- a/lib/mars/execution_context.rb +++ b/lib/mars/execution_context.rb @@ -11,11 +11,11 @@ def initialize(input: nil, global_state: {}) end def [](step_name) - outputs[step_name] + outputs[step_name.to_sym] end def record(step_name, output) - @outputs[step_name] = output + @outputs[step_name.to_sym] = output @current_input = output end diff --git a/lib/mars/rendering/graph.rb b/lib/mars/rendering/graph.rb index b2ab350..6cea810 100644 --- a/lib/mars/rendering/graph.rb +++ b/lib/mars/rendering/graph.rb @@ -4,6 +4,7 @@ module MARS module Rendering module Graph def self.include_extensions + MARS::Runnable.include(Runnable) MARS::AgentStep.include(AgentStep) MARS::Gate.include(Gate) MARS::Workflows::Sequential.include(SequentialWorkflow) diff --git a/lib/mars/rendering/graph/runnable.rb b/lib/mars/rendering/graph/runnable.rb new file mode 100644 index 0000000..c3b3549 --- /dev/null +++ b/lib/mars/rendering/graph/runnable.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module MARS + module Rendering + module Graph + module Runnable + include Base + + def to_graph(builder, parent_id: nil, value: nil) + builder.add_node(node_id, name, Node::STEP) + builder.add_edge(parent_id, node_id, value) + + [node_id] + end + end + end + end +end diff --git a/lib/mars/rendering/html.rb b/lib/mars/rendering/html.rb new file mode 100644 index 0000000..d8a1cb2 --- /dev/null +++ b/lib/mars/rendering/html.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module MARS + module Rendering + class Html + MERMAID_CDN = "https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js" + + def initialize(obj) + @mermaid = Mermaid.new(obj) + end + + def render + diagram = @mermaid.graph_mermaid.join("\n") + direction = "LR" + + <<~HTML + + +
+ + +
+ flowchart #{direction}
+ #{diagram}
+
+
+
+
+ HTML
+ end
+ end
+ end
+end
diff --git a/lib/mars/workflows/parallel.rb b/lib/mars/workflows/parallel.rb
index 99667cc..12c5d41 100644
--- a/lib/mars/workflows/parallel.rb
+++ b/lib/mars/workflows/parallel.rb
@@ -11,11 +11,15 @@ def initialize(name, steps:, aggregator: nil, **kwargs)
end
def run(input)
+ context = ensure_context(input)
errors = []
- results = execute_steps(input, errors)
+ child_contexts = []
+ results = execute_steps(context, errors, child_contexts)
raise AggregateError, errors if errors.any?
+ context.merge(child_contexts)
+
has_global_halt = results.any? { |r| r.is_a?(Halt) && r.global? }
unwrapped = results.map { |r| r.is_a?(Halt) ? r.result : r }
result = aggregator.run(unwrapped)
@@ -26,11 +30,27 @@ def run(input)
attr_reader :steps, :aggregator
- def execute_steps(input, errors)
+ def execute_steps(context, errors, child_contexts)
Async do |workflow|
tasks = steps.map do |step|
+ child_ctx = context.fork
+ child_contexts << child_ctx
+
workflow.async do
- step.run(input)
+ step.run_before_hooks(child_ctx)
+
+ step_input = step.formatter.format_input(child_ctx)
+ result = step.run(step_input)
+
+ if result.is_a?(Halt)
+ step.run_after_hooks(child_ctx, result)
+ result
+ else
+ formatted = step.formatter.format_output(result)
+ child_ctx.record(step.name, formatted)
+ step.run_after_hooks(child_ctx, formatted)
+ formatted
+ end
rescue StandardError => e
errors << { error: e, step_name: step.name }
end
@@ -39,6 +59,10 @@ def execute_steps(input, errors)
tasks.map(&:wait)
end.result
end
+
+ def ensure_context(input)
+ input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input)
+ end
end
end
end
diff --git a/lib/mars/workflows/sequential.rb b/lib/mars/workflows/sequential.rb
index 9f7a6ed..008dd18 100644
--- a/lib/mars/workflows/sequential.rb
+++ b/lib/mars/workflows/sequential.rb
@@ -10,23 +10,41 @@ def initialize(name, steps:, **kwargs)
end
def run(input)
- @steps.each do |step|
- input = step.run(input)
-
- next unless input.is_a?(Halt)
-
- return input if input.global?
+ context = ensure_context(input)
- input = input.result
- break
+ @steps.each do |step|
+ step.run_before_hooks(context)
+
+ step_input = step.formatter.format_input(context)
+ result = step.run(step_input)
+
+ if result.is_a?(Halt)
+ if result.global?
+ step.run_after_hooks(context, result)
+ return result
+ end
+
+ formatted = step.formatter.format_output(result.result)
+ context.record(step.name, formatted)
+ step.run_after_hooks(context, formatted)
+ break
+ end
+
+ formatted = step.formatter.format_output(result)
+ context.record(step.name, formatted)
+ step.run_after_hooks(context, formatted)
end
- input
+ context.current_input
end
private
attr_reader :steps
+
+ def ensure_context(input)
+ input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input)
+ end
end
end
end
diff --git a/spec/mars/rendering/html_spec.rb b/spec/mars/rendering/html_spec.rb
new file mode 100644
index 0000000..02f8c0b
--- /dev/null
+++ b/spec/mars/rendering/html_spec.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+RSpec.describe MARS::Rendering::Html do
+ it "renders a self-contained HTML page with mermaid diagram" do
+ step1 = MARS::AgentStep.new(name: "step1")
+ step2 = MARS::AgentStep.new(name: "step2")
+ workflow = MARS::Workflows::Sequential.new("pipeline", steps: [step1, step2])
+
+ html = described_class.new(workflow).render
+
+ expect(html).to include("")
+ expect(html).to include("mermaid")
+ expect(html).to include("flowchart LR")
+ expect(html).to include("step1")
+ expect(html).to include("step2")
+ expect(html).to include("mermaid.initialize")
+ end
+
+ it "includes the Mermaid CDN script" do
+ step = MARS::AgentStep.new(name: "step")
+ workflow = MARS::Workflows::Sequential.new("simple", steps: [step])
+
+ html = described_class.new(workflow).render
+
+ expect(html).to include(MARS::Rendering::Html::MERMAID_CDN)
+ end
+end
diff --git a/spec/mars/rendering/mermaid_spec.rb b/spec/mars/rendering/mermaid_spec.rb
new file mode 100644
index 0000000..f4f8da1
--- /dev/null
+++ b/spec/mars/rendering/mermaid_spec.rb
@@ -0,0 +1,67 @@
+# frozen_string_literal: true
+
+RSpec.describe MARS::Rendering::Mermaid do
+ it "renders a custom Runnable subclass as a box node" do
+ step = Class.new(MARS::Runnable) do
+ def run(input) = input
+ end.new(name: "custom_step")
+
+ mermaid = described_class.new(step)
+ output = mermaid.render
+
+ expect(output).to include("custom_step[custom_step]")
+ end
+
+ it "renders an AgentStep as a box node" do
+ step = MARS::AgentStep.new(name: "my_agent")
+ mermaid = described_class.new(step)
+ output = mermaid.render
+
+ expect(output).to include("my_agent[my_agent]")
+ end
+
+ it "renders a Gate as a diamond node" do
+ gate = MARS::Gate.new(
+ "my_gate",
+ check: ->(_input) { :branch },
+ fallbacks: {
+ branch: Class.new(MARS::Runnable) do
+ def run(input) = input
+ end.new(name: "branch_step")
+ }
+ )
+
+ mermaid = described_class.new(gate)
+ output = mermaid.render
+
+ expect(output).to include("my_gate{my_gate}")
+ expect(output).to include("|branch|")
+ end
+
+ it "renders a Sequential workflow with subgraph" do
+ step1 = MARS::AgentStep.new(name: "step1")
+ step2 = MARS::AgentStep.new(name: "step2")
+ workflow = MARS::Workflows::Sequential.new("pipeline", steps: [step1, step2])
+
+ mermaid = described_class.new(workflow)
+ output = mermaid.render
+
+ expect(output).to include("subgraph pipeline")
+ expect(output).to include("step1[step1]")
+ expect(output).to include("step2[step2]")
+ end
+
+ it "renders a Parallel workflow with aggregator" do
+ step1 = MARS::AgentStep.new(name: "step1")
+ step2 = MARS::AgentStep.new(name: "step2")
+ workflow = MARS::Workflows::Parallel.new("parallel", steps: [step1, step2])
+
+ mermaid = described_class.new(workflow)
+ output = mermaid.render
+
+ expect(output).to include("subgraph parallel")
+ expect(output).to include("step1[step1]")
+ expect(output).to include("step2[step2]")
+ expect(output).to include("parallel_aggregator")
+ end
+end
diff --git a/spec/mars/workflows/parallel_spec.rb b/spec/mars/workflows/parallel_spec.rb
index e0bcb38..97407c8 100644
--- a/spec/mars/workflows/parallel_spec.rb
+++ b/spec/mars/workflows/parallel_spec.rb
@@ -2,9 +2,11 @@
RSpec.describe MARS::Workflows::Parallel do
let(:sum_aggregator) { MARS::Aggregator.new("Sum Aggregator", operation: lambda(&:sum)) }
+
let(:add_step_class) do
- Class.new do
- def initialize(value)
+ Class.new(MARS::Runnable) do
+ def initialize(value, **kwargs)
+ super(**kwargs)
@value = value
end
@@ -16,8 +18,9 @@ def run(input)
end
let(:multiply_step_class) do
- Class.new do
- def initialize(multiplier)
+ Class.new(MARS::Runnable) do
+ def initialize(multiplier, **kwargs)
+ super(**kwargs)
@multiplier = multiplier
end
@@ -28,12 +31,10 @@ def run(input)
end
let(:error_step_class) do
- Class.new do
- attr_reader :name
-
- def initialize(message, name)
+ Class.new(MARS::Runnable) do
+ def initialize(message, **kwargs)
+ super(**kwargs)
@message = message
- @name = name
end
def run(_input)
@@ -44,9 +45,9 @@ def run(_input)
describe "#run" do
it "executes steps in parallel without an aggregator" do
- add_five = add_step_class.new(5)
- multiply_three = multiply_step_class.new(3)
- add_two = add_step_class.new(2)
+ add_five = add_step_class.new(5, name: "add_five")
+ multiply_three = multiply_step_class.new(3, name: "multiply_three")
+ add_two = add_step_class.new(2, name: "add_two")
workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two])
@@ -55,9 +56,9 @@ def run(_input)
end
it "executes steps in parallel with a custom aggregator" do
- add_five = add_step_class.new(5)
- multiply_three = multiply_step_class.new(3)
- add_two = add_step_class.new(2)
+ add_five = add_step_class.new(5, name: "add_five")
+ multiply_three = multiply_step_class.new(3, name: "multiply_three")
+ add_two = add_step_class.new(2, name: "add_two")
workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two],
aggregator: sum_aggregator)
@@ -65,7 +66,7 @@ def run(_input)
end
it "handles single step" do
- multiply_step = multiply_step_class.new(7)
+ multiply_step = multiply_step_class.new(7, name: "multiply")
workflow = described_class.new("single_step", steps: [multiply_step])
expect(workflow.run(6)).to eq([42])
@@ -77,42 +78,123 @@ def run(_input)
expect(workflow.run(42)).to eq([])
end
+ it "records outputs in context per step" do
+ step1 = Class.new(MARS::Runnable) do
+ def run(input) = "from_step1:#{input}"
+ end.new(name: "step1")
+
+ step2 = Class.new(MARS::Runnable) do
+ def run(input) = "from_step2:#{input}"
+ end.new(name: "step2")
+
+ context = MARS::ExecutionContext.new(input: "hello")
+ workflow = described_class.new("ctx_workflow", steps: [step1, step2])
+ workflow.run(context)
+
+ expect(context[:step1]).to eq("from_step1:hello")
+ expect(context[:step2]).to eq("from_step2:hello")
+ end
+
+ it "forks context so parallel steps get independent current_input" do
+ step1 = Class.new(MARS::Runnable) do
+ def run(input) = "#{input}_modified"
+ end.new(name: "step1")
+
+ step2 = Class.new(MARS::Runnable) do
+ def run(input) = "#{input}_also_modified"
+ end.new(name: "step2")
+
+ context = MARS::ExecutionContext.new(input: "original")
+ workflow = described_class.new("fork_test", steps: [step1, step2])
+ workflow.run(context)
+
+ # Both steps received the same original input
+ expect(context[:step1]).to eq("original_modified")
+ expect(context[:step2]).to eq("original_also_modified")
+ end
+
+ it "shares global_state across forked contexts" do
+ step1 = Class.new(MARS::Runnable) do
+ def run(_input)
+ "done"
+ end
+ end.new(name: "step1")
+
+ context = MARS::ExecutionContext.new(input: "test", global_state: { shared: true })
+ workflow = described_class.new("shared_state", steps: [step1])
+ workflow.run(context)
+
+ expect(context.global_state[:shared]).to be true
+ end
+
+ it "calls formatter on each step" do
+ uppercase_formatter = Class.new(MARS::Formatter) do
+ def format_output(output)
+ output.upcase
+ end
+ end
+
+ step = Class.new(MARS::Runnable) do
+ def run(input) = "result:#{input}"
+ end.new(name: "step", formatter: uppercase_formatter.new)
+
+ workflow = described_class.new("fmt_workflow", steps: [step])
+
+ expect(workflow.run("hello")).to eq(["RESULT:HELLO"])
+ end
+
+ it "fires before_run and after_run hooks" do
+ hook_log = []
+
+ step_class = Class.new(MARS::Runnable) do
+ before_run { |_ctx, step| hook_log << "before:#{step.name}" }
+ after_run { |_ctx, _result, step| hook_log << "after:#{step.name}" }
+
+ def run(input) = input
+ end
+
+ step = step_class.new(name: "hooked")
+ workflow = described_class.new("hook_workflow", steps: [step])
+ workflow.run("test")
+
+ expect(hook_log).to eq(["before:hooked", "after:hooked"])
+ end
+
it "unwraps local halts and returns plain result" do
gate = MARS::Gate.new(
- "LocalBranch",
+ "local_branch",
check: ->(_input) { :branch },
fallbacks: {
branch: Class.new(MARS::Runnable) do
def run(input)
"branched:#{input}"
end
- end.new
+ end.new(name: "branch_step")
}
)
- add_five = add_step_class.new(5)
+ add_five = add_step_class.new(5, name: "add_five")
workflow = described_class.new("halt_workflow", steps: [gate, add_five])
result = workflow.run(10)
- # Local halts are unwrapped, aggregated as plain values
expect(result).not_to be_a(MARS::Halt)
expect(result).to eq(["branched:10", 15])
end
it "propagates global halt to parent workflow" do
gate = MARS::Gate.new(
- "GlobalBranch",
+ "global_branch",
check: ->(_input) { :branch },
fallbacks: {
branch: Class.new(MARS::Runnable) do
def run(input)
"branched:#{input}"
end
- end.new
+ end.new(name: "branch_step")
},
halt_scope: :global
)
- add_five = add_step_class.new(5)
+ add_five = add_step_class.new(5, name: "add_five")
workflow = described_class.new("halt_workflow", steps: [gate, add_five])
@@ -123,9 +205,9 @@ def run(input)
end
it "propagates errors from steps" do
- add_step = add_step_class.new(5)
- error_step = error_step_class.new("Step failed", "error_step_one")
- error_step_two = error_step_class.new("Step failed two", "error_step_two")
+ add_step = add_step_class.new(5, name: "add")
+ error_step = error_step_class.new("Step failed", name: "error_step_one")
+ error_step_two = error_step_class.new("Step failed two", name: "error_step_two")
workflow = described_class.new("error_workflow", steps: [add_step, error_step, error_step_two])
diff --git a/spec/mars/workflows/sequential_spec.rb b/spec/mars/workflows/sequential_spec.rb
index 01c3e44..b2ea2b5 100644
--- a/spec/mars/workflows/sequential_spec.rb
+++ b/spec/mars/workflows/sequential_spec.rb
@@ -2,8 +2,9 @@
RSpec.describe MARS::Workflows::Sequential do
let(:add_step_class) do
- Class.new do
- def initialize(value)
+ Class.new(MARS::Runnable) do
+ def initialize(value, **kwargs)
+ super(**kwargs)
@value = value
end
@@ -14,8 +15,9 @@ def run(input)
end
let(:multiply_step_class) do
- Class.new do
- def initialize(multiplier)
+ Class.new(MARS::Runnable) do
+ def initialize(multiplier, **kwargs)
+ super(**kwargs)
@multiplier = multiplier
end
@@ -26,8 +28,9 @@ def run(input)
end
let(:error_step_class) do
- Class.new do
- def initialize(message)
+ Class.new(MARS::Runnable) do
+ def initialize(message, **kwargs)
+ super(**kwargs)
@message = message
end
@@ -39,9 +42,9 @@ def run(_input)
describe "#run" do
it "executes steps sequentially" do
- add_five = add_step_class.new(5)
- multiply_three = multiply_step_class.new(3)
- add_two = add_step_class.new(2)
+ add_five = add_step_class.new(5, name: "add_five")
+ multiply_three = multiply_step_class.new(3, name: "multiply_three")
+ add_two = add_step_class.new(2, name: "add_two")
workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two])
@@ -50,7 +53,7 @@ def run(_input)
end
it "handles single step" do
- multiply_step = multiply_step_class.new(7)
+ multiply_step = multiply_step_class.new(7, name: "multiply")
workflow = described_class.new("single_step", steps: [multiply_step])
expect(workflow.run(6)).to eq(42)
@@ -62,45 +65,104 @@ def run(_input)
expect(workflow.run(42)).to eq(42)
end
+ it "records outputs in context accessible by step name" do
+ step1 = Class.new(MARS::Runnable) do
+ def run(input) = "from_step1:#{input}"
+ end.new(name: "step1")
+
+ step2 = Class.new(MARS::Runnable) do
+ def run(input) = "from_step2:#{input}"
+ end.new(name: "step2")
+
+ context = MARS::ExecutionContext.new(input: "hello")
+ workflow = described_class.new("ctx_workflow", steps: [step1, step2])
+ workflow.run(context)
+
+ expect(context[:step1]).to eq("from_step1:hello")
+ expect(context[:step2]).to eq("from_step2:from_step1:hello")
+ end
+
+ it "wraps raw input in ExecutionContext automatically" do
+ step = Class.new(MARS::Runnable) do
+ def run(input) = "processed:#{input}"
+ end.new(name: "step")
+
+ workflow = described_class.new("auto_wrap", steps: [step])
+
+ expect(workflow.run("raw")).to eq("processed:raw")
+ end
+
+ it "calls formatter on each step" do
+ uppercase_formatter = Class.new(MARS::Formatter) do
+ def format_output(output)
+ output.upcase
+ end
+ end
+
+ step = Class.new(MARS::Runnable) do
+ def run(input) = "result:#{input}"
+ end.new(name: "step", formatter: uppercase_formatter.new)
+
+ workflow = described_class.new("fmt_workflow", steps: [step])
+
+ expect(workflow.run("hello")).to eq("RESULT:HELLO")
+ end
+
+ it "fires before_run and after_run hooks" do
+ hook_log = []
+
+ step_class = Class.new(MARS::Runnable) do
+ before_run { |_ctx, step| hook_log << "before:#{step.name}" }
+ after_run { |_ctx, _result, step| hook_log << "after:#{step.name}" }
+
+ def run(input) = input
+ end
+
+ step = step_class.new(name: "hooked")
+ workflow = described_class.new("hook_workflow", steps: [step])
+ workflow.run("test")
+
+ expect(hook_log).to eq(["before:hooked", "after:hooked"])
+ end
+
it "halts locally when a gate triggers with local scope" do
- add_five = add_step_class.new(5)
+ add_five = add_step_class.new(5, name: "add_five")
gate = MARS::Gate.new(
- "LocalGate",
+ "local_gate",
check: ->(_input) { :branch },
fallbacks: {
branch: Class.new(MARS::Runnable) do
def run(input)
"branched:#{input}"
end
- end.new
+ end.new(name: "branch_step")
}
)
- multiply_three = multiply_step_class.new(3)
+ multiply_three = multiply_step_class.new(3, name: "multiply_three")
workflow = described_class.new("halt_workflow", steps: [add_five, gate, multiply_three])
# 10 + 5 = 15, gate branches -> "branched:15", multiply_three is never reached
- # Local halt is consumed — returns plain value
result = workflow.run(10)
expect(result).to eq("branched:15")
expect(result).not_to be_a(MARS::Halt)
end
it "propagates global halt without unwrapping" do
- add_five = add_step_class.new(5)
+ add_five = add_step_class.new(5, name: "add_five")
gate = MARS::Gate.new(
- "GlobalGate",
+ "global_gate",
check: ->(_input) { :branch },
fallbacks: {
branch: Class.new(MARS::Runnable) do
def run(input)
"branched:#{input}"
end
- end.new
+ end.new(name: "branch_step")
},
halt_scope: :global
)
- multiply_three = multiply_step_class.new(3)
+ multiply_three = multiply_step_class.new(3, name: "multiply_three")
workflow = described_class.new("halt_workflow", steps: [add_five, gate, multiply_three])
@@ -112,50 +174,47 @@ def run(input)
it "propagates global halt through nested sequential workflows" do
inner_gate = MARS::Gate.new(
- "InnerGate",
+ "inner_gate",
check: ->(_input) { :stop },
fallbacks: {
stop: Class.new(MARS::Runnable) do
def run(input)
"stopped:#{input}"
end
- end.new
+ end.new(name: "stop_step")
},
halt_scope: :global
)
inner = described_class.new("inner", steps: [inner_gate])
- after_inner = add_step_class.new(100)
+ after_inner = add_step_class.new(100, name: "after_inner")
outer = described_class.new("outer", steps: [inner, after_inner])
result = outer.run(1)
- # Global halt propagates through both sequential levels
expect(result).to be_a(MARS::Halt)
expect(result.result).to eq("stopped:1")
end
it "consumes local halt — outer workflow continues" do
inner_gate = MARS::Gate.new(
- "InnerGate",
+ "inner_gate",
check: ->(_input) { :stop },
fallbacks: {
stop: Class.new(MARS::Runnable) do
def run(input)
"stopped:#{input}"
end
- end.new
+ end.new(name: "stop_step")
}
- # default :local scope
)
inner = described_class.new("inner", steps: [inner_gate])
- # Inner halts locally -> returns "stopped:1" as plain value
string_step = Class.new(MARS::Runnable) do
def run(input)
"after:#{input}"
end
- end.new
+ end.new(name: "after_step")
outer = described_class.new("outer", steps: [inner, string_step])
@@ -165,8 +224,8 @@ def run(input)
end
it "propagates errors from steps" do
- add_step = add_step_class.new(5)
- error_step = error_step_class.new("Step failed")
+ add_step = add_step_class.new(5, name: "add")
+ error_step = error_step_class.new("Step failed", name: "error")
workflow = described_class.new("error_workflow", steps: [add_step, error_step])