Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/cli/commands/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async function maybeFireTerminated(
): Promise<WorkflowRunTerminatedOutput | null> {
const run = await rt.getRun(runId)
if (!run) return null
const graph = await rt.getGraph(run.graph_id)
const graph = await rt.getRunGraph(run)
if (!graph) return null

const states = rt.deriveNodeStates(graph, run)
Expand Down Expand Up @@ -234,7 +234,7 @@ export const workflowStatusCommand: Command = async (ctx, args) => {
const rt = makeRuntime(ctx)
const run = await rt.getRun(runId)
if (!run) throw new Error(`Unknown run: ${runId}`)
const graph = await rt.getGraph(run.graph_id)
const graph = await rt.getRunGraph(run)
if (!graph) throw new Error(`Unknown graph: ${run.graph_id}`)

const states = rt.deriveNodeStates(graph, run)
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export type NodeState = {
export type Run = {
run_id: string
graph_id: string
graph_version?: string | undefined
name?: string
created_at: string
history: Transition[]
Expand Down
4 changes: 2 additions & 2 deletions src/workflow/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import type { GraphDef, Run, Transition } from "../types.js"

export interface WorkflowAdapter {
saveGraph(graph: GraphDef): Promise<void>
loadGraph(graphId: string): Promise<GraphDef | undefined>
loadGraph(graphId: string, version?: string): Promise<GraphDef | undefined>
listGraphs(): Promise<GraphDef[]>
saveSourceGraph?(graph: GraphDef): Promise<void>
loadSourceGraph?(graphId: string): Promise<GraphDef | undefined>

createRun(graphId: string, name?: string): Promise<Run>
createRun(graphId: string, name?: string, graphVersion?: string): Promise<Run>
loadRun(runId: string): Promise<Run | undefined>
listRuns(graphId?: string): Promise<Run[]>
appendTransition(runId: string, transition: Transition): Promise<void>
Expand Down
26 changes: 25 additions & 1 deletion src/workflow/filesystem.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ describe("FilesystemWorkflowAdapter", () => {
const loaded = await store.loadGraph("g1")
expect(loaded?.name).toBe("Updated")
})

it("loads a specific saved graph version", async () => {
const v1 = makeGraph()
const v2 = { ...v1, version: "2.0.0", name: "Version Two" }
await store.saveGraph(v1)
await store.saveGraph(v2)

const current = await store.loadGraph("g1")
const pinned = await store.loadGraph("g1", "1.0.0")

expect(current?.version).toBe("2.0.0")
expect(pinned?.version).toBe("1.0.0")
expect(pinned?.name).toBe("Test Graph")
})
})

describe("loadGraph — missing", () => {
Expand All @@ -84,6 +98,15 @@ describe("FilesystemWorkflowAdapter", () => {
expect(ids).toEqual(["g1", "g2"])
})

it("excludes version snapshots from the graph list", async () => {
await store.saveGraph(makeGraph("g1"))
await store.saveGraph({ ...makeGraph("g1"), version: "2.0.0" })
const graphs = await store.listGraphs()
expect(graphs).toHaveLength(1)
expect(graphs[0]!.id).toBe("g1")
expect(graphs[0]!.version).toBe("2.0.0")
})

it("returns an empty array when no graphs exist", async () => {
const graphs = await store.listGraphs()
expect(graphs).toEqual([])
Expand All @@ -92,9 +115,10 @@ describe("FilesystemWorkflowAdapter", () => {

describe("createRun / loadRun", () => {
it("creates a run with a generated id and empty history", async () => {
const run = await store.createRun("g1")
const run = await store.createRun("g1", undefined, "1.0.0")
expect(run.run_id).toBeDefined()
expect(run.graph_id).toBe("g1")
expect(run.graph_version).toBe("1.0.0")
expect(run.history).toEqual([])
expect(run.created_at).toBeDefined()
})
Expand Down
23 changes: 19 additions & 4 deletions src/workflow/filesystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ function isNodeError(err: unknown): err is NodeError {
return err instanceof Error
}

function versionedGraphName(graphId: string, version: string): string {
return `${graphId}@${encodeURIComponent(version)}`
}

async function safeReaddir(dir: string): Promise<string[]> {
try {
return await readdir(dir)
Expand All @@ -41,6 +45,10 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter {
join(this.graphsDir, `${graph.id}.json`),
JSON.stringify(graph, null, 2),
)
await writeFile(
join(this.graphsDir, `${versionedGraphName(graph.id, graph.version)}.json`),
JSON.stringify(graph, null, 2),
)
}

async saveSourceGraph(graph: GraphDef): Promise<void> {
Expand All @@ -65,9 +73,11 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter {
return undefined
}

async loadGraph(graphId: string): Promise<GraphDef | undefined> {
async loadGraph(graphId: string, version?: string): Promise<GraphDef | undefined> {
const name =
version === undefined ? graphId : versionedGraphName(graphId, version)
for (const ext of [".json", ".yaml", ".yml"]) {
const filePath = join(this.graphsDir, `${graphId}${ext}`)
const filePath = join(this.graphsDir, `${name}${ext}`)
try {
const data = await readFile(filePath, "utf-8")
return parseGraph(data, filePath)
Expand All @@ -91,7 +101,7 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter {
file.endsWith(".json") ||
file.endsWith(".yaml") ||
file.endsWith(".yml")
if (!isGraphFile || isSourceFile) continue
if (!isGraphFile || isSourceFile || file.includes("@")) continue
const filePath = join(this.graphsDir, file)
const data = await readFile(filePath, "utf-8")
graphs.push(parseGraph(data, filePath))
Expand All @@ -101,11 +111,16 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter {

// ---- Runs ----------------------------------------------------------------

async createRun(graphId: string, name?: string): Promise<Run> {
async createRun(
graphId: string,
name?: string,
graphVersion?: string,
): Promise<Run> {
await mkdir(this.runsDir, { recursive: true })
const run: Run = {
run_id: randomUUID(),
graph_id: graphId,
...(graphVersion !== undefined ? { graph_version: graphVersion } : {}),
...(name !== undefined ? { name } : {}),
created_at: new Date().toISOString(),
history: [],
Expand Down
68 changes: 66 additions & 2 deletions src/workflow/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,37 @@ import { Runtime } from "./runtime.js"

class MemoryAdapter implements WorkflowAdapter {
private graphs = new Map<string, GraphDef>()
private graphVersions = new Map<string, GraphDef>()
private runs = new Map<string, Run>()

async saveGraph(graph: GraphDef): Promise<void> {
this.graphs.set(graph.id, graph)
this.graphVersions.set(`${graph.id}@${graph.version}`, graph)
}

async loadGraph(graphId: string): Promise<GraphDef | undefined> {
async loadGraph(
graphId: string,
version?: string,
): Promise<GraphDef | undefined> {
if (version !== undefined) {
return this.graphVersions.get(`${graphId}@${version}`)
}
return this.graphs.get(graphId)
}

async listGraphs(): Promise<GraphDef[]> {
return [...this.graphs.values()]
}

async createRun(graphId: string, name?: string): Promise<Run> {
async createRun(
graphId: string,
name?: string,
graphVersion?: string,
): Promise<Run> {
const run: Run = {
run_id: `run-${this.runs.size + 1}`,
graph_id: graphId,
...(graphVersion !== undefined ? { graph_version: graphVersion } : {}),
...(name !== undefined ? { name } : {}),
created_at: new Date().toISOString(),
history: [],
Expand Down Expand Up @@ -802,13 +815,64 @@ describe("run lifecycle", () => {
await expect(rt.createRun("nope")).rejects.toThrow(/Unknown graph/)
})

it("createRun pins the run to the current graph version", async () => {
const g = linearGraph()
await rt.registerGraph(g)
const run = await rt.createRun(g.id)
expect(run.graph_version).toBe("1.0")
})

it("next() throws for unknown graph or run", async () => {
const g = linearGraph()
await rt.registerGraph(g)
const run = await rt.createRun(g.id)
await expect(rt.next("nope", run.run_id)).rejects.toThrow(/Unknown graph/)
await expect(rt.next(g.id, "nope")).rejects.toThrow(/Unknown run/)
})

it("continues pinned runs against their original graph after the current graph changes", async () => {
const v1 = linearGraph()
await rt.registerGraph(v1)
const run = await rt.createRun(v1.id)

await rt.transition(v1.id, run.run_id, "A", "in_progress", "agent")
await rt.transition(v1.id, run.run_id, "A", "completed", "agent")

const v2: GraphDef = {
...v1,
version: "2.0",
nodes: [
{ id: "A", label: "Step A", artifact_type: "doc" },
{ id: "renamed", label: "Renamed Step", artifact_type: "doc" },
],
edges: [{ from: "A", to: "renamed" }],
}
await rt.registerGraph(v2)

expect(await rt.next(v1.id, run.run_id)).toEqual(["B"])
await rt.transition(v1.id, run.run_id, "B", "in_progress", "agent")
await expect(
rt.transition(v1.id, run.run_id, "renamed", "in_progress", "agent"),
).rejects.toThrow(/Unknown node/)
})

it("legacy runs without graph_version use the current graph", async () => {
const v1 = linearGraph()
await rt.registerGraph(v1)
const run = await store.createRun(v1.id)

const v2: GraphDef = {
...v1,
version: "2.0",
nodes: [{ id: "A", label: "Step A", artifact_type: "doc" }],
edges: [],
}
await rt.registerGraph(v2)

const graph = await rt.getRunGraph(run)
expect(graph?.version).toBe("2.0")
expect(await rt.next(v1.id, run.run_id)).toEqual(["A"])
})
})

describe("subgraph integration", () => {
Expand Down
30 changes: 25 additions & 5 deletions src/workflow/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class Runtime {
async createRun(graphId: string, name?: string): Promise<Run> {
const graph = await this.store.loadGraph(graphId)
if (!graph) throw new Error(`Unknown graph: ${graphId}`)
return this.store.createRun(graphId, name)
return this.store.createRun(graphId, name, graph.version)
}

async getRun(runId: string): Promise<Run | undefined> {
Expand All @@ -47,6 +47,10 @@ export class Runtime {
return this.store.listRuns(graphId)
}

async getRunGraph(run: Run): Promise<GraphDef | undefined> {
return this.loadGraphForRun(run.graph_id, run)
}

// ---- Derived state ------------------------------------------------------

deriveNodeStates(graph: GraphDef, run: Run): Record<string, NodeState> {
Expand All @@ -68,10 +72,10 @@ export class Runtime {
}

async next(graphId: string, runId: string): Promise<string[]> {
const graph = await this.store.loadGraph(graphId)
if (!graph) throw new Error(`Unknown graph: ${graphId}`)
const run = await this.store.loadRun(runId)
if (!run) throw new Error(`Unknown run: ${runId}`)
const graph = await this.loadGraphForRun(graphId, run)
if (!graph) throw new Error(this.unknownGraphMessage(graphId, run))

const states = this.deriveNodeStates(graph, run)
const available: string[] = []
Expand Down Expand Up @@ -113,10 +117,10 @@ export class Runtime {
metadata?: Record<string, unknown>
},
): Promise<Transition> {
const graph = await this.store.loadGraph(graphId)
if (!graph) throw new Error(`Unknown graph: ${graphId}`)
const run = await this.store.loadRun(runId)
if (!run) throw new Error(`Unknown run: ${runId}`)
const graph = await this.loadGraphForRun(graphId, run)
if (!graph) throw new Error(this.unknownGraphMessage(graphId, run))

const nodeDef = graph.nodes.find((n) => n.id === nodeId)
if (!nodeDef) throw new Error(`Unknown node: ${nodeId}`)
Expand Down Expand Up @@ -191,6 +195,22 @@ export class Runtime {
}
}

private async loadGraphForRun(
graphId: string,
run: Run,
): Promise<GraphDef | undefined> {
if (run.graph_id !== graphId) return undefined
return this.store.loadGraph(graphId, run.graph_version)
}

private unknownGraphMessage(graphId: string, run: Run): string {
if (run.graph_id !== graphId) return `Unknown graph: ${graphId}`
if (run.graph_version !== undefined) {
return `Unknown graph: ${graphId}@${run.graph_version}`
}
return `Unknown graph: ${graphId}`
}

private validateArtifactType(node: NodeDef, artifactType: string): void {
// Resolve the expected type: structured contract wins over legacy shorthand.
const expected = node.artifact?.type ?? node.artifact_type
Expand Down
Loading