diff --git a/src/cli/commands/workflow.ts b/src/cli/commands/workflow.ts index 9957610..844936f 100644 --- a/src/cli/commands/workflow.ts +++ b/src/cli/commands/workflow.ts @@ -88,7 +88,7 @@ async function maybeFireTerminated( ): Promise { const run = await rt.getRun(runId) if (!run) return null - const graph = await rt.getGraph(run.graph_id) + const graph = await rt.getRunGraph(run) if (!graph) return null const states = rt.deriveNodeStates(graph, run) @@ -234,7 +234,7 @@ export const workflowStatusCommand: Command = async (ctx, args) => { const rt = makeRuntime(ctx) const run = await rt.getRun(runId) if (!run) throw new Error(`Unknown run: ${runId}`) - const graph = await rt.getGraph(run.graph_id) + const graph = await rt.getRunGraph(run) if (!graph) throw new Error(`Unknown graph: ${run.graph_id}`) const states = rt.deriveNodeStates(graph, run) diff --git a/src/types.ts b/src/types.ts index 3bbf5e4..47c61bd 100644 --- a/src/types.ts +++ b/src/types.ts @@ -140,6 +140,7 @@ export type NodeState = { export type Run = { run_id: string graph_id: string + graph_version?: string | undefined name?: string created_at: string history: Transition[] diff --git a/src/workflow/adapter.ts b/src/workflow/adapter.ts index 0cd8e94..6ae08a1 100644 --- a/src/workflow/adapter.ts +++ b/src/workflow/adapter.ts @@ -2,12 +2,12 @@ import type { GraphDef, Run, Transition } from "../types.js" export interface WorkflowAdapter { saveGraph(graph: GraphDef): Promise - loadGraph(graphId: string): Promise + loadGraph(graphId: string, version?: string): Promise listGraphs(): Promise saveSourceGraph?(graph: GraphDef): Promise loadSourceGraph?(graphId: string): Promise - createRun(graphId: string, name?: string): Promise + createRun(graphId: string, name?: string, graphVersion?: string): Promise loadRun(runId: string): Promise listRuns(graphId?: string): Promise appendTransition(runId: string, transition: Transition): Promise diff --git a/src/workflow/filesystem.test.ts b/src/workflow/filesystem.test.ts index 9b1b1ad..cc50d9d 100644 --- a/src/workflow/filesystem.test.ts +++ b/src/workflow/filesystem.test.ts @@ -65,6 +65,20 @@ describe("FilesystemWorkflowAdapter", () => { const loaded = await store.loadGraph("g1") expect(loaded?.name).toBe("Updated") }) + + it("loads a specific saved graph version", async () => { + const v1 = makeGraph() + const v2 = { ...v1, version: "2.0.0", name: "Version Two" } + await store.saveGraph(v1) + await store.saveGraph(v2) + + const current = await store.loadGraph("g1") + const pinned = await store.loadGraph("g1", "1.0.0") + + expect(current?.version).toBe("2.0.0") + expect(pinned?.version).toBe("1.0.0") + expect(pinned?.name).toBe("Test Graph") + }) }) describe("loadGraph — missing", () => { @@ -84,6 +98,15 @@ describe("FilesystemWorkflowAdapter", () => { expect(ids).toEqual(["g1", "g2"]) }) + it("excludes version snapshots from the graph list", async () => { + await store.saveGraph(makeGraph("g1")) + await store.saveGraph({ ...makeGraph("g1"), version: "2.0.0" }) + const graphs = await store.listGraphs() + expect(graphs).toHaveLength(1) + expect(graphs[0]!.id).toBe("g1") + expect(graphs[0]!.version).toBe("2.0.0") + }) + it("returns an empty array when no graphs exist", async () => { const graphs = await store.listGraphs() expect(graphs).toEqual([]) @@ -92,9 +115,10 @@ describe("FilesystemWorkflowAdapter", () => { describe("createRun / loadRun", () => { it("creates a run with a generated id and empty history", async () => { - const run = await store.createRun("g1") + const run = await store.createRun("g1", undefined, "1.0.0") expect(run.run_id).toBeDefined() expect(run.graph_id).toBe("g1") + expect(run.graph_version).toBe("1.0.0") expect(run.history).toEqual([]) expect(run.created_at).toBeDefined() }) diff --git a/src/workflow/filesystem.ts b/src/workflow/filesystem.ts index 0ecd6a7..2a15261 100644 --- a/src/workflow/filesystem.ts +++ b/src/workflow/filesystem.ts @@ -15,6 +15,10 @@ function isNodeError(err: unknown): err is NodeError { return err instanceof Error } +function versionedGraphName(graphId: string, version: string): string { + return `${graphId}@${encodeURIComponent(version)}` +} + async function safeReaddir(dir: string): Promise { try { return await readdir(dir) @@ -41,6 +45,10 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter { join(this.graphsDir, `${graph.id}.json`), JSON.stringify(graph, null, 2), ) + await writeFile( + join(this.graphsDir, `${versionedGraphName(graph.id, graph.version)}.json`), + JSON.stringify(graph, null, 2), + ) } async saveSourceGraph(graph: GraphDef): Promise { @@ -65,9 +73,11 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter { return undefined } - async loadGraph(graphId: string): Promise { + async loadGraph(graphId: string, version?: string): Promise { + const name = + version === undefined ? graphId : versionedGraphName(graphId, version) for (const ext of [".json", ".yaml", ".yml"]) { - const filePath = join(this.graphsDir, `${graphId}${ext}`) + const filePath = join(this.graphsDir, `${name}${ext}`) try { const data = await readFile(filePath, "utf-8") return parseGraph(data, filePath) @@ -91,7 +101,7 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter { file.endsWith(".json") || file.endsWith(".yaml") || file.endsWith(".yml") - if (!isGraphFile || isSourceFile) continue + if (!isGraphFile || isSourceFile || file.includes("@")) continue const filePath = join(this.graphsDir, file) const data = await readFile(filePath, "utf-8") graphs.push(parseGraph(data, filePath)) @@ -101,11 +111,16 @@ export class FilesystemWorkflowAdapter implements WorkflowAdapter { // ---- Runs ---------------------------------------------------------------- - async createRun(graphId: string, name?: string): Promise { + async createRun( + graphId: string, + name?: string, + graphVersion?: string, + ): Promise { await mkdir(this.runsDir, { recursive: true }) const run: Run = { run_id: randomUUID(), graph_id: graphId, + ...(graphVersion !== undefined ? { graph_version: graphVersion } : {}), ...(name !== undefined ? { name } : {}), created_at: new Date().toISOString(), history: [], diff --git a/src/workflow/runtime.test.ts b/src/workflow/runtime.test.ts index 944af93..f8fa747 100644 --- a/src/workflow/runtime.test.ts +++ b/src/workflow/runtime.test.ts @@ -9,13 +9,21 @@ import { Runtime } from "./runtime.js" class MemoryAdapter implements WorkflowAdapter { private graphs = new Map() + private graphVersions = new Map() private runs = new Map() async saveGraph(graph: GraphDef): Promise { this.graphs.set(graph.id, graph) + this.graphVersions.set(`${graph.id}@${graph.version}`, graph) } - async loadGraph(graphId: string): Promise { + async loadGraph( + graphId: string, + version?: string, + ): Promise { + if (version !== undefined) { + return this.graphVersions.get(`${graphId}@${version}`) + } return this.graphs.get(graphId) } @@ -23,10 +31,15 @@ class MemoryAdapter implements WorkflowAdapter { return [...this.graphs.values()] } - async createRun(graphId: string, name?: string): Promise { + async createRun( + graphId: string, + name?: string, + graphVersion?: string, + ): Promise { const run: Run = { run_id: `run-${this.runs.size + 1}`, graph_id: graphId, + ...(graphVersion !== undefined ? { graph_version: graphVersion } : {}), ...(name !== undefined ? { name } : {}), created_at: new Date().toISOString(), history: [], @@ -802,6 +815,13 @@ describe("run lifecycle", () => { await expect(rt.createRun("nope")).rejects.toThrow(/Unknown graph/) }) + it("createRun pins the run to the current graph version", async () => { + const g = linearGraph() + await rt.registerGraph(g) + const run = await rt.createRun(g.id) + expect(run.graph_version).toBe("1.0") + }) + it("next() throws for unknown graph or run", async () => { const g = linearGraph() await rt.registerGraph(g) @@ -809,6 +829,50 @@ describe("run lifecycle", () => { await expect(rt.next("nope", run.run_id)).rejects.toThrow(/Unknown graph/) await expect(rt.next(g.id, "nope")).rejects.toThrow(/Unknown run/) }) + + it("continues pinned runs against their original graph after the current graph changes", async () => { + const v1 = linearGraph() + await rt.registerGraph(v1) + const run = await rt.createRun(v1.id) + + await rt.transition(v1.id, run.run_id, "A", "in_progress", "agent") + await rt.transition(v1.id, run.run_id, "A", "completed", "agent") + + const v2: GraphDef = { + ...v1, + version: "2.0", + nodes: [ + { id: "A", label: "Step A", artifact_type: "doc" }, + { id: "renamed", label: "Renamed Step", artifact_type: "doc" }, + ], + edges: [{ from: "A", to: "renamed" }], + } + await rt.registerGraph(v2) + + expect(await rt.next(v1.id, run.run_id)).toEqual(["B"]) + await rt.transition(v1.id, run.run_id, "B", "in_progress", "agent") + await expect( + rt.transition(v1.id, run.run_id, "renamed", "in_progress", "agent"), + ).rejects.toThrow(/Unknown node/) + }) + + it("legacy runs without graph_version use the current graph", async () => { + const v1 = linearGraph() + await rt.registerGraph(v1) + const run = await store.createRun(v1.id) + + const v2: GraphDef = { + ...v1, + version: "2.0", + nodes: [{ id: "A", label: "Step A", artifact_type: "doc" }], + edges: [], + } + await rt.registerGraph(v2) + + const graph = await rt.getRunGraph(run) + expect(graph?.version).toBe("2.0") + expect(await rt.next(v1.id, run.run_id)).toEqual(["A"]) + }) }) describe("subgraph integration", () => { diff --git a/src/workflow/runtime.ts b/src/workflow/runtime.ts index 97f0a93..eeb64bc 100644 --- a/src/workflow/runtime.ts +++ b/src/workflow/runtime.ts @@ -36,7 +36,7 @@ export class Runtime { async createRun(graphId: string, name?: string): Promise { const graph = await this.store.loadGraph(graphId) if (!graph) throw new Error(`Unknown graph: ${graphId}`) - return this.store.createRun(graphId, name) + return this.store.createRun(graphId, name, graph.version) } async getRun(runId: string): Promise { @@ -47,6 +47,10 @@ export class Runtime { return this.store.listRuns(graphId) } + async getRunGraph(run: Run): Promise { + return this.loadGraphForRun(run.graph_id, run) + } + // ---- Derived state ------------------------------------------------------ deriveNodeStates(graph: GraphDef, run: Run): Record { @@ -68,10 +72,10 @@ export class Runtime { } async next(graphId: string, runId: string): Promise { - const graph = await this.store.loadGraph(graphId) - if (!graph) throw new Error(`Unknown graph: ${graphId}`) const run = await this.store.loadRun(runId) if (!run) throw new Error(`Unknown run: ${runId}`) + const graph = await this.loadGraphForRun(graphId, run) + if (!graph) throw new Error(this.unknownGraphMessage(graphId, run)) const states = this.deriveNodeStates(graph, run) const available: string[] = [] @@ -113,10 +117,10 @@ export class Runtime { metadata?: Record }, ): Promise { - const graph = await this.store.loadGraph(graphId) - if (!graph) throw new Error(`Unknown graph: ${graphId}`) const run = await this.store.loadRun(runId) if (!run) throw new Error(`Unknown run: ${runId}`) + const graph = await this.loadGraphForRun(graphId, run) + if (!graph) throw new Error(this.unknownGraphMessage(graphId, run)) const nodeDef = graph.nodes.find((n) => n.id === nodeId) if (!nodeDef) throw new Error(`Unknown node: ${nodeId}`) @@ -191,6 +195,22 @@ export class Runtime { } } + private async loadGraphForRun( + graphId: string, + run: Run, + ): Promise { + if (run.graph_id !== graphId) return undefined + return this.store.loadGraph(graphId, run.graph_version) + } + + private unknownGraphMessage(graphId: string, run: Run): string { + if (run.graph_id !== graphId) return `Unknown graph: ${graphId}` + if (run.graph_version !== undefined) { + return `Unknown graph: ${graphId}@${run.graph_version}` + } + return `Unknown graph: ${graphId}` + } + private validateArtifactType(node: NodeDef, artifactType: string): void { // Resolve the expected type: structured contract wins over legacy shorthand. const expected = node.artifact?.type ?? node.artifact_type