Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions packages/core/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ type Dependencies = {
readonly config: readonly Config.Entry[]
}

type Input = {
readonly sessionID: SessionSchema.ID
readonly entries: readonly Entry[]
readonly model: Model
readonly request: LLMRequest
}

const estimate = (value: unknown) => Token.estimate(JSON.stringify(value))

const truncate = (value: string) =>
Expand Down Expand Up @@ -160,21 +167,10 @@ export const buildPrompt = (input: { readonly previousSummary?: string; readonly

export const make = (dependencies: Dependencies) => {
const config = settings(dependencies.config)
return Effect.fn("SessionCompaction.compactIfNeeded")(function* (input: {
readonly sessionID: SessionSchema.ID
readonly entries: readonly Entry[]
readonly model: Model
readonly request: LLMRequest
}) {
const compactAfterOverflow = Effect.fn("SessionCompaction.compactAfterOverflow")(function* (input: Input) {
const context = input.model.route.defaults.limits?.context
if (!config.auto || context === undefined || context <= 0) return false
if (context === undefined || context <= 0) return false
const output = input.request.generation?.maxTokens ?? input.model.route.defaults.limits?.output ?? 0
if (
estimate({ system: input.request.system, messages: input.request.messages, tools: input.request.tools }) <=
context - Math.max(output, config.buffer)
)
return false

const selected = select(input.entries, config.tokens)
const previousSummary = input.entries.find((entry) => entry.message.type === "compaction")?.message
if (!selected || (selected.head.length === 0 && previousSummary?.type !== "compaction")) return false
Expand All @@ -193,7 +189,8 @@ export const make = (dependencies: Dependencies) => {
})

const chunks: string[] = []
yield* dependencies.llm
let failed = false
const summarized = yield* dependencies.llm
.stream(
LLM.request({
model: input.model,
Expand All @@ -204,13 +201,15 @@ export const make = (dependencies: Dependencies) => {
)
.pipe(
Stream.runForEach((event) => {
if (!LLMEvent.is.textDelta(event)) return Effect.void
chunks.push(event.text)
if (LLMEvent.is.providerError(event)) failed = true
if (LLMEvent.is.textDelta(event)) chunks.push(event.text)
return Effect.void
}),
Effect.as(true),
Effect.catchTag("LLM.Error", () => Effect.succeed(false)),
)
const summary = chunks.join("")
if (!summary.trim()) return yield* Effect.die("Compaction returned an empty summary")
if (!summarized || failed || !summary.trim()) return false
yield* dependencies.events.publish(SessionEvent.Compaction.Ended, {
sessionID: input.sessionID,
messageID,
Expand All @@ -221,4 +220,20 @@ export const make = (dependencies: Dependencies) => {
})
return true
})
const compactIfNeeded = Effect.fn("SessionCompaction.compactIfNeeded")(function* (input: Input) {
if (!config.auto) return false
const context = input.model.route.defaults.limits?.context
if (context === undefined || context <= 0) return false
const output = input.request.generation?.maxTokens ?? input.model.route.defaults.limits?.output ?? 0
if (
estimate({ system: input.request.system, messages: input.request.messages, tools: input.request.tools }) <=
context - Math.max(output, config.buffer)
)
return false
return yield* compactAfterOverflow(input)
})
return {
compactIfNeeded,
compactAfterOverflow,
}
}
105 changes: 81 additions & 24 deletions packages/core/src/session/runner/llm.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { LLM, LLMClient, LLMError, LLMEvent, SystemPart } from "@opencode-ai/llm"
import { Cause, DateTime, Effect, FiberSet, Layer, Schema, Semaphore, Stream } from "effect"
import {
LLM,
LLMClient,
LLMError,
LLMEvent,
SystemPart,
isContextOverflowFailure,
type ProviderErrorEvent,
} from "@opencode-ai/llm"
import { Cause, DateTime, Effect, FiberSet, Layer, Option, Schema, Semaphore, Stream } from "effect"
import { AgentV2 } from "../../agent"
import { Config } from "../../config"
import { Database } from "../../database/database"
Expand Down Expand Up @@ -91,7 +99,7 @@ export const layer = Layer.effect(
const skillGuidance = yield* SkillGuidance.Service
const config = yield* Config.Service
const db = (yield* Database.Service).db
const compact = SessionCompaction.make({ events, llm, config: yield* config.entries() })
const compaction = SessionCompaction.make({ events, llm, config: yield* config.entries() })
const getSession = Effect.fn("SessionRunner.getSession")(function* (sessionID: SessionSchema.ID) {
const session = yield* store.get(sessionID)
if (!session) return yield* Effect.die(`Session not found: ${sessionID}`)
Expand Down Expand Up @@ -130,14 +138,29 @@ export const layer = Layer.effect(
const isQuestionRejected = (cause: Cause.Cause<unknown>) =>
cause.reasons.some((reason) => Cause.isDieReason(reason) && reason.defect instanceof QuestionV2.RejectedError)

class RetryTurn extends Error {
constructor(readonly promotion: SessionInput.Delivery | undefined) {
type TurnTransition =
// Request preparation observed a concurrent Session change and must restart from durable state.
| { readonly _tag: "RebuildPreparedTurn"; readonly promotion?: SessionInput.Delivery }
// Overflow compaction completed; rebuild once through the path without overflow recovery.
| { readonly _tag: "ContinueAfterOverflowCompaction" }

class TurnTransitionError extends Error {
constructor(readonly transition: TurnTransition) {
super()
}
}

const rebuildPreparedTurn = (promotion?: SessionInput.Delivery) =>
new TurnTransitionError({ _tag: "RebuildPreparedTurn", promotion })
const continueAfterOverflowCompaction = new TurnTransitionError({
_tag: "ContinueAfterOverflowCompaction",
})

const retryAgentMismatch = (promotion: SessionInput.Delivery | undefined) =>
Effect.catchDefect((defect) =>
defect instanceof SessionContextEpoch.AgentMismatch ? Effect.die(new RetryTurn(promotion)) : Effect.die(defect),
defect instanceof SessionContextEpoch.AgentMismatch
? Effect.die(rebuildPreparedTurn(promotion))
: Effect.die(defect),
)

const sameModel = Schema.toEquivalence(Schema.UndefinedOr(ModelV2.Ref))
Expand All @@ -149,6 +172,7 @@ export const layer = Layer.effect(
const runTurnAttempt = Effect.fn("SessionRunner.runTurn")(function* (
sessionID: SessionSchema.ID,
promotion: SessionInput.Delivery | undefined,
recoverOverflow?: typeof compaction.compactAfterOverflow,
) {
const session = yield* getSession(sessionID)
if (session.location.directory !== location.directory || session.location.workspaceID !== location.workspaceID)
Expand Down Expand Up @@ -183,7 +207,7 @@ export const layer = Layer.effect(
).pipe(retryAgentMismatch(undefined)))
const current = yield* getSession(sessionID)
if ((yield* agents.select(current.agent)).id !== agent.id || !sameModel(current.model, session.model))
return yield* Effect.die(new RetryTurn(undefined))
return yield* Effect.die(rebuildPreparedTurn())
const model = yield* models.resolve(session)
const entries = yield* SessionHistory.entriesForRunner(db, session.id, system.baselineSeq)
const context = entries.map((entry) => entry.message)
Expand All @@ -195,8 +219,8 @@ export const layer = Layer.effect(
messages: toLLMMessages(context, model),
tools: yield* tools.definitions(),
})
if (yield* compact({ sessionID: session.id, entries, model, request }))
return yield* Effect.die(new RetryTurn(undefined))
if (yield* compaction.compactIfNeeded({ sessionID: session.id, entries, model, request }))
return yield* Effect.die(rebuildPreparedTurn())
const publisher = createLLMEventPublisher(events, {
sessionID: session.id,
agent: agent.id,
Expand All @@ -209,11 +233,19 @@ export const layer = Layer.effect(
const withPublication = Semaphore.makeUnsafe(1).withPermit
const publish = (event: LLMEvent, outputPaths: ReadonlyArray<string> = []) =>
withPublication(publisher.publish(event, outputPaths))
let overflowFailure: ProviderErrorEvent | undefined
if (!(yield* SessionContextEpoch.current(db, session.id, agent.id, system.revision)))
return yield* Effect.die(new RetryTurn(undefined))
return yield* Effect.die(rebuildPreparedTurn())
const providerStream = llm.stream(request).pipe(
Stream.runForEach((event) =>
Effect.gen(function* () {
if (overflowFailure || publisher.hasProviderError()) return
if (LLMEvent.is.providerError(event)) {
if (isContextOverflowFailure(event) && !publisher.hasAssistantStarted()) {
overflowFailure = event
return
}
}
yield* publish(event)
if (event.type !== "tool-call" || event.providerExecuted) return
needsContinuation = true
Expand Down Expand Up @@ -248,13 +280,17 @@ export const layer = Layer.effect(
return yield* Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const stream = yield* restore(providerStream).pipe(Effect.exit)
let llmFailure: LLMError | undefined
if (stream._tag === "Failure") {
for (const reason of stream.cause.reasons) {
if (!Cause.isFailReason(reason)) continue
if (reason.error instanceof LLMError) llmFailure = reason.error
}
}
const failure =
stream._tag === "Failure" ? Option.getOrUndefined(Cause.findErrorOption(stream.cause)) : undefined
if (
recoverOverflow &&
!publisher.hasAssistantStarted() &&
isContextOverflowFailure(overflowFailure ?? failure) &&
(yield* restore(recoverOverflow({ sessionID: session.id, entries, model, request })))
)
return yield* Effect.die(continueAfterOverflowCompaction)
if (overflowFailure) yield* publish(overflowFailure)
const llmFailure = failure instanceof LLMError ? failure : undefined
if (llmFailure && !publisher.hasProviderError()) {
yield* withPublication(publisher.failUnsettledTools("Provider did not return a tool result", true))
yield* withPublication(
Expand Down Expand Up @@ -290,17 +326,38 @@ export const layer = Layer.effect(
}),
)
}, Effect.scoped)
const runTurn: (
type RunTurn = (
sessionID: SessionSchema.ID,
promotion: SessionInput.Delivery | undefined,
) => Effect.Effect<boolean, RunError> = (sessionID, promotion) =>
runTurnAttempt(sessionID, promotion).pipe(
Effect.catchDefect((defect) =>
defect instanceof RetryTurn
? Effect.yieldNow.pipe(Effect.andThen(runTurn(sessionID, defect.promotion)))
: Effect.die(defect),
) => Effect.Effect<boolean, RunError>

const runAfterOverflowCompaction: RunTurn = Effect.fnUntraced(function* (sessionID, promotion) {
return yield* runTurnAttempt(sessionID, promotion).pipe(
Effect.catchDefect(
Effect.fnUntraced(function* (defect) {
if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect)
if (defect.transition._tag === "ContinueAfterOverflowCompaction")
return yield* Effect.die("Post-compaction provider attempt cannot recover another overflow")
yield* Effect.yieldNow
return yield* runAfterOverflowCompaction(sessionID, defect.transition.promotion)
}),
),
)
})

const runTurn: RunTurn = Effect.fnUntraced(function* (sessionID, promotion) {
return yield* runTurnAttempt(sessionID, promotion, compaction.compactAfterOverflow).pipe(
Effect.catchDefect(
Effect.fnUntraced(function* (defect) {
if (!(defect instanceof TurnTransitionError)) return yield* Effect.die(defect)
yield* Effect.yieldNow
if (defect.transition._tag === "ContinueAfterOverflowCompaction")
return yield* runAfterOverflowCompaction(sessionID, undefined)
return yield* runTurn(sessionID, defect.transition.promotion)
}),
),
)
})

const run = Effect.fn("SessionRunner.run")(function* (input: {
readonly sessionID: SessionSchema.ID
Expand Down
14 changes: 10 additions & 4 deletions packages/core/src/session/runner/publish-llm-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)

const startToolInput = Effect.fnUntraced(function* (event: { readonly id: string; readonly name: string }) {
if (tools.has(event.id)) return yield* Effect.die(`Duplicate tool input start: ${event.id}`)
const assistantMessageID = yield* currentAssistantMessageID()
const assistantMessageID = yield* startAssistant()
tools.set(event.id, {
assistantMessageID,
name: event.name,
Expand Down Expand Up @@ -224,7 +224,6 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
) {
switch (event.type) {
case "step-start":
yield* startAssistant()
return
case "text-start":
yield* text.start(event.id)
Expand Down Expand Up @@ -381,7 +380,7 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
yield* events.publish(SessionEvent.Step.Ended, {
sessionID: input.sessionID,
timestamp: yield* timestamp,
assistantMessageID: yield* currentAssistantMessageID(),
assistantMessageID: yield* startAssistant(),
finish: event.reason,
cost: 0,
tokens: tokens(event.usage),
Expand All @@ -402,5 +401,12 @@ export const createLLMEventPublisher = (events: EventV2.Interface, input: Input)
}
})

return { publish, flush, failUnsettledTools, hasProviderError: () => providerFailed, startAssistant }
return {
publish,
flush,
failUnsettledTools,
hasAssistantStarted: () => assistantMessageID !== undefined,
hasProviderError: () => providerFailed,
startAssistant,
}
}
Loading
Loading