Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 265 additions & 61 deletions crates/tui/src/core/engine/capacity_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,34 +659,18 @@ impl Engine {
.persist_capacity_record(turn, GuardrailAction::VerifyAndReplan, &record)
.await;

let latest_user = self
.session
.messages
.iter()
.rev()
.find(|msg| {
msg.role == "user"
&& msg
.content
.iter()
.any(|block| matches!(block, ContentBlock::Text { .. }))
})
.cloned();
let latest_verified = self
.session
.messages
.iter()
.rev()
.find(|msg| {
msg.role == "user"
&& msg.content.iter().any(|block| match block {
ContentBlock::ToolResult { content, .. } => {
content.contains("[verification replay]")
}
_ => false,
})
})
.cloned();
// The replan path needs the *full* messages, not summaries.
// `scan_canonical_inputs` already located the indices in a single
// reverse pass; clone from the live `messages` slice once. We
// pass `true` because the replan path consumes
// `latest_verified_user_idx` below.
let scan = scan_canonical_inputs(&self.session.messages, true);
let latest_user = scan
.latest_user_text_idx
.and_then(|idx| self.session.messages.get(idx).cloned());
let latest_verified = scan
.latest_verified_user_idx
.and_then(|idx| self.session.messages.get(idx).cloned());

self.session.messages.clear();
if let Some(msg) = latest_user {
Expand Down Expand Up @@ -765,20 +749,18 @@ impl Engine {
turn: &TurnContext,
note: Option<&str>,
) -> CanonicalState {
let goal = self
.session
.messages
.iter()
.rev()
.find_map(|msg| {
if msg.role != "user" {
return None;
}
msg.content.iter().find_map(|block| match block {
ContentBlock::Text { text, .. } => Some(summarize_text(text, 220)),
_ => None,
})
})
// Single reverse scan of session.messages collects the goal,
// confirmed facts (capped at 4), and the latest verified-user
// message index. Previously this function did two reverse
// `.iter().rev().find_map()` walks and a third for facts; the
// dedicated scan below replaces all three with one pass that
// also early-exits once every collector is satisfied. We pass
// `false` here because build_canonical_state does not consume
// `latest_verified_user_idx`, so we don't need the scan to keep
// looking for it.
let scan = scan_canonical_inputs(&self.session.messages, false);
let goal = scan
.goal
.unwrap_or_else(|| "Continue current task from compact state".to_string());

let mut constraints = vec![
Expand All @@ -789,24 +771,6 @@ impl Engine {
constraints.push(summarize_text(note, 180));
}

let mut confirmed_facts = Vec::new();
for msg in self.session.messages.iter().rev() {
for block in &msg.content {
if let ContentBlock::ToolResult { content, .. } = block {
if content.starts_with("Error:") {
continue;
}
confirmed_facts.push(summarize_text(content, 180));
if confirmed_facts.len() >= 4 {
break;
}
}
}
if confirmed_facts.len() >= 4 {
break;
}
}

let open_loops: Vec<String> = turn
.tool_calls
.iter()
Expand Down Expand Up @@ -837,7 +801,7 @@ impl Engine {
CanonicalState {
goal,
constraints,
confirmed_facts,
confirmed_facts: scan.confirmed_facts,
open_loops,
pending_actions,
critical_refs,
Expand Down Expand Up @@ -975,3 +939,243 @@ impl Engine {
self.merge_compaction_summary(Some(prompt));
}
}

/// Maximum number of confirmed-fact snippets retained by the canonical-state
/// scan. Matches the prior `build_canonical_state` behavior — only the
/// four most recent non-error tool results are surfaced.
const CANONICAL_SCAN_MAX_FACTS: usize = 4;

/// Output of [`scan_canonical_inputs`]: everything `build_canonical_state`
/// and `apply_verify_and_replan` need to know about the session's recent
/// history, collected in a single reverse pass over `session.messages`.
///
/// Index fields (`latest_user_text_idx`, `latest_verified_user_idx`) point
/// into the original `messages` slice so the caller can clone the full
/// `Message` value when the re-plan path needs to keep it across a
/// `messages.clear()`.
#[derive(Debug, Default)]
struct CanonicalStateScan {
/// Most recent user-text block, summarized to ≤220 chars. `None` when
/// no user message with a Text block exists.
goal: Option<String>,
/// Index of the most recent user message containing at least one
/// `Text` content block. Used by the re-plan path to keep the
/// latest user request across a `messages.clear()`.
latest_user_text_idx: Option<usize>,
/// Index of the most recent user message whose content includes a
/// `[verification replay]` tool result. Used by the re-plan path.
latest_verified_user_idx: Option<usize>,
/// Up to [`CANONICAL_SCAN_MAX_FACTS`] most recent non-error
/// `ToolResult` snippets, newest first.
confirmed_facts: Vec<String>,
/// Running count of facts collected so far; lets the early-exit
/// condition avoid an extra `Vec::len()` call per message.
facts_collected: usize,
}

impl CanonicalStateScan {
/// `true` once every collector the caller actually needs is satisfied.
///
/// `find_verified` controls whether `latest_verified_user_idx` is part
/// of the early-exit gate. The build_canonical_state path does not
/// consume that field, so passing `false` lets the scan stop as soon
/// as the goal and `CANONICAL_SCAN_MAX_FACTS` facts are found — a
/// huge win on long histories with no verification replay.
fn is_complete(&self, find_verified: bool) -> bool {
self.goal.is_some()
&& (!find_verified || self.latest_verified_user_idx.is_some())
&& self.facts_collected >= CANONICAL_SCAN_MAX_FACTS
}
}

/// Walk `messages` once (in reverse) and collect everything the canonical
/// state and re-plan paths need. Replaces the previous pattern of three
/// independent reverse scans: one for the goal, one for confirmed facts,
/// and one for the latest verified user message.
///
/// `find_verified` controls whether the scan bothers locating the
/// latest verified user message. Callers that don't need it (e.g.
/// `build_canonical_state`) should pass `false` so the early-exit
/// condition can fire as soon as the goal + facts are gathered.
fn scan_canonical_inputs(messages: &[Message], find_verified: bool) -> CanonicalStateScan {
let mut scan = CanonicalStateScan::default();
for (idx, msg) in messages.iter().enumerate().rev() {
if msg.role == "user" {
if scan.goal.is_none()
&& let Some(text) = msg.content.iter().find_map(|b| match b {
ContentBlock::Text { text, .. } => Some(text.as_str()),
_ => None,
})
{
scan.goal = Some(summarize_text(text, 220));
scan.latest_user_text_idx = Some(idx);
}
if find_verified && scan.latest_verified_user_idx.is_none() {
let verified = msg.content.iter().any(|b| match b {
ContentBlock::ToolResult { content, .. } => {
content.contains("[verification replay]")
}
_ => false,
});
if verified {
scan.latest_verified_user_idx = Some(idx);
}
}
}
if scan.facts_collected < CANONICAL_SCAN_MAX_FACTS {
for block in &msg.content {
if let ContentBlock::ToolResult { content, .. } = block
&& !content.starts_with("Error:")
{
scan.confirmed_facts.push(summarize_text(content, 180));
scan.facts_collected = scan.facts_collected.saturating_add(1);
if scan.facts_collected >= CANONICAL_SCAN_MAX_FACTS {
break;
}
}
}
}
if scan.is_complete(find_verified) {
break;
}
}
scan
}
Comment on lines +976 to +1043
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Performance Issue: Early-Exit Optimization Bypassed in Common Cases

In the current implementation, is_complete() requires latest_verified_user_idx.is_some() to return true:

    fn is_complete(&self) -> bool {
        self.goal.is_some()
            && self.latest_verified_user_idx.is_some()
            && self.facts_collected >= CANONICAL_SCAN_MAX_FACTS
    }

However, in the vast majority of normal runs, there is no verified user message in the session history (since verification replays only happen under specific high-risk conditions).

This leads to three major issues:

  1. Bypassed Optimization in build_canonical_state: build_canonical_state does not use latest_verified_user_idx at all, yet it calls scan_canonical_inputs which is forced to scan the entire message history because is_complete() will never return true.
  2. Bypassed Optimization in apply_verify_and_replan: Even when we do want to find the verified message, if none exists, the loop will scan the entire history instead of early-exiting once the goal and 4 facts are found.
  3. Broken Test: The test scan_early_exits_when_complete actually does not early-exit in the current PR code because there is no verified message in the test input, meaning it scans all 1000 messages.

Solution

Introduce a find_verified: bool parameter to scan_canonical_inputs and is_complete. When find_verified is false (such as in build_canonical_state), we can skip searching for the verified message and allow the loop to early-exit immediately once the goal and 4 facts are collected.

impl CanonicalStateScan {
    /// `true` once every collector is satisfied. The single-pass
    /// caller can use this to break out of the reverse iteration.
    fn is_complete(&self, find_verified: bool) -> bool {
        self.goal.is_some()
            && (!find_verified || self.latest_verified_user_idx.is_some())
            && self.facts_collected >= CANONICAL_SCAN_MAX_FACTS
    }
}

/// Walk `messages` once (in reverse) and collect everything the canonical
/// state and re-plan paths need. Replaces the previous pattern of three
/// independent reverse scans: one for the goal, one for confirmed facts,
/// and one for the latest verified user message.
fn scan_canonical_inputs(messages: &[Message], find_verified: bool) -> CanonicalStateScan {
    let mut scan = CanonicalStateScan::default();
    for (idx, msg) in messages.iter().enumerate().rev() {
        if msg.role == "user" {
            if scan.goal.is_none() {
                if let Some(text) = msg.content.iter().find_map(|b| match b {
                    ContentBlock::Text { text, .. } => Some(text.as_str()),
                    _ => None,
                }) {
                    scan.goal = Some(summarize_text(text, 220));
                    scan.latest_user_text_idx = Some(idx);
                }
            }
            if find_verified && scan.latest_verified_user_idx.is_none() {
                let verified = msg.content.iter().any(|b| match b {
                    ContentBlock::ToolResult { content, .. } => {
                        content.contains("[verification replay]")
                    }
                    _ => false,
                });
                if verified {
                    scan.latest_verified_user_idx = Some(idx);
                }
            }
        }
        if scan.facts_collected < CANONICAL_SCAN_MAX_FACTS {
            for block in &msg.content {
                if let ContentBlock::ToolResult { content, .. } = block
                    && !content.starts_with("Error:")
                {
                    scan.confirmed_facts.push(summarize_text(content, 180));
                    scan.facts_collected = scan.facts_collected.saturating_add(1);
                    if scan.facts_collected >= CANONICAL_SCAN_MAX_FACTS {
                        break;
                    }
                }
            }
        }
        if scan.is_complete(find_verified) {
            break;
        }
    }
    scan
}


#[cfg(test)]
mod canonical_scan_tests {
use super::*;
use crate::models::ContentBlock;

fn user_text_msg(text: &str) -> Message {
Message {
role: "user".to_string(),
content: vec![ContentBlock::Text {
text: text.to_string(),
cache_control: None,
}],
}
}

fn user_with_verified_replay(text: &str) -> Message {
Message {
role: "user".to_string(),
content: vec![
ContentBlock::Text {
text: text.to_string(),
cache_control: None,
},
ContentBlock::ToolResult {
tool_use_id: "x".to_string(),
content: "[verification replay] pass=true".to_string(),
is_error: None,
content_blocks: None,
},
],
}
}

fn tool_result_msg(content: &str) -> Message {
Message {
role: "tool".to_string(),
content: vec![ContentBlock::ToolResult {
tool_use_id: "x".to_string(),
content: content.to_string(),
is_error: None,
content_blocks: None,
}],
}
}

#[test]
fn scan_returns_goal_for_latest_user_text() {
let messages = vec![
user_text_msg("first"),
tool_result_msg("a"),
user_text_msg("second"),
tool_result_msg("b"),
user_text_msg("third"),
];
let scan = scan_canonical_inputs(&messages, false);
// Goal should be the most recent user text.
let goal = scan.goal.expect("goal");
assert!(
goal.contains("third"),
"expected the most recent, got {goal}"
);
assert_eq!(scan.latest_user_text_idx, Some(4));
}

#[test]
fn scan_collects_up_to_four_facts_newest_first() {
let messages = vec![
tool_result_msg("fact-A"),
tool_result_msg("fact-B"),
tool_result_msg("fact-C"),
tool_result_msg("fact-D"),
tool_result_msg("fact-E"),
];
let scan = scan_canonical_inputs(&messages, false);
assert_eq!(scan.confirmed_facts.len(), 4);
// The four most recent (newest first) are E, D, C, B.
assert!(scan.confirmed_facts[0].contains("fact-E"));
assert!(scan.confirmed_facts[1].contains("fact-D"));
assert!(scan.confirmed_facts[2].contains("fact-C"));
assert!(scan.confirmed_facts[3].contains("fact-B"));
}

#[test]
fn scan_skips_error_results() {
let messages = vec![
tool_result_msg("good-A"),
tool_result_msg("Error: bad"),
tool_result_msg("good-B"),
];
let scan = scan_canonical_inputs(&messages, false);
assert_eq!(scan.confirmed_facts.len(), 2);
assert!(scan.confirmed_facts[0].contains("good-B"));
assert!(scan.confirmed_facts[1].contains("good-A"));
}

#[test]
fn scan_finds_latest_verified_user_message() {
let messages = vec![
user_text_msg("first"),
user_with_verified_replay("verified"),
user_text_msg("third"),
];
let scan = scan_canonical_inputs(&messages, true);
// The verified marker is on the *middle* message, not the most
// recent. The scan should report its actual position.
assert_eq!(scan.latest_verified_user_idx, Some(1));
// The goal still points at the most recent user text.
assert!(scan.goal.as_deref().unwrap_or("").contains("third"));
}

#[test]
fn scan_handles_empty_input() {
let scan = scan_canonical_inputs(&[], false);
assert!(scan.goal.is_none());
assert!(scan.latest_verified_user_idx.is_none());
assert!(scan.latest_user_text_idx.is_none());
assert!(scan.confirmed_facts.is_empty());
}

#[test]
fn scan_early_exits_when_complete() {
// 1000 tool results — the scan should stop walking once the
// first 4 facts and a goal are found. We can't directly assert
// "didn't visit every element" without instrumentation, but the
// call must return promptly with the right slice. We pass
// `find_verified=false` so the scan does not have to keep
// walking looking for a verified user message that isn't there.
let mut messages: Vec<Message> = (0..1000)
.map(|i| tool_result_msg(&format!("fact-{i}")))
.collect();
// Most recent user message comes last.
messages.push(user_text_msg("goal"));
let scan = scan_canonical_inputs(&messages, false);
assert!(scan.goal.as_deref().unwrap_or("").contains("goal"));
assert_eq!(scan.confirmed_facts.len(), 4);
}
}
Loading