Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/mars/execution_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ def initialize(input: nil, global_state: {})
end

def [](step_name)
outputs[step_name]
outputs[step_name.to_sym]
end

def record(step_name, output)
@outputs[step_name] = output
@outputs[step_name.to_sym] = output
@current_input = output
end

Expand Down
1 change: 1 addition & 0 deletions lib/mars/rendering/graph.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module MARS
module Rendering
module Graph
def self.include_extensions
MARS::Runnable.include(Runnable)
MARS::AgentStep.include(AgentStep)
MARS::Gate.include(Gate)
MARS::Workflows::Sequential.include(SequentialWorkflow)
Expand Down
18 changes: 18 additions & 0 deletions lib/mars/rendering/graph/runnable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module MARS
module Rendering
module Graph
module Runnable
include Base

def to_graph(builder, parent_id: nil, value: nil)
builder.add_node(node_id, name, Node::STEP)
builder.add_edge(parent_id, node_id, value)

[node_id]
end
end
end
end
end
37 changes: 37 additions & 0 deletions lib/mars/rendering/html.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module MARS
module Rendering
class Html
MERMAID_CDN = "https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"

def initialize(obj)
@mermaid = Mermaid.new(obj)
end

def render
diagram = @mermaid.graph_mermaid.join("\n")
direction = "LR"

<<~HTML
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MARS Workflow</title>
<script src="#{MERMAID_CDN}"></script>
</head>
<body>
<pre class="mermaid">
flowchart #{direction}
#{diagram}
</pre>
<script>mermaid.initialize({ startOnLoad: true });</script>
</body>
</html>
HTML
end
end
end
end
30 changes: 27 additions & 3 deletions lib/mars/workflows/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ def initialize(name, steps:, aggregator: nil, **kwargs)
end

def run(input)
context = ensure_context(input)
errors = []
results = execute_steps(input, errors)
child_contexts = []
results = execute_steps(context, errors, child_contexts)

raise AggregateError, errors if errors.any?

context.merge(child_contexts)

has_global_halt = results.any? { |r| r.is_a?(Halt) && r.global? }
unwrapped = results.map { |r| r.is_a?(Halt) ? r.result : r }
result = aggregator.run(unwrapped)
Expand All @@ -26,11 +30,27 @@ def run(input)

attr_reader :steps, :aggregator

def execute_steps(input, errors)
def execute_steps(context, errors, child_contexts)
Async do |workflow|
tasks = steps.map do |step|
child_ctx = context.fork
child_contexts << child_ctx

workflow.async do
step.run(input)
step.run_before_hooks(child_ctx)

step_input = step.formatter.format_input(child_ctx)
result = step.run(step_input)

if result.is_a?(Halt)
step.run_after_hooks(child_ctx, result)
result
else
formatted = step.formatter.format_output(result)
child_ctx.record(step.name, formatted)
step.run_after_hooks(child_ctx, formatted)
formatted
end
rescue StandardError => e
errors << { error: e, step_name: step.name }
end
Expand All @@ -39,6 +59,10 @@ def execute_steps(input, errors)
tasks.map(&:wait)
end.result
end

def ensure_context(input)
input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input)
end
end
end
end
36 changes: 27 additions & 9 deletions lib/mars/workflows/sequential.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,41 @@ def initialize(name, steps:, **kwargs)
end

def run(input)
@steps.each do |step|
input = step.run(input)

next unless input.is_a?(Halt)

return input if input.global?
context = ensure_context(input)

input = input.result
break
@steps.each do |step|
step.run_before_hooks(context)

step_input = step.formatter.format_input(context)
result = step.run(step_input)

if result.is_a?(Halt)
if result.global?
step.run_after_hooks(context, result)
return result
end

formatted = step.formatter.format_output(result.result)
context.record(step.name, formatted)
step.run_after_hooks(context, formatted)
break
end

formatted = step.formatter.format_output(result)
context.record(step.name, formatted)
step.run_after_hooks(context, formatted)
end

input
context.current_input
end

private

attr_reader :steps

def ensure_context(input)
input.is_a?(ExecutionContext) ? input : ExecutionContext.new(input: input)
end
end
end
end
27 changes: 27 additions & 0 deletions spec/mars/rendering/html_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

RSpec.describe MARS::Rendering::Html do
it "renders a self-contained HTML page with mermaid diagram" do
step1 = MARS::AgentStep.new(name: "step1")
step2 = MARS::AgentStep.new(name: "step2")
workflow = MARS::Workflows::Sequential.new("pipeline", steps: [step1, step2])

html = described_class.new(workflow).render

expect(html).to include("<!DOCTYPE html>")
expect(html).to include("mermaid")
expect(html).to include("flowchart LR")
expect(html).to include("step1")
expect(html).to include("step2")
expect(html).to include("mermaid.initialize")
end

it "includes the Mermaid CDN script" do
step = MARS::AgentStep.new(name: "step")
workflow = MARS::Workflows::Sequential.new("simple", steps: [step])

html = described_class.new(workflow).render

expect(html).to include(MARS::Rendering::Html::MERMAID_CDN)
end
end
67 changes: 67 additions & 0 deletions spec/mars/rendering/mermaid_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# frozen_string_literal: true

RSpec.describe MARS::Rendering::Mermaid do
it "renders a custom Runnable subclass as a box node" do
step = Class.new(MARS::Runnable) do
def run(input) = input
end.new(name: "custom_step")

mermaid = described_class.new(step)
output = mermaid.render

expect(output).to include("custom_step[custom_step]")
end

it "renders an AgentStep as a box node" do
step = MARS::AgentStep.new(name: "my_agent")
mermaid = described_class.new(step)
output = mermaid.render

expect(output).to include("my_agent[my_agent]")
end

it "renders a Gate as a diamond node" do
gate = MARS::Gate.new(
"my_gate",
check: ->(_input) { :branch },
fallbacks: {
branch: Class.new(MARS::Runnable) do
def run(input) = input
end.new(name: "branch_step")
}
)

mermaid = described_class.new(gate)
output = mermaid.render

expect(output).to include("my_gate{my_gate}")
expect(output).to include("|branch|")
end

it "renders a Sequential workflow with subgraph" do
step1 = MARS::AgentStep.new(name: "step1")
step2 = MARS::AgentStep.new(name: "step2")
workflow = MARS::Workflows::Sequential.new("pipeline", steps: [step1, step2])

mermaid = described_class.new(workflow)
output = mermaid.render

expect(output).to include("subgraph pipeline")
expect(output).to include("step1[step1]")
expect(output).to include("step2[step2]")
end

it "renders a Parallel workflow with aggregator" do
step1 = MARS::AgentStep.new(name: "step1")
step2 = MARS::AgentStep.new(name: "step2")
workflow = MARS::Workflows::Parallel.new("parallel", steps: [step1, step2])

mermaid = described_class.new(workflow)
output = mermaid.render

expect(output).to include("subgraph parallel")
expect(output).to include("step1[step1]")
expect(output).to include("step2[step2]")
expect(output).to include("parallel_aggregator")
end
end
Loading
Loading