diff --git a/lib/mars/execution_context.rb b/lib/mars/execution_context.rb index 381de13..2e4b7be 100644 --- a/lib/mars/execution_context.rb +++ b/lib/mars/execution_context.rb @@ -11,11 +11,11 @@ def initialize(input: nil, global_state: {}) end def [](step_name) - outputs[step_name] + outputs[step_name.to_sym] end def record(step_name, output) - @outputs[step_name] = output + @outputs[step_name.to_sym] = output @current_input = output end diff --git a/lib/mars/rendering/graph.rb b/lib/mars/rendering/graph.rb index b2ab350..6cea810 100644 --- a/lib/mars/rendering/graph.rb +++ b/lib/mars/rendering/graph.rb @@ -4,6 +4,7 @@ module MARS module Rendering module Graph def self.include_extensions + MARS::Runnable.include(Runnable) MARS::AgentStep.include(AgentStep) MARS::Gate.include(Gate) MARS::Workflows::Sequential.include(SequentialWorkflow) diff --git a/lib/mars/rendering/graph/runnable.rb b/lib/mars/rendering/graph/runnable.rb new file mode 100644 index 0000000..c3b3549 --- /dev/null +++ b/lib/mars/rendering/graph/runnable.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module MARS + module Rendering + module Graph + module Runnable + include Base + + def to_graph(builder, parent_id: nil, value: nil) + builder.add_node(node_id, name, Node::STEP) + builder.add_edge(parent_id, node_id, value) + + [node_id] + end + end + end + end +end diff --git a/lib/mars/rendering/html.rb b/lib/mars/rendering/html.rb new file mode 100644 index 0000000..d8a1cb2 --- /dev/null +++ b/lib/mars/rendering/html.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module MARS + module Rendering + class Html + MERMAID_CDN = "https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js" + + def initialize(obj) + @mermaid = Mermaid.new(obj) + end + + def render + diagram = @mermaid.graph_mermaid.join("\n") + direction = "LR" + + <<~HTML + + + + + + MARS Workflow + + + +
+          flowchart #{direction}
+          #{diagram}
+            
+ + + + HTML + end + end + end +end diff --git a/lib/mars/workflows/parallel.rb b/lib/mars/workflows/parallel.rb index 99667cc..12c5d41 100644 --- a/lib/mars/workflows/parallel.rb +++ b/lib/mars/workflows/parallel.rb @@ -11,11 +11,15 @@ def initialize(name, steps:, aggregator: nil, **kwargs) end def run(input) + context = ensure_context(input) errors = [] - results = execute_steps(input, errors) + child_contexts = [] + results = execute_steps(context, errors, child_contexts) raise AggregateError, errors if errors.any? + context.merge(child_contexts) + has_global_halt = results.any? { |r| r.is_a?(Halt) && r.global? } unwrapped = results.map { |r| r.is_a?(Halt) ? r.result : r } result = aggregator.run(unwrapped) @@ -26,11 +30,27 @@ def run(input) attr_reader :steps, :aggregator - def execute_steps(input, errors) + def execute_steps(context, errors, child_contexts) Async do |workflow| tasks = steps.map do |step| + child_ctx = context.fork + child_contexts << child_ctx + workflow.async do - step.run(input) + step.run_before_hooks(child_ctx) + + step_input = step.formatter.format_input(child_ctx) + result = step.run(step_input) + + if result.is_a?(Halt) + step.run_after_hooks(child_ctx, result) + result + else + formatted = step.formatter.format_output(result) + child_ctx.record(step.name, formatted) + step.run_after_hooks(child_ctx, formatted) + formatted + end rescue StandardError => e errors << { error: e, step_name: step.name } end @@ -39,6 +59,10 @@ def execute_steps(input, errors) tasks.map(&:wait) end.result end + + def ensure_context(input) + input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input) + end end end end diff --git a/lib/mars/workflows/sequential.rb b/lib/mars/workflows/sequential.rb index 9f7a6ed..008dd18 100644 --- a/lib/mars/workflows/sequential.rb +++ b/lib/mars/workflows/sequential.rb @@ -10,23 +10,41 @@ def initialize(name, steps:, **kwargs) end def run(input) - @steps.each do |step| - input = step.run(input) - - next unless input.is_a?(Halt) - - return input if input.global? + context = ensure_context(input) - input = input.result - break + @steps.each do |step| + step.run_before_hooks(context) + + step_input = step.formatter.format_input(context) + result = step.run(step_input) + + if result.is_a?(Halt) + if result.global? + step.run_after_hooks(context, result) + return result + end + + formatted = step.formatter.format_output(result.result) + context.record(step.name, formatted) + step.run_after_hooks(context, formatted) + break + end + + formatted = step.formatter.format_output(result) + context.record(step.name, formatted) + step.run_after_hooks(context, formatted) end - input + context.current_input end private attr_reader :steps + + def ensure_context(input) + input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input) + end end end end diff --git a/spec/mars/rendering/html_spec.rb b/spec/mars/rendering/html_spec.rb new file mode 100644 index 0000000..02f8c0b --- /dev/null +++ b/spec/mars/rendering/html_spec.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +RSpec.describe MARS::Rendering::Html do + it "renders a self-contained HTML page with mermaid diagram" do + step1 = MARS::AgentStep.new(name: "step1") + step2 = MARS::AgentStep.new(name: "step2") + workflow = MARS::Workflows::Sequential.new("pipeline", steps: [step1, step2]) + + html = described_class.new(workflow).render + + expect(html).to include("") + expect(html).to include("mermaid") + expect(html).to include("flowchart LR") + expect(html).to include("step1") + expect(html).to include("step2") + expect(html).to include("mermaid.initialize") + end + + it "includes the Mermaid CDN script" do + step = MARS::AgentStep.new(name: "step") + workflow = MARS::Workflows::Sequential.new("simple", steps: [step]) + + html = described_class.new(workflow).render + + expect(html).to include(MARS::Rendering::Html::MERMAID_CDN) + end +end diff --git a/spec/mars/rendering/mermaid_spec.rb b/spec/mars/rendering/mermaid_spec.rb new file mode 100644 index 0000000..f4f8da1 --- /dev/null +++ b/spec/mars/rendering/mermaid_spec.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +RSpec.describe MARS::Rendering::Mermaid do + it "renders a custom Runnable subclass as a box node" do + step = Class.new(MARS::Runnable) do + def run(input) = input + end.new(name: "custom_step") + + mermaid = described_class.new(step) + output = mermaid.render + + expect(output).to include("custom_step[custom_step]") + end + + it "renders an AgentStep as a box node" do + step = MARS::AgentStep.new(name: "my_agent") + mermaid = described_class.new(step) + output = mermaid.render + + expect(output).to include("my_agent[my_agent]") + end + + it "renders a Gate as a diamond node" do + gate = MARS::Gate.new( + "my_gate", + check: ->(_input) { :branch }, + fallbacks: { + branch: Class.new(MARS::Runnable) do + def run(input) = input + end.new(name: "branch_step") + } + ) + + mermaid = described_class.new(gate) + output = mermaid.render + + expect(output).to include("my_gate{my_gate}") + expect(output).to include("|branch|") + end + + it "renders a Sequential workflow with subgraph" do + step1 = MARS::AgentStep.new(name: "step1") + step2 = MARS::AgentStep.new(name: "step2") + workflow = MARS::Workflows::Sequential.new("pipeline", steps: [step1, step2]) + + mermaid = described_class.new(workflow) + output = mermaid.render + + expect(output).to include("subgraph pipeline") + expect(output).to include("step1[step1]") + expect(output).to include("step2[step2]") + end + + it "renders a Parallel workflow with aggregator" do + step1 = MARS::AgentStep.new(name: "step1") + step2 = MARS::AgentStep.new(name: "step2") + workflow = MARS::Workflows::Parallel.new("parallel", steps: [step1, step2]) + + mermaid = described_class.new(workflow) + output = mermaid.render + + expect(output).to include("subgraph parallel") + expect(output).to include("step1[step1]") + expect(output).to include("step2[step2]") + expect(output).to include("parallel_aggregator") + end +end diff --git a/spec/mars/workflows/parallel_spec.rb b/spec/mars/workflows/parallel_spec.rb index e0bcb38..97407c8 100644 --- a/spec/mars/workflows/parallel_spec.rb +++ b/spec/mars/workflows/parallel_spec.rb @@ -2,9 +2,11 @@ RSpec.describe MARS::Workflows::Parallel do let(:sum_aggregator) { MARS::Aggregator.new("Sum Aggregator", operation: lambda(&:sum)) } + let(:add_step_class) do - Class.new do - def initialize(value) + Class.new(MARS::Runnable) do + def initialize(value, **kwargs) + super(**kwargs) @value = value end @@ -16,8 +18,9 @@ def run(input) end let(:multiply_step_class) do - Class.new do - def initialize(multiplier) + Class.new(MARS::Runnable) do + def initialize(multiplier, **kwargs) + super(**kwargs) @multiplier = multiplier end @@ -28,12 +31,10 @@ def run(input) end let(:error_step_class) do - Class.new do - attr_reader :name - - def initialize(message, name) + Class.new(MARS::Runnable) do + def initialize(message, **kwargs) + super(**kwargs) @message = message - @name = name end def run(_input) @@ -44,9 +45,9 @@ def run(_input) describe "#run" do it "executes steps in parallel without an aggregator" do - add_five = add_step_class.new(5) - multiply_three = multiply_step_class.new(3) - add_two = add_step_class.new(2) + add_five = add_step_class.new(5, name: "add_five") + multiply_three = multiply_step_class.new(3, name: "multiply_three") + add_two = add_step_class.new(2, name: "add_two") workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two]) @@ -55,9 +56,9 @@ def run(_input) end it "executes steps in parallel with a custom aggregator" do - add_five = add_step_class.new(5) - multiply_three = multiply_step_class.new(3) - add_two = add_step_class.new(2) + add_five = add_step_class.new(5, name: "add_five") + multiply_three = multiply_step_class.new(3, name: "multiply_three") + add_two = add_step_class.new(2, name: "add_two") workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two], aggregator: sum_aggregator) @@ -65,7 +66,7 @@ def run(_input) end it "handles single step" do - multiply_step = multiply_step_class.new(7) + multiply_step = multiply_step_class.new(7, name: "multiply") workflow = described_class.new("single_step", steps: [multiply_step]) expect(workflow.run(6)).to eq([42]) @@ -77,42 +78,123 @@ def run(_input) expect(workflow.run(42)).to eq([]) end + it "records outputs in context per step" do + step1 = Class.new(MARS::Runnable) do + def run(input) = "from_step1:#{input}" + end.new(name: "step1") + + step2 = Class.new(MARS::Runnable) do + def run(input) = "from_step2:#{input}" + end.new(name: "step2") + + context = MARS::ExecutionContext.new(input: "hello") + workflow = described_class.new("ctx_workflow", steps: [step1, step2]) + workflow.run(context) + + expect(context[:step1]).to eq("from_step1:hello") + expect(context[:step2]).to eq("from_step2:hello") + end + + it "forks context so parallel steps get independent current_input" do + step1 = Class.new(MARS::Runnable) do + def run(input) = "#{input}_modified" + end.new(name: "step1") + + step2 = Class.new(MARS::Runnable) do + def run(input) = "#{input}_also_modified" + end.new(name: "step2") + + context = MARS::ExecutionContext.new(input: "original") + workflow = described_class.new("fork_test", steps: [step1, step2]) + workflow.run(context) + + # Both steps received the same original input + expect(context[:step1]).to eq("original_modified") + expect(context[:step2]).to eq("original_also_modified") + end + + it "shares global_state across forked contexts" do + step1 = Class.new(MARS::Runnable) do + def run(_input) + "done" + end + end.new(name: "step1") + + context = MARS::ExecutionContext.new(input: "test", global_state: { shared: true }) + workflow = described_class.new("shared_state", steps: [step1]) + workflow.run(context) + + expect(context.global_state[:shared]).to be true + end + + it "calls formatter on each step" do + uppercase_formatter = Class.new(MARS::Formatter) do + def format_output(output) + output.upcase + end + end + + step = Class.new(MARS::Runnable) do + def run(input) = "result:#{input}" + end.new(name: "step", formatter: uppercase_formatter.new) + + workflow = described_class.new("fmt_workflow", steps: [step]) + + expect(workflow.run("hello")).to eq(["RESULT:HELLO"]) + end + + it "fires before_run and after_run hooks" do + hook_log = [] + + step_class = Class.new(MARS::Runnable) do + before_run { |_ctx, step| hook_log << "before:#{step.name}" } + after_run { |_ctx, _result, step| hook_log << "after:#{step.name}" } + + def run(input) = input + end + + step = step_class.new(name: "hooked") + workflow = described_class.new("hook_workflow", steps: [step]) + workflow.run("test") + + expect(hook_log).to eq(["before:hooked", "after:hooked"]) + end + it "unwraps local halts and returns plain result" do gate = MARS::Gate.new( - "LocalBranch", + "local_branch", check: ->(_input) { :branch }, fallbacks: { branch: Class.new(MARS::Runnable) do def run(input) "branched:#{input}" end - end.new + end.new(name: "branch_step") } ) - add_five = add_step_class.new(5) + add_five = add_step_class.new(5, name: "add_five") workflow = described_class.new("halt_workflow", steps: [gate, add_five]) result = workflow.run(10) - # Local halts are unwrapped, aggregated as plain values expect(result).not_to be_a(MARS::Halt) expect(result).to eq(["branched:10", 15]) end it "propagates global halt to parent workflow" do gate = MARS::Gate.new( - "GlobalBranch", + "global_branch", check: ->(_input) { :branch }, fallbacks: { branch: Class.new(MARS::Runnable) do def run(input) "branched:#{input}" end - end.new + end.new(name: "branch_step") }, halt_scope: :global ) - add_five = add_step_class.new(5) + add_five = add_step_class.new(5, name: "add_five") workflow = described_class.new("halt_workflow", steps: [gate, add_five]) @@ -123,9 +205,9 @@ def run(input) end it "propagates errors from steps" do - add_step = add_step_class.new(5) - error_step = error_step_class.new("Step failed", "error_step_one") - error_step_two = error_step_class.new("Step failed two", "error_step_two") + add_step = add_step_class.new(5, name: "add") + error_step = error_step_class.new("Step failed", name: "error_step_one") + error_step_two = error_step_class.new("Step failed two", name: "error_step_two") workflow = described_class.new("error_workflow", steps: [add_step, error_step, error_step_two]) diff --git a/spec/mars/workflows/sequential_spec.rb b/spec/mars/workflows/sequential_spec.rb index 01c3e44..b2ea2b5 100644 --- a/spec/mars/workflows/sequential_spec.rb +++ b/spec/mars/workflows/sequential_spec.rb @@ -2,8 +2,9 @@ RSpec.describe MARS::Workflows::Sequential do let(:add_step_class) do - Class.new do - def initialize(value) + Class.new(MARS::Runnable) do + def initialize(value, **kwargs) + super(**kwargs) @value = value end @@ -14,8 +15,9 @@ def run(input) end let(:multiply_step_class) do - Class.new do - def initialize(multiplier) + Class.new(MARS::Runnable) do + def initialize(multiplier, **kwargs) + super(**kwargs) @multiplier = multiplier end @@ -26,8 +28,9 @@ def run(input) end let(:error_step_class) do - Class.new do - def initialize(message) + Class.new(MARS::Runnable) do + def initialize(message, **kwargs) + super(**kwargs) @message = message end @@ -39,9 +42,9 @@ def run(_input) describe "#run" do it "executes steps sequentially" do - add_five = add_step_class.new(5) - multiply_three = multiply_step_class.new(3) - add_two = add_step_class.new(2) + add_five = add_step_class.new(5, name: "add_five") + multiply_three = multiply_step_class.new(3, name: "multiply_three") + add_two = add_step_class.new(2, name: "add_two") workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two]) @@ -50,7 +53,7 @@ def run(_input) end it "handles single step" do - multiply_step = multiply_step_class.new(7) + multiply_step = multiply_step_class.new(7, name: "multiply") workflow = described_class.new("single_step", steps: [multiply_step]) expect(workflow.run(6)).to eq(42) @@ -62,45 +65,104 @@ def run(_input) expect(workflow.run(42)).to eq(42) end + it "records outputs in context accessible by step name" do + step1 = Class.new(MARS::Runnable) do + def run(input) = "from_step1:#{input}" + end.new(name: "step1") + + step2 = Class.new(MARS::Runnable) do + def run(input) = "from_step2:#{input}" + end.new(name: "step2") + + context = MARS::ExecutionContext.new(input: "hello") + workflow = described_class.new("ctx_workflow", steps: [step1, step2]) + workflow.run(context) + + expect(context[:step1]).to eq("from_step1:hello") + expect(context[:step2]).to eq("from_step2:from_step1:hello") + end + + it "wraps raw input in ExecutionContext automatically" do + step = Class.new(MARS::Runnable) do + def run(input) = "processed:#{input}" + end.new(name: "step") + + workflow = described_class.new("auto_wrap", steps: [step]) + + expect(workflow.run("raw")).to eq("processed:raw") + end + + it "calls formatter on each step" do + uppercase_formatter = Class.new(MARS::Formatter) do + def format_output(output) + output.upcase + end + end + + step = Class.new(MARS::Runnable) do + def run(input) = "result:#{input}" + end.new(name: "step", formatter: uppercase_formatter.new) + + workflow = described_class.new("fmt_workflow", steps: [step]) + + expect(workflow.run("hello")).to eq("RESULT:HELLO") + end + + it "fires before_run and after_run hooks" do + hook_log = [] + + step_class = Class.new(MARS::Runnable) do + before_run { |_ctx, step| hook_log << "before:#{step.name}" } + after_run { |_ctx, _result, step| hook_log << "after:#{step.name}" } + + def run(input) = input + end + + step = step_class.new(name: "hooked") + workflow = described_class.new("hook_workflow", steps: [step]) + workflow.run("test") + + expect(hook_log).to eq(["before:hooked", "after:hooked"]) + end + it "halts locally when a gate triggers with local scope" do - add_five = add_step_class.new(5) + add_five = add_step_class.new(5, name: "add_five") gate = MARS::Gate.new( - "LocalGate", + "local_gate", check: ->(_input) { :branch }, fallbacks: { branch: Class.new(MARS::Runnable) do def run(input) "branched:#{input}" end - end.new + end.new(name: "branch_step") } ) - multiply_three = multiply_step_class.new(3) + multiply_three = multiply_step_class.new(3, name: "multiply_three") workflow = described_class.new("halt_workflow", steps: [add_five, gate, multiply_three]) # 10 + 5 = 15, gate branches -> "branched:15", multiply_three is never reached - # Local halt is consumed — returns plain value result = workflow.run(10) expect(result).to eq("branched:15") expect(result).not_to be_a(MARS::Halt) end it "propagates global halt without unwrapping" do - add_five = add_step_class.new(5) + add_five = add_step_class.new(5, name: "add_five") gate = MARS::Gate.new( - "GlobalGate", + "global_gate", check: ->(_input) { :branch }, fallbacks: { branch: Class.new(MARS::Runnable) do def run(input) "branched:#{input}" end - end.new + end.new(name: "branch_step") }, halt_scope: :global ) - multiply_three = multiply_step_class.new(3) + multiply_three = multiply_step_class.new(3, name: "multiply_three") workflow = described_class.new("halt_workflow", steps: [add_five, gate, multiply_three]) @@ -112,50 +174,47 @@ def run(input) it "propagates global halt through nested sequential workflows" do inner_gate = MARS::Gate.new( - "InnerGate", + "inner_gate", check: ->(_input) { :stop }, fallbacks: { stop: Class.new(MARS::Runnable) do def run(input) "stopped:#{input}" end - end.new + end.new(name: "stop_step") }, halt_scope: :global ) inner = described_class.new("inner", steps: [inner_gate]) - after_inner = add_step_class.new(100) + after_inner = add_step_class.new(100, name: "after_inner") outer = described_class.new("outer", steps: [inner, after_inner]) result = outer.run(1) - # Global halt propagates through both sequential levels expect(result).to be_a(MARS::Halt) expect(result.result).to eq("stopped:1") end it "consumes local halt — outer workflow continues" do inner_gate = MARS::Gate.new( - "InnerGate", + "inner_gate", check: ->(_input) { :stop }, fallbacks: { stop: Class.new(MARS::Runnable) do def run(input) "stopped:#{input}" end - end.new + end.new(name: "stop_step") } - # default :local scope ) inner = described_class.new("inner", steps: [inner_gate]) - # Inner halts locally -> returns "stopped:1" as plain value string_step = Class.new(MARS::Runnable) do def run(input) "after:#{input}" end - end.new + end.new(name: "after_step") outer = described_class.new("outer", steps: [inner, string_step]) @@ -165,8 +224,8 @@ def run(input) end it "propagates errors from steps" do - add_step = add_step_class.new(5) - error_step = error_step_class.new("Step failed") + add_step = add_step_class.new(5, name: "add") + error_step = error_step_class.new("Step failed", name: "error") workflow = described_class.new("error_workflow", steps: [add_step, error_step])