diff --git a/packages/opencode/src/acp/event.ts b/packages/opencode/src/acp/event.ts index 05b8b77b370d..4bbb6b3ed322 100644 --- a/packages/opencode/src/acp/event.ts +++ b/packages/opencode/src/acp/event.ts @@ -1,4 +1,4 @@ -import type { AgentSideConnection } from "@agentclientprotocol/sdk" +import type { AgentSideConnection, PlanEntry } from "@agentclientprotocol/sdk" import * as Log from "@opencode-ai/core/util/log" import type { Event, @@ -43,6 +43,9 @@ export class Subscription { private readonly abort = new AbortController() private readonly shellSnapshots = new Map() private readonly toolStarts = new Set() + private readonly lastPlanBySession = new Map() + private readonly lastPlanFingerprintBySession = new Map() + private readonly everInProgressBySession = new Map>() private readonly permission: ACPPermission.Handler private started = false @@ -78,6 +81,10 @@ export class Subscription { return this.handlePartUpdated(event) case "message.part.delta": return this.handlePartDelta(event) + case "todo.updated": + return this.handleTodoUpdated(event) + case "session.idle": + return this.handleSessionIdle(event) } } @@ -339,6 +346,81 @@ export class Subscription { this.toolStarts.delete(toolCallId) this.shellSnapshots.delete(toolCallId) } + + private async handleTodoUpdated(event: Extract) { + const sessionId = event.properties.sessionID + const session = await Effect.runPromise(this.input.session.tryGet(sessionId)) + if (!session) return + const entries = event.properties.todos.flatMap((todo) => { + const entry = toPlanEntry(todo) + return entry ? [entry] : [] + }) + + const everStarted = this.everInProgressBySession.get(sessionId) ?? new Set() + for (const entry of entries) { + if (entry.status === "in_progress") everStarted.add(entry.content) + } + if (everStarted.size > 0) this.everInProgressBySession.set(sessionId, everStarted) + + await this.sendPlan(sessionId, entries) + } + + // When a turn ends, any entry the agent ever started during the session + // should be considered done — either it really was finished (model forgot to + // mark it completed) or the model demoted it back to pending without + // updating it again before stopping. Both render as a non-checked item in + // ACP clients, which contradicts the visible reality that work stopped. + // Genuinely planned-but-never-started entries (status: pending and never + // in_progress) are left alone so future plans still render correctly. + private async handleSessionIdle(event: Extract) { + const sessionId = event.properties.sessionID + const previous = this.lastPlanBySession.get(sessionId) + if (!previous) return + const everStarted = this.everInProgressBySession.get(sessionId) + if (!everStarted || everStarted.size === 0) return + if (!previous.some((entry) => everStarted.has(entry.content) && entry.status !== "completed")) { + return + } + const next = previous.map((entry) => + everStarted.has(entry.content) && entry.status !== "completed" + ? { ...entry, status: "completed" as const } + : entry, + ) + await this.sendPlan(sessionId, next) + } + + private async sendPlan(sessionId: string, entries: PlanEntry[]) { + const fingerprint = JSON.stringify(entries) + if (this.lastPlanFingerprintBySession.get(sessionId) === fingerprint) return + this.lastPlanFingerprintBySession.set(sessionId, fingerprint) + this.lastPlanBySession.set(sessionId, entries) + await this.input.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "plan", + entries, + }, + }) + } +} + +// opencode's todowrite allows a "cancelled" status and free-form priority; +// ACP's PlanEntry only models pending/in_progress/completed and high/medium/low, +// so collapse cancelled into completed and default unknown priorities to medium. +function toPlanEntry(raw: unknown): PlanEntry | undefined { + if (!raw || typeof raw !== "object") return undefined + const todo = raw as { content?: unknown; status?: unknown; priority?: unknown } + if (typeof todo.content !== "string" || todo.content.length === 0) return undefined + return { + content: todo.content, + status: + todo.status === "in_progress" + ? "in_progress" + : todo.status === "completed" || todo.status === "cancelled" + ? "completed" + : "pending", + priority: todo.priority === "high" || todo.priority === "low" ? todo.priority : "medium", + } } export * as ACPEvent from "./event" diff --git a/packages/opencode/src/session/todo.ts b/packages/opencode/src/session/todo.ts index 37598f9d560b..39a7331f2c39 100644 --- a/packages/opencode/src/session/todo.ts +++ b/packages/opencode/src/session/todo.ts @@ -3,7 +3,8 @@ import { Effect, Layer, Context, Schema } from "effect" import { Database } from "@opencode-ai/core/database/database" import { eq } from "drizzle-orm" import { asc } from "drizzle-orm" -import { TodoTable } from "@opencode-ai/core/session/sql" +import { SessionTable, TodoTable } from "@opencode-ai/core/session/sql" +import { AbsolutePath } from "@opencode-ai/core/schema" import { EventV2Bridge } from "@/event-v2-bridge" import { EventV2 } from "@opencode-ai/core/event" @@ -60,7 +61,21 @@ export const layer = Layer.effect( }), ) .pipe(Effect.orDie) - yield* events.publish(Event.Updated, input) + // The SSE event stream filters by location.directory, so events + // published without a location are dropped before reaching external + // subscribers (e.g. ACP clients) even though in-process subscribers + // like the TUI still see them. Attach the owning session's directory + // so the event reaches every subscriber. + const row = yield* db + .select({ directory: SessionTable.directory, workspaceID: SessionTable.workspace_id }) + .from(SessionTable) + .where(eq(SessionTable.id, input.sessionID)) + .get() + .pipe(Effect.orDie) + const location = row + ? { directory: AbsolutePath.make(row.directory), workspaceID: row.workspaceID ?? undefined } + : undefined + yield* events.publish(Event.Updated, input, location ? { location } : undefined) }) const get = Effect.fn("Todo.get")(function* (sessionID: SessionID) { diff --git a/packages/opencode/test/acp/event.test.ts b/packages/opencode/test/acp/event.test.ts index 6edd7f0fbba0..cf2d370ec2f5 100644 --- a/packages/opencode/test/acp/event.test.ts +++ b/packages/opencode/test/acp/event.test.ts @@ -276,6 +276,28 @@ function completedTool( } satisfies ToolPart } +function todoUpdatedEvent( + sessionID: string, + todos: Array<{ content: string; status: string; priority?: string }>, +): Event { + return { + id: `evt_todo_${sessionID}_${Math.random()}`, + type: "todo.updated", + properties: { + sessionID, + todos: todos.map((todo) => ({ ...todo, priority: todo.priority ?? "" })), + }, + } +} + +function sessionIdleEvent(sessionID: string): Event { + return { + id: `evt_idle_${sessionID}_${Math.random()}`, + type: "session.idle", + properties: { sessionID }, + } +} + function errorTool(sessionID: string, callID: string) { return { id: `part_${callID}`, @@ -674,6 +696,120 @@ describe("acp event routing", () => { }) }) + it("emits an ACP plan session update from todo.updated events", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_plan", cwd: "/workspace" })) + + await harness.subscription.handle( + todoUpdatedEvent("ses_plan", [ + { content: "first", status: "in_progress", priority: "high" }, + { content: "second", status: "pending", priority: "weird" }, + { content: "third", status: "cancelled", priority: "low" }, + { content: "fourth", status: "completed" }, + ]), + ) + + const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan") + expect(planUpdates).toHaveLength(1) + expect(planUpdates[0]).toMatchObject({ + sessionId: "ses_plan", + update: { + sessionUpdate: "plan", + entries: [ + { content: "first", status: "in_progress", priority: "high" }, + { content: "second", status: "pending", priority: "medium" }, + { content: "third", status: "completed", priority: "low" }, + { content: "fourth", status: "completed", priority: "medium" }, + ], + }, + }) + }) + + it("dedupes identical sequential plan updates and re-emits on change", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_plan2", cwd: "/workspace" })) + + const todos = [{ content: "a", status: "pending", priority: "medium" }] + await harness.subscription.handle(todoUpdatedEvent("ses_plan2", todos)) + await harness.subscription.handle(todoUpdatedEvent("ses_plan2", todos)) + await harness.subscription.handle( + todoUpdatedEvent("ses_plan2", [{ content: "a", status: "in_progress", priority: "medium" }]), + ) + + const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan") + expect(planUpdates).toHaveLength(2) + expect(planUpdates[0]?.update).toMatchObject({ entries: [{ status: "pending" }] }) + expect(planUpdates[1]?.update).toMatchObject({ entries: [{ status: "in_progress" }] }) + }) + + it("promotes leftover in_progress entries to completed on session.idle", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_idle", cwd: "/workspace" })) + + await harness.subscription.handle( + todoUpdatedEvent("ses_idle", [ + { content: "done", status: "completed", priority: "medium" }, + { content: "stuck", status: "in_progress", priority: "high" }, + { content: "later", status: "pending", priority: "low" }, + ]), + ) + await harness.subscription.handle(sessionIdleEvent("ses_idle")) + + const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan") + expect(planUpdates).toHaveLength(2) + expect(planUpdates[1]?.update).toMatchObject({ + entries: [ + { content: "done", status: "completed" }, + { content: "stuck", status: "completed" }, + { content: "later", status: "pending" }, + ], + }) + }) + + it("promotes entries demoted from in_progress back to pending on session.idle", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_demoted", cwd: "/workspace" })) + + await harness.subscription.handle( + todoUpdatedEvent("ses_demoted", [ + { content: "done", status: "completed", priority: "medium" }, + { content: "active", status: "in_progress", priority: "high" }, + ]), + ) + await harness.subscription.handle( + todoUpdatedEvent("ses_demoted", [ + { content: "done", status: "completed", priority: "medium" }, + { content: "active", status: "pending", priority: "high" }, + ]), + ) + await harness.subscription.handle(sessionIdleEvent("ses_demoted")) + + const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan") + expect(planUpdates).toHaveLength(3) + expect(planUpdates[2]?.update).toMatchObject({ + entries: [ + { content: "done", status: "completed" }, + { content: "active", status: "completed" }, + ], + }) + }) + + it("does not promote pending entries that were never in_progress on session.idle", async () => { + const harness = createHarness() + await Effect.runPromise(harness.session.create({ id: "ses_idle2", cwd: "/workspace" })) + + await harness.subscription.handle( + todoUpdatedEvent("ses_idle2", [ + { content: "done", status: "completed", priority: "medium" }, + { content: "future", status: "pending", priority: "low" }, + ]), + ) + await harness.subscription.handle(sessionIdleEvent("ses_idle2")) + + const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan") + expect(planUpdates).toHaveLength(1) + }) + it("emits image attachments as ACP image content for live and replayed completed tool updates", async () => { const harness = createHarness() const image = Buffer.from("image-data").toString("base64")