Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 114 additions & 246 deletions README.md

Large diffs are not rendered by default.

226 changes: 170 additions & 56 deletions agent/burr_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@
receive_input -> call_llm -> [execute_tools -> call_llm]* -> respond

ANALYSIS MODE (headless codebase analysis):
receive_input -> read_discovery -> analyze_depth_N -> ... -> synthesize -> respond
receive_input -> read_discovery -> analyze_current_depth (loop) -> synthesize -> respond

Each analyze_depth_N action:
- Runs component analyzers in parallel for all components at that depth
Each analyze_current_depth invocation:
- Runs component analyzers for all components at the current depth,
bounded by FLASHLIGHT_MAX_PARALLEL (default 4) via a ThreadPoolExecutor
- Each component analyzer is a ReAct loop (call_llm <-> execute_tools)
- Waits for all to complete before transitioning to next depth
wrapped in its own Burr Application for per-subagent UI visibility
- Waits for all components at the current depth to complete before
advancing to the next depth

Multi-agent visibility:
The Burr UI shows:
- receive_input: Initial task
- read_discovery: Load components.json and analysis_order.json
- analyze_depth_0: Parallel analysis of depth-0 components
- analyze_depth_1: Parallel analysis of depth-1 components (with upstream context)
- ... (more depth levels as needed)
- analyze_current_depth (once per depth): components at that depth, capped
by FLASHLIGHT_MAX_PARALLEL. Per-component detail lives in each subagent's
own tracked Application.
- synthesize: Architecture documentation synthesis
- respond: Final output
"""
Expand Down Expand Up @@ -890,7 +893,26 @@ def _chat_completion(
response=response,
)

# Non-retryable errors (4xx except 429)
# Non-retryable errors (4xx except 429) — surface the
# upstream error body so failures are actionable instead of
# a bare "400 Bad Request".
if response.status_code >= 400:
try:
err_body: Any = response.json()
except Exception:
err_body = response.text[:2000]
approx_chars = sum(
len(str(m.get("content", ""))) for m in messages
)
logger.error(
"LLM %d from %s (model=%s, msgs=%d, approx_chars=%d): %s",
response.status_code,
base_url,
model,
len(messages),
approx_chars,
err_body,
)
response.raise_for_status()
data = response.json()

Expand Down Expand Up @@ -1272,17 +1294,28 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State:
],
)
def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
"""Analyze all components at the current depth level.
"""Analyze all components at the current depth level in parallel.

For each component at this depth:
1. Build upstream context from already-analyzed dependencies
2. Run a component analyzer ReAct loop
3. Store the analysis result

Components at the same depth are analyzed sequentially in this implementation.
(Future: could use threading for true parallelism)
Components at the same depth run concurrently on a ThreadPoolExecutor,
bounded by FLASHLIGHT_MAX_PARALLEL (default 4). Threads are appropriate
because the work is I/O-bound (LLM HTTP calls + subprocess grep/bash).
Each subagent is an independent Burr Application with its own httpx
client, so no shared-state locking is required.

Results are merged back into ``component_analyses`` in deterministic
(depth-order) sequence so the downstream synthesis prompt is
reproducible even though futures complete in arbitrary order.

Set ``FLASHLIGHT_MAX_PARALLEL=1`` to opt out of parallelism (useful for
single-instance local backends like Ollama, or to avoid rate limits on
small API tiers).
"""
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

components = state.get("components", {})
depth_order = state.get("depth_order", [])
Expand All @@ -1294,64 +1327,125 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State:
# No more depths to analyze
return state.update(current_depth=current_depth)

component_names = depth_order[current_depth]

__tracer.log_attributes(
depth=current_depth,
component_count=len(component_names),
component_names=component_names,
)

for comp_name in component_names:
# Resolve the work items for this depth up-front. Upstream context is
# built from the already-complete `analyses` dict before any threads
# start, so worker threads never read mutable orchestrator state.
work_items: List[tuple[str, Dict[str, Any], str]] = []
for comp_name in depth_order[current_depth]:
comp = components.get(comp_name)
if not comp:
logger.warning(f"Component not found in inventory: {comp_name}")
continue
upstream_context = _build_upstream_context(
comp.get("internal_dependencies", []), analyses
)
work_items.append((comp_name, comp, upstream_context))

with __tracer(f"analyze:{comp_name}") as t:
# Build upstream context from dependencies
upstream_context = ""
deps = comp.get("internal_dependencies", [])
if deps:
context_parts = []
for dep_name in deps:
if dep_name in analyses:
# Extract summary from the analysis
analysis = analyses[dep_name]
summary = _extract_summary(analysis)
context_parts.append(f"### {dep_name}\n{summary}")
upstream_context = "\n\n".join(context_parts)

t.log_attributes(
component_kind=comp.get("kind", "unknown"),
component_type=comp.get("type", "unknown"),
dependency_count=len(deps),
has_upstream_context=bool(upstream_context),
)
max_parallel = _get_max_parallel()
pool_size = max(1, min(max_parallel, len(work_items))) if work_items else 1

# Run the component analyzer
analysis_result = _run_component_analyzer(
component=comp,
service_name=service_name,
upstream_context=upstream_context,
tracer=t,
)
__tracer.log_attributes(
depth=current_depth,
component_count=len(work_items),
component_names=[name for name, _, _ in work_items],
max_parallel=max_parallel,
pool_size=pool_size,
)

