Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion packages/opencode/src/acp/event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { AgentSideConnection } from "@agentclientprotocol/sdk"
import type { AgentSideConnection, PlanEntry } from "@agentclientprotocol/sdk"
import * as Log from "@opencode-ai/core/util/log"
import type {
Event,
Expand Down Expand Up @@ -43,6 +43,9 @@ export class Subscription {
private readonly abort = new AbortController()
private readonly shellSnapshots = new Map<string, string>()
private readonly toolStarts = new Set<string>()
private readonly lastPlanBySession = new Map<string, PlanEntry[]>()
private readonly lastPlanFingerprintBySession = new Map<string, string>()
private readonly everInProgressBySession = new Map<string, Set<string>>()
private readonly permission: ACPPermission.Handler
private started = false

Expand Down Expand Up @@ -78,6 +81,10 @@ export class Subscription {
return this.handlePartUpdated(event)
case "message.part.delta":
return this.handlePartDelta(event)
case "todo.updated":
return this.handleTodoUpdated(event)
case "session.idle":
return this.handleSessionIdle(event)
}
}

Expand Down Expand Up @@ -339,6 +346,81 @@ export class Subscription {
this.toolStarts.delete(toolCallId)
this.shellSnapshots.delete(toolCallId)
}

private async handleTodoUpdated(event: Extract<Event, { type: "todo.updated" }>) {
const sessionId = event.properties.sessionID
const session = await Effect.runPromise(this.input.session.tryGet(sessionId))
if (!session) return
const entries = event.properties.todos.flatMap((todo) => {
const entry = toPlanEntry(todo)
return entry ? [entry] : []
})

const everStarted = this.everInProgressBySession.get(sessionId) ?? new Set<string>()
for (const entry of entries) {
if (entry.status === "in_progress") everStarted.add(entry.content)
}
if (everStarted.size > 0) this.everInProgressBySession.set(sessionId, everStarted)

await this.sendPlan(sessionId, entries)
}

// When a turn ends, any entry the agent ever started during the session
// should be considered done — either it really was finished (model forgot to
// mark it completed) or the model demoted it back to pending without
// updating it again before stopping. Both render as a non-checked item in
// ACP clients, which contradicts the visible reality that work stopped.
// Genuinely planned-but-never-started entries (status: pending and never
// in_progress) are left alone so future plans still render correctly.
private async handleSessionIdle(event: Extract<Event, { type: "session.idle" }>) {
const sessionId = event.properties.sessionID
const previous = this.lastPlanBySession.get(sessionId)
if (!previous) return
const everStarted = this.everInProgressBySession.get(sessionId)
if (!everStarted || everStarted.size === 0) return
if (!previous.some((entry) => everStarted.has(entry.content) && entry.status !== "completed")) {
return
}
const next = previous.map((entry) =>
everStarted.has(entry.content) && entry.status !== "completed"
? { ...entry, status: "completed" as const }
: entry,
)
await this.sendPlan(sessionId, next)
}

private async sendPlan(sessionId: string, entries: PlanEntry[]) {
const fingerprint = JSON.stringify(entries)
if (this.lastPlanFingerprintBySession.get(sessionId) === fingerprint) return
this.lastPlanFingerprintBySession.set(sessionId, fingerprint)
this.lastPlanBySession.set(sessionId, entries)
await this.input.connection.sessionUpdate({
sessionId,
update: {
sessionUpdate: "plan",
entries,
},
})
}
}

// opencode's todowrite allows a "cancelled" status and free-form priority;
// ACP's PlanEntry only models pending/in_progress/completed and high/medium/low,
// so collapse cancelled into completed and default unknown priorities to medium.
function toPlanEntry(raw: unknown): PlanEntry | undefined {
if (!raw || typeof raw !== "object") return undefined
const todo = raw as { content?: unknown; status?: unknown; priority?: unknown }
if (typeof todo.content !== "string" || todo.content.length === 0) return undefined
return {
content: todo.content,
status:
todo.status === "in_progress"
? "in_progress"
: todo.status === "completed" || todo.status === "cancelled"
? "completed"
: "pending",
priority: todo.priority === "high" || todo.priority === "low" ? todo.priority : "medium",
}
}

export * as ACPEvent from "./event"
19 changes: 17 additions & 2 deletions packages/opencode/src/session/todo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { Effect, Layer, Context, Schema } from "effect"
import { Database } from "@opencode-ai/core/database/database"
import { eq } from "drizzle-orm"
import { asc } from "drizzle-orm"
import { TodoTable } from "@opencode-ai/core/session/sql"
import { SessionTable, TodoTable } from "@opencode-ai/core/session/sql"
import { AbsolutePath } from "@opencode-ai/core/schema"
import { EventV2Bridge } from "@/event-v2-bridge"
import { EventV2 } from "@opencode-ai/core/event"

Expand Down Expand Up @@ -60,7 +61,21 @@ export const layer = Layer.effect(
}),
)
.pipe(Effect.orDie)
yield* events.publish(Event.Updated, input)
// The SSE event stream filters by location.directory, so events
// published without a location are dropped before reaching external
// subscribers (e.g. ACP clients) even though in-process subscribers
// like the TUI still see them. Attach the owning session's directory
// so the event reaches every subscriber.
const row = yield* db
.select({ directory: SessionTable.directory, workspaceID: SessionTable.workspace_id })
.from(SessionTable)
.where(eq(SessionTable.id, input.sessionID))
.get()
.pipe(Effect.orDie)
const location = row
? { directory: AbsolutePath.make(row.directory), workspaceID: row.workspaceID ?? undefined }
: undefined
yield* events.publish(Event.Updated, input, location ? { location } : undefined)
})