analyses[comp_name] = analysis_result
results: Dict[str, str] = {}
if work_items:
with ThreadPoolExecutor(
max_workers=pool_size,
thread_name_prefix=f"flashlight-depth{current_depth}",
) as pool:
future_to_name = {
pool.submit(
_run_component_analyzer,
component=comp,
service_name=service_name,
upstream_context=upstream_context,
tracer=None,
): name
for name, comp, upstream_context in work_items
}
for future in as_completed(future_to_name):
name = future_to_name[future]
try:
results[name] = future.result()
except Exception as exc:
logger.error(
"Component analyzer for %s crashed: %s",
name,
exc,
exc_info=True,
)
# Record the failure as an Error-prefixed string so the
# rest of the pipeline (synthesis, logging) behaves
# identically to the sequential-path error shape.
results[name] = f"Error: component analyzer crashed: {exc}"

# Merge results back in deterministic work-item order so downstream
# synthesis sees a stable component ordering across runs.
successes: List[str] = []
failures: List[str] = []
for comp_name, comp, _ in work_items:
result = results.get(comp_name, "")
analyses[comp_name] = result
if result and not result.startswith("Error"):
successes.append(comp_name)
else:
failures.append(comp_name)

t.log_attributes(
analysis_length=len(analysis_result) if analysis_result else 0,
success=bool(
analysis_result and not analysis_result.startswith("Error")
),
)
__tracer.log_attributes(
successful_components=successes,
failed_components=failures,
success_count=len(successes),
failure_count=len(failures),
)

return state.update(
component_analyses=analyses,
current_depth=current_depth + 1,
)


def _build_upstream_context(deps: List[str], analyses: Dict[str, str]) -> str:
    """Build an upstream_context prompt fragment from prior analyses.

    Args:
        deps: Names of the dependencies whose analyses should be summarised.
            An empty (or falsy) value yields an empty string.
        analyses: Mapping of component name to its analysis text. Failed
            analyses are recorded in this mapping as strings that start
            with ``"Error"``.

    Returns:
        One ``### <name>`` section per usable dependency, each followed by
        the summary produced by ``_extract_summary``, joined by blank lines.
        Returns ``""`` when no dependency has a usable analysis.

    Dependencies that are missing from ``analyses``, have an empty analysis,
    or whose analysis is an ``"Error"``-prefixed failure marker are skipped:
    the subagent simply gets less context rather than an error message
    masquerading as upstream documentation.
    """
    sections: List[str] = []
    for dep_name in deps or ():
        analysis = analyses.get(dep_name)
        # Same success test the orchestrator applies when classifying
        # results: non-empty and not an "Error..." failure marker.
        if not analysis or analysis.startswith("Error"):
            continue
        sections.append(f"### {dep_name}\n{_extract_summary(analysis)}")
    return "\n\n".join(sections)


def _get_max_parallel() -> int:
    """Return the worker cap configured via ``FLASHLIGHT_MAX_PARALLEL``.

    Resolution rules:

    - Variable unset: returns the default of 4.
    - Value is not parseable as an integer: logs a warning and returns
      the default of 4.
    - Integer below 1 (zero or negative): clamped up to 1, i.e. fully
      serialized execution. It does NOT fall back to the default.
    - Any other integer: returned unchanged. No upper bound is applied here.

    Returns:
        The maximum number of parallel workers, always >= 1.
    """
    raw = os.environ.get("FLASHLIGHT_MAX_PARALLEL", "4")
    try:
        requested = int(raw)
    except (TypeError, ValueError):
        logger.warning(
            "FLASHLIGHT_MAX_PARALLEL=%r is not an integer; using default 4",
            raw,
        )
        return 4
    # Only a lower bound is enforced: a pool needs at least one worker, so
    # 0 and negative values degrade to sequential execution.
    return max(1, requested)


def _extract_summary(analysis: str) -> str:
"""Extract a brief summary from a component analysis."""
# Look for a summary section or take first few paragraphs
Expand Down Expand Up @@ -1434,7 +1528,7 @@ def _run_component_analyzer(
system_prompt = f"You are a component-analyzer for the {service_name} codebase."

# Run as a proper Burr application for UI visibility
return _run_subagent_as_app(
analysis = _run_subagent_as_app(
system_prompt=system_prompt,
user_prompt=prompt,
subagent_type="component-analyzer",
Expand All @@ -1443,6 +1537,26 @@ def _run_component_analyzer(
parent_sequence_id=parent_sequence_id,
)

# Persist the analysis to disk ourselves. We don't trust the LLM to call
# write_file — empirically it drops that tool call ~75% of the time,
# especially on smaller models, silently throwing away the analysis.
# Saving in the orchestrator guarantees every completed subagent produces
# a .md on disk regardless of which tool calls the model chose to make.
if analysis and not analysis.startswith("Error"):
out_path = Path(f"/tmp/{service_name}/service_analyses/{comp_name}.md")
try:
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(analysis, encoding="utf-8")
except OSError as exc:
logger.error(
"Failed to persist analysis for %s to %s: %s",
comp_name,
out_path,
exc,
)

return analysis


@action(
reads=["component_analyses", "service_name"],
Expand Down
Loading
Loading