const get = Effect.fn("Todo.get")(function* (sessionID: SessionID) {
Expand Down
136 changes: 136 additions & 0 deletions packages/opencode/test/acp/event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,28 @@ function completedTool(
} satisfies ToolPart
}

function todoUpdatedEvent(
sessionID: string,
todos: Array<{ content: string; status: string; priority?: string }>,
): Event {
return {
id: `evt_todo_${sessionID}_${Math.random()}`,
type: "todo.updated",
properties: {
sessionID,
todos: todos.map((todo) => ({ ...todo, priority: todo.priority ?? "" })),
},
}
}

function sessionIdleEvent(sessionID: string): Event {
return {
id: `evt_idle_${sessionID}_${Math.random()}`,
type: "session.idle",
properties: { sessionID },
}
}

function errorTool(sessionID: string, callID: string) {
return {
id: `part_${callID}`,
Expand Down Expand Up @@ -674,6 +696,120 @@ describe("acp event routing", () => {
})
})

it("emits an ACP plan session update from todo.updated events", async () => {
const harness = createHarness()
await Effect.runPromise(harness.session.create({ id: "ses_plan", cwd: "/workspace" }))

await harness.subscription.handle(
todoUpdatedEvent("ses_plan", [
{ content: "first", status: "in_progress", priority: "high" },
{ content: "second", status: "pending", priority: "weird" },
{ content: "third", status: "cancelled", priority: "low" },
{ content: "fourth", status: "completed" },
]),
)

const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan")
expect(planUpdates).toHaveLength(1)
expect(planUpdates[0]).toMatchObject({
sessionId: "ses_plan",
update: {
sessionUpdate: "plan",
entries: [
{ content: "first", status: "in_progress", priority: "high" },
{ content: "second", status: "pending", priority: "medium" },
{ content: "third", status: "completed", priority: "low" },
{ content: "fourth", status: "completed", priority: "medium" },
],
},
})
})

it("dedupes identical sequential plan updates and re-emits on change", async () => {
const harness = createHarness()
await Effect.runPromise(harness.session.create({ id: "ses_plan2", cwd: "/workspace" }))

const todos = [{ content: "a", status: "pending", priority: "medium" }]
await harness.subscription.handle(todoUpdatedEvent("ses_plan2", todos))
await harness.subscription.handle(todoUpdatedEvent("ses_plan2", todos))
await harness.subscription.handle(
todoUpdatedEvent("ses_plan2", [{ content: "a", status: "in_progress", priority: "medium" }]),
)

const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan")
expect(planUpdates).toHaveLength(2)
expect(planUpdates[0]?.update).toMatchObject({ entries: [{ status: "pending" }] })
expect(planUpdates[1]?.update).toMatchObject({ entries: [{ status: "in_progress" }] })
})

it("promotes leftover in_progress entries to completed on session.idle", async () => {
const harness = createHarness()
await Effect.runPromise(harness.session.create({ id: "ses_idle", cwd: "/workspace" }))

await harness.subscription.handle(
todoUpdatedEvent("ses_idle", [
{ content: "done", status: "completed", priority: "medium" },
{ content: "stuck", status: "in_progress", priority: "high" },
{ content: "later", status: "pending", priority: "low" },
]),
)
await harness.subscription.handle(sessionIdleEvent("ses_idle"))

const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan")
expect(planUpdates).toHaveLength(2)
expect(planUpdates[1]?.update).toMatchObject({
entries: [
{ content: "done", status: "completed" },
{ content: "stuck", status: "completed" },
{ content: "later", status: "pending" },
],
})
})

it("promotes entries demoted from in_progress back to pending on session.idle", async () => {
const harness = createHarness()
await Effect.runPromise(harness.session.create({ id: "ses_demoted", cwd: "/workspace" }))

await harness.subscription.handle(
todoUpdatedEvent("ses_demoted", [
{ content: "done", status: "completed", priority: "medium" },
{ content: "active", status: "in_progress", priority: "high" },
]),
)
await harness.subscription.handle(
todoUpdatedEvent("ses_demoted", [
{ content: "done", status: "completed", priority: "medium" },
{ content: "active", status: "pending", priority: "high" },
]),
)
await harness.subscription.handle(sessionIdleEvent("ses_demoted"))

const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan")
expect(planUpdates).toHaveLength(3)
expect(planUpdates[2]?.update).toMatchObject({
entries: [
{ content: "done", status: "completed" },
{ content: "active", status: "completed" },
],
})
})

it("does not promote pending entries that were never in_progress on session.idle", async () => {
const harness = createHarness()
await Effect.runPromise(harness.session.create({ id: "ses_idle2", cwd: "/workspace" }))

await harness.subscription.handle(
todoUpdatedEvent("ses_idle2", [
{ content: "done", status: "completed", priority: "medium" },
{ content: "future", status: "pending", priority: "low" },
]),
)
await harness.subscription.handle(sessionIdleEvent("ses_idle2"))

const planUpdates = harness.updates.filter((u) => u.update.sessionUpdate === "plan")
expect(planUpdates).toHaveLength(1)
})

it("emits image attachments as ACP image content for live and replayed completed tool updates", async () => {
const harness = createHarness()
const image = Buffer.from("image-data").toString("base64")
Expand Down
Loading