diff --git a/README.md b/README.md index 680c6e9..0e4013e 100644 --- a/README.md +++ b/README.md @@ -1,138 +1,85 @@ # GitHub Flashlight -A multi-agent processing pipeline that performs dependency-aware codebase analysis and visualization. Built on [Burr](https://github.com/apache/burr) for explicit state-machine orchestration, and speaks the OpenAI Chat Completions protocol — so it works with **any OpenAI-compatible endpoint** (OpenAI, OpenRouter, vLLM, LM Studio, Ollama, Together, Groq, …). +Flashlight points an LLM-powered analysis pipeline at a codebase and produces structured, cited documentation of every component — per-component analyses, a unified knowledge graph, and a synthesized architecture doc. -## Features +It is built on [Burr](https://github.com/apache/burr) for explicit state-machine orchestration and speaks the OpenAI Chat Completions protocol, so it works with **any OpenAI-compatible endpoint** (OpenAI, OpenRouter, vLLM, LM Studio, Ollama, Together, Groq, …). -- **Automatic Service Discovery**: Identifies services in Rust (Cargo.toml), Go (go.mod), Node.js (package.json), and Python (pyproject.toml) codebases -- **Dependency Graph Analysis**: Builds and visualizes service dependency relationships -- **Two-Phase Analysis**: - - Phase 1: Analyzes foundation services with no dependencies - - Phase 2: Analyzes remaining services in dependency order with upstream context -- **Context-Aware**: Code analyzers receive analyses of direct dependencies to understand integration patterns -- **Comprehensive Documentation**: Generates system-wide architecture documentation with patterns, flows, and recommendations -- **Multi-Agent Orchestration**: Uses specialized agents for discovery, analysis, and documentation synthesis +## How it works -## Component Classification +The pipeline has one deterministic phase and two LLM-driven phases. Nothing is inferred by an LLM until the graph is built. -GitHub Flashlight uses a deterministic discovery engine (zero LLM calls) to classify every component into one of eight `ComponentKind` values via language-specific plugins: +### 1. Deterministic discovery (zero LLM calls) + +[agent/discovery/engine.py](agent/discovery/engine.py) walks the repo and classifies every component via language-specific plugins. Each component is tagged with a `ComponentKind`: | Kind | Description | |------|-------------| -| **Library** | Reusable code with no entrypoint | -| **Service** | Long-running process (HTTP, gRPC, daemon) | -| **CLI** | Command-line tool | -| **Contract** | Smart contract, API definition, or schema | -| **Infra** | Infrastructure-as-code or deployment config (Terraform, Helm, K8s) | -| **Pipeline** | Data pipeline or workflow definition (Airflow, dbt) | -| **Frontend** | UI application (React, Vue, Streamlit, SwiftUI) | -| **Unknown** | Could not classify deterministically | - -### Supported Languages - -#### Go (`go.mod`) -- **Service**: `main.go` or `package main` with server indicators (`listenandserve`, `grpc.newserver`, `net.listen`, etc.) or service-like names (`server`, `daemon`, `proxy`, `worker`) -- **CLI**: `main.go` with CLI indicators (`cobra.command`, `pflag`, `os.args`) or CLI-like names (`cli`, `tool`) -- **Library**: No `main.go`, no `package main`, or has a `cmd/` directory (executables inside `cmd/` become their own components) -- Supports single-module monorepos with per-package discovery and Go import tracing - -#### Rust (`Cargo.toml`) -- **Service**: `[[bin]]` or `src/main.rs` with server framework deps (`actix-web`, `axum`, `warp`, `rocket`, `tonic`, `hyper`) or service-like names -- **CLI**: Executable with CLI framework deps (`clap`, `structopt`, `argh`) or CLI-like names -- **Library**: `[lib]` section only, or hybrid crates with both `lib.rs` and `main.rs` -- Supports Cargo workspaces with glob member patterns - -#### Python (`pyproject.toml`) -- **Pipeline**: Markers for `airflow`, `dagster`, `prefect`, `dbt`, `luigi` -- **Frontend**: Markers for `streamlit`, `gradio`, `panel`, `dash` -- **Service**: Web framework deps (`fastapi`, `flask`, `django`, `starlette`, etc.) with `[project.scripts]` or `__main__.py` -- **CLI**: Has `[project.scripts]` or `__main__.py` without web framework deps -- **Library**: No entry points or framework markers -- Supports both PEP 621 and Poetry dependency formats - -#### TypeScript / JavaScript (`package.json`) -- **CLI**: `"bin"` field present -- **Frontend**: Frontend framework deps (`react`, `vue`, `svelte`, `angular`, `next`, `nuxt`, `remix`, `solid-js`, etc.) -- **Service**: Server framework deps (`express`, `fastify`, `koa`, `nestjs`, `hono`) with a `"start"` script or `"main"` field -- **Library**: No binary, framework, or server indicators -- Supports npm/yarn/pnpm workspaces with recursive member discovery - -#### Solidity (`foundry.toml`, `hardhat.config.ts`/`.js`) -- **Contract**: Contains `contract`, `abstract contract`, or `interface` declarations -- **Library**: All declarations are Solidity `library` keyword -- Supports both Foundry and Hardhat projects with import remapping and multi-package discovery - -#### Swift (`Package.swift`) -- **Service**: `.executableTarget` with server framework indicators (`Vapor`, `Hummingbird`, `SwiftNIO`, `GRPC`) or service-like names -- **Frontend**: Executable with iOS/macOS indicators (`UIApplication`, `SwiftUI`, `WindowGroup`) -- **CLI**: `.executableTarget` with argument parsing (`ParsableCommand`, `ArgumentParser`) or CLI-like names; default for unclassified executables -- **Library**: `.target` without `main.swift` or `@main`, `.binaryTarget`, `.systemLibrary` -- Supports SPM multi-target packages with per-target discovery - -### Detection Pipeline -1. **Manifest discovery**: Scans for language-specific manifest files (`Cargo.toml`, `go.mod`, `package.json`, `pyproject.toml`, `foundry.toml`, `Package.swift`) -2. **Manifest analysis**: Checks for binary/entrypoint indicators in manifest structure -3. **File structure**: Looks for `main.rs`, `main.go`, `__main__.py`, `main.swift`, `@main` attribute -4. **Dependency scanning**: Identifies framework-specific dependencies (e.g., `axum` -> Service, `clap` -> CLI, `react` -> Frontend) -5. **Content scanning**: Reads source files for server/CLI/UI indicators (Go reads all `.go` files; Swift reads up to 10 `.swift` files) -6. **Name-based heuristics**: Keywords like "server", "api", "daemon" -> Service; "cli", "tool" -> CLI -7. **Default**: Falls back to Library (or Service/CLI for executables, depending on the language) - -## Architecture - -The pipeline uses four specialized roles: - -1. **Primary Leader** (orchestrator) - - Discovers services by scanning for manifest files - - Builds dependency graph and determines analysis order - - Spawns code analyzer agents with appropriate context - - Spawns external service analyzers for runtime integrations - - Spawns architecture documenter for final synthesis - -2. **Code Analyzer** (multiple instances) - - Deep analysis of individual services - - Examines architecture, components, data flows, dependencies, API surface - - Documents all third-party dependencies with version, category, and purpose - - Receives context from direct dependencies - - Outputs Markdown reports - -3. **External Service Analyzer** (per-service instances) - - Deep-dives into how external services (databases, cloud platforms, APIs) are integrated - - Documents client libraries, authentication, API surface, and configuration - - Produces integration analysis files for architecture synthesis - -4. **Architecture Documenter** (single instance) - - Synthesizes all service analyses - - Aggregates external dependencies into a complete technology inventory - - Identifies system-wide patterns - - Creates comprehensive architecture documentation +| **library** | Reusable code with no entrypoint | +| **service** | Long-running process (HTTP, gRPC, daemon) | +| **cli** | Command-line tool | +| **contract** | Smart contract, ABI, or schema | +| **infra** | IaC / deployment config (Terraform, Helm, K8s) | +| **pipeline** | Data pipeline / workflow (Airflow, dbt, …) | +| **frontend** | UI application (React, Vue, Streamlit, SwiftUI) | +| **unknown** | Could not classify | + +Supported manifests: `Cargo.toml` (Rust), `go.mod` (Go), `package.json` (TS/JS), `pyproject.toml` (Python), `foundry.toml` / `hardhat.config.*` (Solidity), `Package.swift` (Swift). Plugins live in [agent/discovery/languages/](agent/discovery/languages/). + +The [KnowledgeGraphBuilder](agent/schemas/knowledge_graph.py) then builds a unified dependency graph across all components and runs a topological sort to produce `analysis_order.json` — a list of **depth levels**, where every component at depth `d` only depends (transitively) on components at depths `< d`. + +### 2. Depth-level component analysis + +The Burr state machine in [agent/burr_app.py](agent/burr_app.py) walks the depth levels in order: + +``` +receive_input -> read_discovery -> analyze_current_depth (loops over depths) -> synthesize -> respond +``` + +For each depth level, `analyze_current_depth` spawns one component-analyzer subagent per component at that depth. Subagents at the **same depth** run concurrently on a `ThreadPoolExecutor` bounded by `FLASHLIGHT_MAX_PARALLEL` (default 4). The orchestrator waits for the whole depth to complete before advancing. + +Each component-analyzer: +- Receives `upstream_context` — summaries of every direct dependency's analysis (which have already completed at a lower depth) +- Runs its own ReAct loop as an independent Burr `Application` so it shows up in the Burr UI with a parent/child relationship +- Reads the component source with `glob_files`, `grep_files`, `read_file`, `bash` +- Emits a Markdown analysis that ends with two structured JSON blocks: `## Analysis Data` (machine-readable summary) and `## Citations` (file/line provenance for every major claim) + +The orchestrator persists every returned analysis to `/tmp/{service_name}/service_analyses/{component}.md` — it doesn't trust the LLM to call `write_file` reliably. + +> Older versions of Flashlight had a distinct "depth 0" / "depth 1" split (foundation components vs. the rest). That's gone. The pipeline now treats every depth level uniformly, and there can be any number of levels depending on the graph. + +### 3. Synthesis + +Once every depth has been analyzed, `synthesize` runs the architecture-documenter subagent over every component summary to produce `architecture.md` and `quick_reference.md`. + +### 4. Citation extraction (post-analysis) + +[agent/utils/citation_extractor.py](agent/utils/citation_extractor.py) parses every `## Citations` block out of the Markdown analyses, validates each citation against the actual source file (existence, line range), and writes a unified `citations.json` next to the analyses. ## Installation ```bash -# Create virtual environment python -m venv venv -source venv/bin/activate # On Windows: venv\Scripts\activate +source venv/bin/activate # Windows: venv\Scripts\activate -# Install dependencies pip install -e . -# Configure your LLM endpoint -cp .env.example .env -# Edit .env and set OPENAI_API_KEY (and optionally OPENAI_BASE_URL / OPENAI_MODEL) +cp .env.example .env # then edit .env with your endpoint credentials ``` -### LLM configuration +Requires Python 3.10+ and [ripgrep](https://github.com/BurntSushi/ripgrep) (`rg`) on `PATH` for the grep tool. -Flashlight uses the OpenAI Chat Completions API, so any OpenAI-compatible provider works. Set: +## LLM configuration + +Flashlight uses the OpenAI Chat Completions API, so any OpenAI-compatible provider works. | Variable | Required | Default | Notes | |----------|----------|---------|-------| | `OPENAI_API_KEY` | yes | — | Bearer token for the target endpoint | | `OPENAI_BASE_URL` | no | `https://api.openai.com/v1` | Point at OpenAI, OpenRouter, vLLM, LM Studio, Ollama, etc. | -| `OPENAI_MODEL` | no | `gpt-4o-mini` | Any model served by your chosen endpoint | +| `OPENAI_MODEL` | no | `gpt-4o-mini` | Any model served by the chosen endpoint | +| `FLASHLIGHT_MAX_PARALLEL` | no | `4` | Max component analyzers run concurrently within a depth. Set to `1` to serialize (recommended for Ollama and small API tiers). | -Example configurations: +Example configs: ```bash # OpenAI @@ -152,174 +99,95 @@ OPENAI_MODEL=meta-llama/Llama-3.1-70B-Instruct ## Usage -```bash -# Run the pipeline -python -m github_flashlight.agent +The primary entry point is the headless `flashlight` CLI ([agent/cli.py](agent/cli.py)). -# Or use the installed command -github-flashlight -``` +### Full analysis -Then provide a path to analyze: -``` -You: Analyze the codebase at /path/to/repo +```bash +# Local checkout +flashlight --repo /path/to/repo --output ./artifacts/myservice + +# GitHub URL (cloned into /tmp/flashlight-repos) +flashlight --repo https://github.com/org/repo --output ./artifacts/myservice ``` -### Verbose Logging +### Incremental (diff-driven) analysis -Enable detailed SDK and API interaction logging: +Given a previous run's artifacts and a new commit SHA, Flashlight will only re-analyze components whose files changed between the last and current SHAs: ```bash -# Verbose mode - Shows API calls, agent spawning, and tool usage -AGENT_VERBOSE=true python -m github_flashlight.agent +# Explicit SHAs +flashlight --repo /path/to/repo --output ./artifacts/myservice \ + --last-sha abc1234 --head-sha def5678 -# Debug mode - Full trace logging including API request/response details -AGENT_DEBUG=true python -m github_flashlight.agent +# Auto-detect last-sha from manifest.json in the output dir +flashlight --repo /path/to/repo --output ./artifacts/myservice \ + --head-sha def5678 ``` -When enabled, you'll see real-time information about: -- LLM API requests and responses -- Subagent spawning and lifecycle -- Tool calls with parameters -- Tool results and success/failure status -- Agent context and model information - -This is useful for: -- Understanding what the agents are doing in real-time -- Debugging analysis pipeline issues -- Monitoring API usage and performance -- Learning how the multi-agent system orchestrates tasks - -The pipeline will: -1. Scan for services (Cargo.toml, go.mod, package.json, pyproject.toml files) -2. Build dependency graph -3. Analyze services in two phases: - - Phase 1: Services with no dependencies (parallel) - - Phase 2: Services with dependencies (in order, with context) -4. Generate architecture documentation - -## Output Structure +If the diff doesn't map cleanly to existing components (e.g. brand-new files), Flashlight falls back to a full analysis. -``` -files/ -├── service_discovery/ -│ ├── services.json # Discovered services metadata -│ └── discovery_log.md # Human-readable discovery log -├── dependency_graphs/ -│ ├── dependency_graph.json # Machine-readable graph -│ └── dependency_graph.md # Visualization -├── service_analyses/ -│ ├── {service1}.json # Structured analysis -│ ├── {service1}.md # Human-readable report -│ └── ... (one pair per service) -└── architecture_docs/ - ├── architecture.md # Comprehensive documentation - └── quick_reference.md # One-page summary +### Interactive chat mode + +For ad-hoc exploration with the same tool-using agent: -logs/ -└── session_YYYYMMDD_HHMMSS/ - ├── transcript.txt # Conversation log - └── tool_calls.jsonl # Structured tool usage +```bash +code-analysis-agent ``` -## Example Analysis Flow +### Verbose logging -For a Rust codebase with this structure: -``` -repo/ -├── common-utils/ (no dependencies) -├── config-loader/ (no dependencies) -├── database-layer/ (depends on common-utils) -├── auth-service/ (depends on database-layer) -└── api-gateway/ (depends on auth-service, database-layer) +```bash +flashlight --repo ... --output ... --verbose # INFO-level +flashlight --repo ... --output ... --debug # full trace including HTTP details ``` -The agent will: -1. **Phase 1**: Analyze `common-utils` and `config-loader` in parallel -2. **Phase 2**: - - Analyze `database-layer` with context from `common-utils` - - Analyze `auth-service` with context from `database-layer` only (not common-utils) - - Analyze `api-gateway` with context from `auth-service` and `database-layer` -3. **Synthesis**: Generate comprehensive architecture documentation +### Burr UI -## Key Design Principles +Each run (both the orchestrator and every subagent) is tracked by Burr. Start the Burr UI to inspect state transitions, tool calls, token usage, and subagent trees: -- **Direct Dependencies Only**: Analyzers receive context only from direct dependencies, not transitive ones -- **Dependency Order**: Services are analyzed in topological order to ensure dependencies are analyzed first -- **Parallel Execution**: Services at the same dependency level are analyzed in parallel -- **Structured Output**: Both machine-readable (JSON) and human-readable (Markdown) outputs +```bash +.burr-ui-venv/bin/python -m uvicorn burr.tracking.server.run:app --port 7241 +# then open http://localhost:7241 +``` -## Supported Languages +## Output structure -- **Rust**: Full support (Cargo.toml discovery, dependency extraction) -- **Go**: Full support (go.mod discovery, dependency extraction) -- **Node.js**: Partial support (package.json discovery) -- **Python**: Partial support (pyproject.toml discovery) +Flashlight writes intermediate artifacts to `/tmp/{service_name}/` during a run and then copies the final artifacts to `--output`: -## Requirements +``` +/ +├── manifest.json # source_repo, source_commit, timestamps +├── service_discovery/ +│ └── components.json # Deterministic component inventory +├── dependency_graphs/ +│ ├── graph.json # Unified knowledge graph +│ └── analysis_order.json # Topologically-sorted depth levels +├── service_analyses/ +│ ├── {component}.md # Markdown analysis with inline citations +│ ├── {component}.json # Parsed `## Analysis Data` block +│ └── citations.json # Validated, aggregated citations +└── architecture_docs/ + ├── architecture.md # System-wide synthesis + └── quick_reference.md # One-page summary +``` -- Python 3.10+ -- An API key for an OpenAI-compatible endpoint (OpenAI, OpenRouter, a self-hosted vLLM/LM Studio/Ollama server, etc.) -- Access to the codebase to analyze +Per-run session logs (transcript + structured tool calls) are written under `logs/session_YYYYMMDD_HHMMSS/`. ## Development ```bash -# Install with dev dependencies pip install -e ".[dev]" - -# Run tests (when available) pytest ``` -## How It Works - -The primary leader orchestrates a sophisticated multi-phase workflow: - -### Discovery Phase -- Uses Glob to find manifest files (Cargo.toml, go.mod, package.json, pyproject.toml) -- Reads each manifest to extract service metadata -- Identifies internal dependencies (path-based in manifests) -- Saves service inventory to JSON - -### Graph Building Phase -- Constructs directed dependency graph -- Calculates analysis order using two-phase approach: - - Phase 1: Services with in-degree 0 (no dependencies) - - Phase 2: Topological sort of remaining services -- Visualizes graph in both JSON and Markdown - -### Analysis Phase -- **Phase 1**: Spawns code-analyzer for each no-dependency service (parallel) -- **Phase 2**: For each remaining service: - - Waits for its direct dependencies to complete - - Loads direct dependency analyses - - Builds context summary (architecture, APIs, components) - - Spawns code-analyzer with context - - Ensures proper ordering while maximizing parallelism - -### Synthesis Phase -- Spawns architecture-documenter after all analyses complete -- Reads all service analyses and dependency graph -- Identifies system-wide patterns and architectural approaches -- Generates comprehensive documentation with: - - System overview - - Service catalog - - Dependency visualization - - Architectural patterns - - Technology stack - - Major data flows - - Development guide - - Recommendations - -## Contributing - -This project showcases dependency-aware multi-agent composition over the OpenAI Chat Completions protocol. Feel free to extend it with: -- Additional language support (Java, C#, etc.) -- Enhanced metrics collection (LOC, complexity, test coverage) -- Incremental analysis for large repositories -- Custom analysis plugins -- Additional visualization options +Key modules: + +- [agent/burr_app.py](agent/burr_app.py) — Burr state machine, actions, and subagent runners +- [agent/cli.py](agent/cli.py) — Headless CLI and incremental-analysis logic +- [agent/discovery/](agent/discovery/) — Deterministic component discovery +- [agent/schemas/](agent/schemas/) — Component / knowledge-graph data model +- [agent/prompts/subagents/](agent/prompts/subagents/) — Prompt templates for component-analyzer, architecture-documenter, external-service-analyzer ## License diff --git a/agent/burr_app.py b/agent/burr_app.py index 0e34292..bcfe355 100644 --- a/agent/burr_app.py +++ b/agent/burr_app.py @@ -11,20 +11,23 @@ receive_input -> call_llm -> [execute_tools -> call_llm]* -> respond ANALYSIS MODE (headless codebase analysis): - receive_input -> read_discovery -> analyze_depth_N -> ... -> synthesize -> respond + receive_input -> read_discovery -> analyze_current_depth (loop) -> synthesize -> respond - Each analyze_depth_N action: - - Runs component analyzers in parallel for all components at that depth + Each analyze_current_depth invocation: + - Runs component analyzers for all components at the current depth, + bounded by FLASHLIGHT_MAX_PARALLEL (default 4) via a ThreadPoolExecutor - Each component analyzer is a ReAct loop (call_llm <-> execute_tools) - - Waits for all to complete before transitioning to next depth + wrapped in its own Burr Application for per-subagent UI visibility + - Waits for all components at the current depth to complete before + advancing to the next depth Multi-agent visibility: The Burr UI shows: - receive_input: Initial task - read_discovery: Load components.json and analysis_order.json - - analyze_depth_0: Parallel analysis of depth-0 components - - analyze_depth_1: Parallel analysis of depth-1 components (with upstream context) - - ... (more depth levels as needed) + - analyze_current_depth (once per depth): components at that depth, capped + by FLASHLIGHT_MAX_PARALLEL. Per-component detail lives in each subagent's + own tracked Application. - synthesize: Architecture documentation synthesis - respond: Final output """ @@ -890,7 +893,26 @@ def _chat_completion( response=response, ) - # Non-retryable errors (4xx except 429) + # Non-retryable errors (4xx except 429) — surface the + # upstream error body so failures are actionable instead of + # a bare "400 Bad Request". + if response.status_code >= 400: + try: + err_body: Any = response.json() + except Exception: + err_body = response.text[:2000] + approx_chars = sum( + len(str(m.get("content", ""))) for m in messages + ) + logger.error( + "LLM %d from %s (model=%s, msgs=%d, approx_chars=%d): %s", + response.status_code, + base_url, + model, + len(messages), + approx_chars, + err_body, + ) response.raise_for_status() data = response.json() @@ -1272,17 +1294,28 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State: ], ) def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State: - """Analyze all components at the current depth level. + """Analyze all components at the current depth level in parallel. For each component at this depth: 1. Build upstream context from already-analyzed dependencies 2. Run a component analyzer ReAct loop 3. Store the analysis result - Components at the same depth are analyzed sequentially in this implementation. - (Future: could use threading for true parallelism) + Components at the same depth run concurrently on a ThreadPoolExecutor, + bounded by FLASHLIGHT_MAX_PARALLEL (default 4). Threads are appropriate + because the work is I/O-bound (LLM HTTP calls + subprocess grep/bash). + Each subagent is an independent Burr Application with its own httpx + client, so no shared-state locking is required. + + Results are merged back into ``component_analyses`` in deterministic + (depth-order) sequence so the downstream synthesis prompt is + reproducible even though futures complete in arbitrary order. + + Set ``FLASHLIGHT_MAX_PARALLEL=1`` to opt out of parallelism (useful for + single-instance local backends like Ollama, or to avoid rate limits on + small API tiers). """ - from pathlib import Path + from concurrent.futures import ThreadPoolExecutor, as_completed components = state.get("components", {}) depth_order = state.get("depth_order", []) @@ -1294,57 +1327,81 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State: # No more depths to analyze return state.update(current_depth=current_depth) - component_names = depth_order[current_depth] - - __tracer.log_attributes( - depth=current_depth, - component_count=len(component_names), - component_names=component_names, - ) - - for comp_name in component_names: + # Resolve the work items for this depth up-front. Upstream context is + # built from the already-complete `analyses` dict before any threads + # start, so worker threads never read mutable orchestrator state. + work_items: List[tuple[str, Dict[str, Any], str]] = [] + for comp_name in depth_order[current_depth]: comp = components.get(comp_name) if not comp: logger.warning(f"Component not found in inventory: {comp_name}") continue + upstream_context = _build_upstream_context( + comp.get("internal_dependencies", []), analyses + ) + work_items.append((comp_name, comp, upstream_context)) - with __tracer(f"analyze:{comp_name}") as t: - # Build upstream context from dependencies - upstream_context = "" - deps = comp.get("internal_dependencies", []) - if deps: - context_parts = [] - for dep_name in deps: - if dep_name in analyses: - # Extract summary from the analysis - analysis = analyses[dep_name] - summary = _extract_summary(analysis) - context_parts.append(f"### {dep_name}\n{summary}") - upstream_context = "\n\n".join(context_parts) - - t.log_attributes( - component_kind=comp.get("kind", "unknown"), - component_type=comp.get("type", "unknown"), - dependency_count=len(deps), - has_upstream_context=bool(upstream_context), - ) + max_parallel = _get_max_parallel() + pool_size = max(1, min(max_parallel, len(work_items))) if work_items else 1 - # Run the component analyzer - analysis_result = _run_component_analyzer( - component=comp, - service_name=service_name, - upstream_context=upstream_context, - tracer=t, - ) + __tracer.log_attributes( + depth=current_depth, + component_count=len(work_items), + component_names=[name for name, _, _ in work_items], + max_parallel=max_parallel, + pool_size=pool_size, + ) - analyses[comp_name] = analysis_result + results: Dict[str, str] = {} + if work_items: + with ThreadPoolExecutor( + max_workers=pool_size, + thread_name_prefix=f"flashlight-depth{current_depth}", + ) as pool: + future_to_name = { + pool.submit( + _run_component_analyzer, + component=comp, + service_name=service_name, + upstream_context=upstream_context, + tracer=None, + ): name + for name, comp, upstream_context in work_items + } + for future in as_completed(future_to_name): + name = future_to_name[future] + try: + results[name] = future.result() + except Exception as exc: + logger.error( + "Component analyzer for %s crashed: %s", + name, + exc, + exc_info=True, + ) + # Record the failure as an Error-prefixed string so the + # rest of the pipeline (synthesis, logging) behaves + # identically to the sequential-path error shape. + results[name] = f"Error: component analyzer crashed: {exc}" + + # Merge results back in deterministic work-item order so downstream + # synthesis sees a stable component ordering across runs. + successes: List[str] = [] + failures: List[str] = [] + for comp_name, comp, _ in work_items: + result = results.get(comp_name, "") + analyses[comp_name] = result + if result and not result.startswith("Error"): + successes.append(comp_name) + else: + failures.append(comp_name) - t.log_attributes( - analysis_length=len(analysis_result) if analysis_result else 0, - success=bool( - analysis_result and not analysis_result.startswith("Error") - ), - ) + __tracer.log_attributes( + successful_components=successes, + failed_components=failures, + success_count=len(successes), + failure_count=len(failures), + ) return state.update( component_analyses=analyses, @@ -1352,6 +1409,43 @@ def analyze_current_depth(state: State, __tracer: "TracerFactory") -> State: ) +def _build_upstream_context(deps: List[str], analyses: Dict[str, str]) -> str: + """Build an upstream_context prompt fragment from prior analyses. + + For each dependency that has a completed analysis in ``analyses``, + emit a section with the dependency name and its summary. Dependencies + without an analysis (missing from inventory, failed earlier) are + silently skipped — the subagent simply gets less context. + """ + if not deps: + return "" + parts: List[str] = [] + for dep_name in deps: + analysis = analyses.get(dep_name) + if analysis: + parts.append(f"### {dep_name}\n{_extract_summary(analysis)}") + return "\n\n".join(parts) + + +def _get_max_parallel() -> int: + """Read FLASHLIGHT_MAX_PARALLEL, clamped to sensible bounds. + + Defaults to 4 (reasonable for paid OpenAI-compatible tiers and + most self-hosted vLLM/LM Studio backends). Set to 1 to serialize. + Negative / non-integer values fall back to the default. + """ + raw = os.environ.get("FLASHLIGHT_MAX_PARALLEL", "4") + try: + n = int(raw) + except (TypeError, ValueError): + logger.warning( + "FLASHLIGHT_MAX_PARALLEL=%r is not an integer; using default 4", + raw, + ) + return 4 + return max(1, n) + + def _extract_summary(analysis: str) -> str: """Extract a brief summary from a component analysis.""" # Look for a summary section or take first few paragraphs @@ -1434,7 +1528,7 @@ def _run_component_analyzer( system_prompt = f"You are a component-analyzer for the {service_name} codebase." # Run as a proper Burr application for UI visibility - return _run_subagent_as_app( + analysis = _run_subagent_as_app( system_prompt=system_prompt, user_prompt=prompt, subagent_type="component-analyzer", @@ -1443,6 +1537,26 @@ def _run_component_analyzer( parent_sequence_id=parent_sequence_id, ) + # Persist the analysis to disk ourselves. We don't trust the LLM to call + # write_file — empirically it drops that tool call ~75% of the time, + # especially on smaller models, silently throwing away the analysis. + # Saving in the orchestrator guarantees every completed subagent produces + # a .md on disk regardless of which tool calls the model chose to make. + if analysis and not analysis.startswith("Error"): + out_path = Path(f"/tmp/{service_name}/service_analyses/{comp_name}.md") + try: + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(analysis, encoding="utf-8") + except OSError as exc: + logger.error( + "Failed to persist analysis for %s to %s: %s", + comp_name, + out_path, + exc, + ) + + return analysis + @action( reads=["component_analyses", "service_name"], diff --git a/agent/cli.py b/agent/cli.py index 96099c6..9bdc4b3 100644 --- a/agent/cli.py +++ b/agent/cli.py @@ -308,6 +308,12 @@ def analyze( discovery_dir = work_dir / "service_discovery" graph_dir = work_dir / "dependency_graphs" + # Expose the repo at /tmp/{service_name}/project/ — every subagent prompt + # references this path as the source root, so without it analyzers can't + # read any code and end up hallucinating. Symlink (no disk copy) so large + # repos stay cheap and the view always tracks the current clone. + _setup_project_dir(repo, work_dir) + # Discover components from source components = discover_components(repo, output_dir=discovery_dir) print(f" Discovered {len(components)} components") @@ -460,11 +466,32 @@ def analyze( project_root = tmp_artifacts / "project" cite_repo_root = project_root if project_root.exists() else None + # Build {component_name: root_path} map so the extractor can retry + # component-relative citation paths (e.g. when the LLM emits + # "base_model.py" for the models component, we want to fall back to + # "omlx/models/base_model.py"). Reads from the full unified graph + # since components.json may only list the repo-root entry for + # single-manifest Python packages. + component_roots: dict[str, str] = {} + graph_file = tmp_artifacts / "dependency_graphs" / "graph.json" + if graph_file.exists(): + try: + with open(graph_file) as f: + graph_data = json.load(f) + nodes = graph_data.get("nodes", {}).get("components", {}) + for name, comp in nodes.items(): + root = comp.get("root_path", "") + if root and root != ".": + component_roots[name] = root + except (OSError, json.JSONDecodeError) as exc: + logger.warning("Could not load component roots from graph: %s", exc) + cite_index = build_citations_index( analyses_dir=analyses_dir, repo_root=cite_repo_root, source_repo=cite_source_repo, source_commit=cite_source_commit, + component_roots=component_roots, ) meta = cite_index.get("metadata", {}) @@ -507,6 +534,68 @@ def analyze( # --------------------------------------------------------------------------- +def _setup_project_dir(repo: Path, work_dir: Path) -> Path: + """Ensure `/project` points at the repo the subagents analyze. + + Subagent prompts all reference `/tmp/{service_name}/project/` as the + source root. This function makes that path resolve to ``repo``. + + Prefers a symlink (no disk copy, always consistent with the clone); falls + back to copying if the platform can't create the symlink. + + Idempotent: safe to call across re-runs. + + Args: + repo: Resolved path to the repository root (clone or local checkout) + work_dir: `/tmp/{service_name}` working directory + + Returns: + Path to the project directory (``work_dir / "project"``) + """ + project_dir = work_dir / "project" + repo = repo.resolve() + work_dir_resolved = work_dir.resolve() + + # Refuse to alias a work_dir-rooted path onto itself (would create a cycle + # like /tmp/foo/project/project/project/...). + if repo == work_dir_resolved or work_dir_resolved in repo.parents: + raise ValueError( + f"Refusing to link {project_dir} -> {repo}: the repo lives inside " + f"the work_dir ({work_dir_resolved}). Move the clone outside " + f"{work_dir_resolved} or use a different service name." + ) + + # If project/ already points at the right target, keep it. + if project_dir.is_symlink(): + try: + if project_dir.resolve() == repo: + return project_dir + except OSError: + pass + project_dir.unlink() + elif project_dir.exists(): + # Stale real directory from an older run — remove so we can relink. + shutil.rmtree(project_dir) + + try: + project_dir.symlink_to(repo, target_is_directory=True) + return project_dir + except OSError as exc: + logger.warning( + "Could not symlink %s → %s (%s); falling back to copy.", + project_dir, + repo, + exc, + ) + shutil.copytree( + repo, + project_dir, + symlinks=True, + ignore=shutil.ignore_patterns(".git", "__pycache__", "node_modules"), + ) + return project_dir + + def clone_repo(url: str, target_dir: Path) -> Path: """Clone a git repository to a target directory. diff --git a/agent/discovery/languages/go.py b/agent/discovery/languages/go.py index 53fd279..a5dc906 100644 --- a/agent/discovery/languages/go.py +++ b/agent/discovery/languages/go.py @@ -101,7 +101,9 @@ def _discover_packages( ) -> Dict[str, Path]: """Find top-level directories that contain Go source files. - Returns a dict of {package_name: directory_path}. + Returns a dict of {package_name: directory_path}. A top-level `cmd/` + dir is treated as a container, not a package — its children are + handled by _build_package_components via the cmd/ recursion path. """ packages: Dict[str, Path] = {} @@ -110,6 +112,8 @@ def _discover_packages( continue if entry.name.startswith(".") or entry.name in SKIP_DIRS: continue + if entry.name == "cmd" and self._cmd_is_binary_container(entry): + continue # Count .go files anywhere in this directory tree go_files = list(entry.rglob("*.go")) @@ -118,6 +122,17 @@ def _discover_packages( return packages + def _cmd_is_binary_container(self, cmd_dir: Path) -> bool: + """True if `cmd/` holds one-or-more `cmd//main.go` binaries + rather than a `cmd` Go package (cmd/*.go at the top level).""" + has_top_level_go = any(f.is_file() for f in cmd_dir.glob("*.go")) + if has_top_level_go: + return False + has_child_main = any( + sub.is_dir() and (sub / "main.go").exists() for sub in cmd_dir.iterdir() + ) + return has_child_main + def _build_package_components( self, packages: Dict[str, Path], @@ -140,9 +155,22 @@ def _build_package_components( for pkg_name, pkg_dir in packages.items(): package_imports[pkg_name] = self._scan_imports(pkg_dir, module_path) - # Scan all cmd/ directories recursively within each package. - # Handles: pkg/cmd/foo, pkg/sub/cmd/bar, etc. + # Scan all cmd/ directories recursively within each package AND + # at the repo root. Handles: cmd/foo, pkg/cmd/foo, pkg/sub/cmd/bar. cmd_components: List[Tuple[str, str, Path, ComponentKind]] = [] + + root_cmd = component_root / "cmd" + if root_cmd.is_dir(): + for cmd_entry in sorted(root_cmd.iterdir()): + if cmd_entry.is_dir() and self._has_main_go(cmd_entry): + cmd_name = cmd_entry.name + if cmd_name in package_imports: + continue + cmd_imports = self._scan_imports(cmd_entry, module_path) + package_imports[cmd_name] = cmd_imports + kind = self._classify_cmd(cmd_entry) + cmd_components.append((cmd_name, "", cmd_entry, kind)) + for pkg_name, pkg_dir in packages.items(): for cmd_dir in sorted(pkg_dir.rglob("cmd")): if not cmd_dir.is_dir(): @@ -189,8 +217,8 @@ def _build_package_components( internal_deps = self._resolve_import_deps( package_imports[cmd_name], packages, module_path, ) - # Always include the parent package as a dep - if parent_pkg not in internal_deps: + # Root-level cmd binaries have no parent package + if parent_pkg and parent_pkg not in internal_deps: internal_deps.add(parent_pkg) rel_path = str(cmd_dir.relative_to(repo_root)) diff --git a/agent/discovery/languages/rust.py b/agent/discovery/languages/rust.py index 3ca3491..d96a564 100644 --- a/agent/discovery/languages/rust.py +++ b/agent/discovery/languages/rust.py @@ -101,26 +101,50 @@ def _extract_field(self, text: str, field: str) -> str: ) return field_match.group(1) if field_match else "" + _DEP_SECTION_RE = re.compile( + r'^\[(?:target\.[^\]]+\.)?(dependencies|dev-dependencies|build-dependencies)\]' + r'(.*?)(?=^\[|\Z)', + re.MULTILINE | re.DOTALL, + ) + def _parse_dependencies(self, text: str) -> tuple[List[ExternalDependency], List[str]]: - """Parse [dependencies] section.""" + """Parse all dependency tables (runtime, dev, build, target-specific). + + External deps are collected only from runtime [dependencies] sections — + dev/build deps are test/build-time artifacts, not runtime edges. + Internal path deps are collected from every table: a workspace member + referenced by path is an internal edge regardless of which table + declares it. + """ + external: List[ExternalDependency] = [] + internal: set[str] = set() + seen_external: set[str] = set() + + for match in self._DEP_SECTION_RE.finditer(text): + kind = match.group(1) + block = match.group(2) + block_external, block_internal = self._parse_dep_block(block) + internal.update(block_internal) + if kind == "dependencies": + for dep in block_external: + if dep.name in seen_external: + continue + seen_external.add(dep.name) + external.append(dep) + + return external, sorted(internal) + + def _parse_dep_block( + self, block: str, + ) -> tuple[List[ExternalDependency], List[str]]: external: List[ExternalDependency] = [] internal: List[str] = [] - # Find [dependencies] section - dep_match = re.search( - r'^\[dependencies\](.*?)(?:^\[|\Z)', - text, re.MULTILINE | re.DOTALL - ) - if not dep_match: - return external, internal - - dep_section = dep_match.group(1) - for line in dep_section.splitlines(): + for line in block.splitlines(): line = line.strip() - if not line or line.startswith("#"): + if not line or line.startswith("#") or line.startswith("["): continue - # Parse: name = "version" or name = { path = "../foo", ... } match = re.match(r'^([a-zA-Z0-9_-]+)\s*=\s*(.*)', line) if not match: continue @@ -128,16 +152,15 @@ def _parse_dependencies(self, text: str) -> tuple[List[ExternalDependency], List dep_name = match.group(1) dep_value = match.group(2).strip() - if "path" in dep_value: - # Internal dependency + if re.search(r'\bpath\s*=', dep_value): internal.append(dep_name) - else: - version = dep_value.strip('"\'') - if version.startswith("{"): - # Inline table: { version = "1.0", features = [...] } - ver_match = re.search(r'version\s*=\s*["\']([^"\']+)', dep_value) - version = ver_match.group(1) if ver_match else "" - external.append(ExternalDependency(name=dep_name, version=version)) + continue + + version = dep_value.strip('"\'') + if version.startswith("{"): + ver_match = re.search(r'version\s*=\s*["\']([^"\']+)', dep_value) + version = ver_match.group(1) if ver_match else "" + external.append(ExternalDependency(name=dep_name, version=version)) return external, internal @@ -177,12 +200,40 @@ def _classify_executable(self, text: str, component_root: Path) -> ComponentKind # Name-based heuristic if any(kw in name for kw in ["server", "service", "daemon", "node", "api"]): return ComponentKind.SERVICE - if any(kw in name for kw in ["cli", "tool", "cmd"]): + if any(kw in name for kw in ["cli", "tool", "cmd", "gen", "packer"]): return ComponentKind.CLI - return ComponentKind.SERVICE # Default for executables + # Default: bare executables with no server framework are almost always + # CLI tools (build scripts, codegen, packers). Audit corpus showed + # SERVICE default produced far more false positives than CLI default. + return ComponentKind.CLI def _classify_hybrid(self, text: str, component_root: Path) -> ComponentKind: - """Classify a crate with both lib.rs and main.rs/[[bin]].""" - # Primarily a library with a CLI companion + """Classify a crate with both lib.rs and a [[bin]] / main.rs. + + A lib crate can ship a companion binary (tauri-cli ships a bin named + cargo-tauri; workspace crates ship codegen binaries). The presence of + a binary takes precedence when its manifest declaration is explicit — + i.e. [[bin]] with a distinct name, or crate-type = ["cdylib"]/["bin"]. + Otherwise we treat the crate as primarily a library with a test + harness binary. + """ + bin_blocks = re.findall( + r'^\[\[bin\]\](.*?)(?=^\[|\Z)', + text, re.MULTILINE | re.DOTALL, + ) + pkg_name = self._extract_field(text, "name") + + for block in bin_blocks: + bin_name_match = re.search( + r'^\s*name\s*=\s*["\']([^"\']+)', block, re.MULTILINE, + ) + if not bin_name_match: + continue + bin_name = bin_name_match.group(1) + if bin_name != pkg_name: + # An explicitly-named binary distinct from the crate name + # signals this is an executable with a library internal. + return self._classify_executable(text, component_root) + return ComponentKind.LIBRARY diff --git a/agent/discovery/languages/typescript.py b/agent/discovery/languages/typescript.py index 10d24e1..fd7bd84 100644 --- a/agent/discovery/languages/typescript.py +++ b/agent/discovery/languages/typescript.py @@ -107,6 +107,12 @@ def _classify(self, data: dict, component_root: Path) -> ComponentKind: } scripts = data.get("scripts", {}) + # Cloudflare Workers (wrangler.toml) are deployed services + if (component_root / "wrangler.toml").exists() or ( + component_root / "wrangler.jsonc" + ).exists(): + return ComponentKind.SERVICE + # Has bin → CLI if data.get("bin"): return ComponentKind.CLI diff --git a/agent/prompts/subagents/component_analyzer.txt b/agent/prompts/subagents/component_analyzer.txt index 95fe9cf..fef9bb6 100644 --- a/agent/prompts/subagents/component_analyzer.txt +++ b/agent/prompts/subagents/component_analyzer.txt @@ -8,9 +8,9 @@ Component Information: - Description: {component_description} - Direct Dependencies: {dependency_list} -**Working Directory**: All source code is located at `/tmp/{SERVICE_NAME}/project/` -- Use Glob, Grep, and Read tools to analyze code within this directory -- The component_path is relative to `/tmp/{SERVICE_NAME}/project/` +**Working Directory**: All source code is at `/tmp/{SERVICE_NAME}/project/`. +- The component lives at `/tmp/{SERVICE_NAME}/project/{component_path}/`. +- Use glob_files, grep_files, and read_file to explore it. **UPSTREAM CONTEXT** (analyses of direct dependencies): @@ -19,6 +19,29 @@ Component Information: Use this context to understand how this component integrates with its dependencies. Focus on the interfaces and patterns documented above. +**MANDATORY EXPLORATION BEFORE WRITING THE ANALYSIS** + +Do not skip this — shallow analyses are rejected. Work in this order: + +1. `glob_files` the component directory to list every source file (e.g. + `**/*.py`, `**/*.rs`, `**/*.go`, `**/*.ts` — match the component's type). +2. `read_file` on the top-level entry point(s) (`__init__.py`, `lib.rs`, + `mod.rs`, `index.ts`, etc.) in full. +3. `read_file` on every non-trivial source file in the component — at least + the first 300 lines each, more if the file is central. For components + with more than 10 files, read at least 6 key ones end-to-end. +4. `grep_files` to find concrete call patterns, class definitions, trait + impls, route registrations, or CLI command wiring — whatever is relevant + to the component kind. +5. `grep_files` for how this component's code uses symbols from each of its + Direct Dependencies. Follow at least one call site per dependency back + to the file that uses it. +6. Read the manifest file (pyproject.toml, Cargo.toml, package.json, + go.mod) to get the exact dependency list with version constraints. + +Only after you have actually read the code should you write the analysis. +Every claim must be grounded in something you read. + **KIND-SPECIFIC ANALYSIS** Based on the component kind ({component_kind}), focus your analysis appropriately: @@ -122,7 +145,7 @@ At the end of your Markdown analysis, include two JSON blocks: ```json [ { - "file_path": "src/handler.rs", + "file_path": "{component_path}/handler.py", "start_line": 42, "end_line": 56, "claim": "The analysis claim this citation supports", @@ -131,8 +154,24 @@ At the end of your Markdown analysis, include two JSON blocks: ] ``` -Aim for 10-30 citations linking your analysis claims to specific source code locations. -Only cite line numbers you actually observed — never guess. - -Save your analysis to: -- /tmp/{SERVICE_NAME}/service_analyses/{component_name}.md +**Citation rules — read carefully:** +- `file_path` MUST be relative to the repository root and include the full + component path. For this component, paths MUST start with + `{component_path}/` (e.g. `{component_path}/handler.py`, NOT `handler.py` + and NOT `./handler.py`). +- `start_line` and `end_line` MUST come from lines you actually read via + `read_file`. Never guess line numbers — if you didn't read it, don't + cite it. +- Aim for 10-30 citations, at least one per major claim in the analysis. +- Empty `[]` is better than fabricated citations. + +**OUTPUT** + +Respond with the Markdown analysis as your message content. Do NOT call +write_file — the orchestrator saves your response automatically. The +analysis file will be written at +`/tmp/{SERVICE_NAME}/service_analyses/{component_name}.md` using exactly +what you return. Do not include any preamble ("Here is the analysis:"), +do not wrap the whole thing in a code fence, do not add closing +commentary. Start with the top-level `#` heading and end with the +closing ``` of the `## Citations` block. diff --git a/agent/prompts/subagents/component_analyzer_depth0.txt b/agent/prompts/subagents/component_analyzer_depth0.txt index 85f6824..1962db4 100644 --- a/agent/prompts/subagents/component_analyzer_depth0.txt +++ b/agent/prompts/subagents/component_analyzer_depth0.txt @@ -7,12 +7,32 @@ Component Information: - Location: {component_path} - Description: {component_description} -**Working Directory**: All source code is located at `/tmp/{SERVICE_NAME}/project/` -- Use Glob, Grep, and Read tools to analyze code within this directory -- The component_path is relative to `/tmp/{SERVICE_NAME}/project/` +**Working Directory**: All source code is at `/tmp/{SERVICE_NAME}/project/`. +- The component lives at `/tmp/{SERVICE_NAME}/project/{component_path}/`. +- Use glob_files, grep_files, and read_file to explore it. This component has NO internal dependencies. +**MANDATORY EXPLORATION BEFORE WRITING THE ANALYSIS** + +Do not skip this — shallow analyses are rejected. Work in this order: + +1. `glob_files` the component directory to list every source file (e.g. + `**/*.py`, `**/*.rs`, `**/*.go`, `**/*.ts` — match the component's type). +2. `read_file` on the top-level entry point(s) (`__init__.py`, `lib.rs`, + `mod.rs`, `index.ts`, etc.) in full. +3. `read_file` on every non-trivial source file in the component — at least + the first 300 lines each, more if the file is central. For components + with more than 10 files, read at least 6 key ones end-to-end. +4. `grep_files` to find concrete call patterns, class definitions, trait + impls, route registrations, or CLI command wiring — whatever is relevant + to the component kind. +5. Read the manifest file (pyproject.toml, Cargo.toml, package.json, + go.mod) to get the exact dependency list with version constraints. + +Only after you have actually read the code should you write the analysis. +Every claim in your analysis must be grounded in something you read. + **KIND-SPECIFIC ANALYSIS** Based on the component kind ({component_kind}), focus your analysis appropriately: @@ -99,7 +119,7 @@ At the end of your Markdown analysis, include two JSON blocks: ```json [ { - "file_path": "src/handler.rs", + "file_path": "{component_path}/handler.py", "start_line": 42, "end_line": 56, "claim": "The analysis claim this citation supports", @@ -108,8 +128,24 @@ At the end of your Markdown analysis, include two JSON blocks: ] ``` -Aim for 10-30 citations linking your analysis claims to specific source code locations. -Only cite line numbers you actually observed — never guess. - -Save your analysis to: -- /tmp/{SERVICE_NAME}/service_analyses/{component_name}.md +**Citation rules — read carefully:** +- `file_path` MUST be relative to the repository root and include the full + component path. For this component, paths MUST start with + `{component_path}/` (e.g. `{component_path}/handler.py`, NOT `handler.py` + and NOT `./handler.py`). +- `start_line` and `end_line` MUST come from lines you actually read via + `read_file`. Never guess line numbers — if you didn't read it, don't + cite it. +- Aim for 10-30 citations, at least one per major claim in the analysis. +- Empty `[]` is better than fabricated citations. + +**OUTPUT** + +Respond with the Markdown analysis as your message content. Do NOT call +write_file — the orchestrator saves your response automatically. The +analysis file will be written at +`/tmp/{SERVICE_NAME}/service_analyses/{component_name}.md` using exactly +what you return. Do not include any preamble ("Here is the analysis:"), +do not wrap the whole thing in a code fence, do not add closing +commentary. Start with the top-level `#` heading and end with the +closing ``` of the `## Citations` block. diff --git a/agent/prompts/subagents/graph_auditor.txt b/agent/prompts/subagents/graph_auditor.txt new file mode 100644 index 0000000..a062d48 --- /dev/null +++ b/agent/prompts/subagents/graph_auditor.txt @@ -0,0 +1,75 @@ +You are auditing a deterministic dependency graph for correctness. Your job is to independently propose what the graph SHOULD look like based on the source code, then a downstream tool will diff your proposal against the deterministic one. + +**Working Directory**: All source code is at `/tmp/{SERVICE_NAME}/project/`. + +**DETERMINISTIC DISCOVERY OUTPUT** (what the deterministic engine produced): + +Components: +{components_json} + +Edges (source -> target, DEPENDS_ON): +{edges_json} + +**YOUR TASK** + +Independently audit this graph. You are NOT told to trust it. Use the tools to read the source, then answer: + +1. For every component listed, is its `kind` correct? (library/service/cli/contract/infra/pipeline/frontend) +2. For every component, what internal dependencies SHOULD it have based on actual imports, HTTP calls, shared DB access, or other code-level evidence? +3. Are there components the deterministic engine missed entirely? (e.g., a sub-module with its own manifest that wasn't detected, a service defined by a Dockerfile with no manifest) + +**METHOD** — you MUST ground every claim in code: + +1. `glob_files` the repo to get a sense of the layout. +2. For each component listed, read its manifest file (Cargo.toml / package.json / pyproject.toml / go.mod / Package.swift / foundry.toml / hardhat.config.*) AND its main entrypoint. +3. `grep_files` for import statements, HTTP client construction, and any cross-component references. For each suspected edge, find at least one concrete file:line that evidences it. +4. Pay special attention to: + - **Workspace / path-based deps** (`path = "../foo"` in Cargo, `"file:../foo"` in npm, `replace` directives in go.mod, editable installs in pyproject) + - **Dynamic edges** — HTTP calls between services, shared message queues, cross-component DB writes (these are NOT deterministically detectable from manifests) + - **Missed components** — Dockerfiles, standalone scripts, subdirectories with their own lockfile + +**DO NOT** fabricate edges. If you cannot find a specific file:line that evidences an edge, do not propose it. Empty proposals are better than hallucinated ones. + +**OUTPUT FORMAT** — respond with ONE JSON object, no preamble, no code fence, no trailing commentary. Your entire response must parse as JSON: + +``` +{ + "classifications": [ + { + "component_name": "foo-lib", + "proposed_kind": "library", + "evidence_file": "foo-lib/src/lib.rs", + "evidence_lines": "1-20", + "reasoning": "No main.rs, only lib.rs with pub exports — reusable code, no entrypoint." + } + ], + "proposed_edges": [ + { + "source": "service-a", + "target": "foo-lib", + "evidence_type": "import | path_dep | http_call | shared_db | other", + "evidence_file": "service-a/src/main.rs", + "evidence_lines": "5", + "evidence_snippet": "use foo_lib::Config;", + "reasoning": "Direct Rust import of foo_lib crate in main.rs." + } + ], + "missed_components": [ + { + "proposed_name": "migration-worker", + "proposed_root_path": "workers/migrate", + "proposed_kind": "cli", + "evidence_file": "workers/migrate/Dockerfile", + "reasoning": "Dockerfile with ENTRYPOINT defines a standalone worker, no manifest caught by discovery." + } + ] +} +``` + +**CONSTRAINTS** + +- Component names in `proposed_edges` MUST match names in the deterministic components list exactly (or appear in your `missed_components`). +- `evidence_file` paths MUST be relative to repo root and MUST be files you actually read. +- `evidence_lines` must come from lines you read, not guessed. +- If you are uncertain about an edge, omit it rather than including a weak one. +- Keep total response under 8000 tokens — prioritize high-confidence findings. diff --git a/scripts/graph_audit/aggregate.py b/scripts/graph_audit/aggregate.py new file mode 100644 index 0000000..7211fc3 --- /dev/null +++ b/scripts/graph_audit/aggregate.py @@ -0,0 +1,96 @@ +"""Roll up per-repo audit.json findings into a corpus-wide report. + +Produces audit_report.json with per-language/per-plugin failure patterns +ranked by frequency, plus exemplar citations for each pattern. +""" + +from __future__ import annotations + +import argparse +import json +from collections import Counter, defaultdict +from pathlib import Path + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--out-root", type=Path, default=Path("audit_out")) + parser.add_argument("--output", type=Path, default=Path("audit_out/audit_report.json")) + args = parser.parse_args() + + corpus = json.loads(Path("scripts/graph_audit/corpus.json").read_text()) + lang_by_slug = {r["full_name"].replace("/", "__"): r["language"] for r in corpus} + + discovery = json.loads((args.out_root / "discovery_summary.json").read_text()) + + repos = [] + for entry in discovery: + if entry.get("status") != "ok": + continue + audit_path = args.out_root / entry["slug"] / "audit.json" + if not audit_path.exists(): + continue + repos.append((entry["slug"], json.loads(audit_path.read_text()))) + + # Per-language counters + kind_mismatch_by_lang: dict[str, Counter] = defaultdict(Counter) + edge_discrepancy_by_lang: dict[str, Counter] = defaultdict(Counter) + missed_by_lang: dict[str, int] = defaultdict(int) + + # Exemplars (first 3 per bucket) for each (lang, category) + exemplars: dict[tuple[str, str], list[dict]] = defaultdict(list) + + for slug, audit in repos: + if audit.get("status") != "ok": + continue + lang = lang_by_slug.get(slug, "unknown") + findings = audit.get("findings", {}) + + for km in findings.get("kind_mismatches", []): + key = f"{km['deterministic']} -> {km['proposed']}" + kind_mismatch_by_lang[lang][key] += 1 + bucket = (lang, f"kind:{key}") + if len(exemplars[bucket]) < 3: + exemplars[bucket].append({"slug": slug, **km}) + + for ed in findings.get("edge_discrepancies", []): + category = ed.get("category", "unknown") + edge_discrepancy_by_lang[lang][category] += 1 + bucket = (lang, f"edge:{category}") + if len(exemplars[bucket]) < 3: + exemplars[bucket].append({"slug": slug, **ed}) + + missed_by_lang[lang] += len(findings.get("missed_components", [])) + + report = { + "repos_audited": len(repos), + "per_language": {}, + } + for lang in sorted( + set(list(kind_mismatch_by_lang) + list(edge_discrepancy_by_lang) + list(missed_by_lang)) + ): + report["per_language"][lang] = { + "kind_mismatches": dict(kind_mismatch_by_lang[lang].most_common()), + "edge_discrepancies_by_category": dict(edge_discrepancy_by_lang[lang].most_common()), + "missed_component_count": missed_by_lang[lang], + "exemplars": { + key: samples + for (l, key), samples in exemplars.items() + if l == lang + }, + } + + args.output.write_text(json.dumps(report, indent=2)) + print(f"Wrote corpus report to {args.output}") + print(f" Repos audited: {len(repos)}") + for lang, data in report["per_language"].items(): + plugin_gap = data["edge_discrepancies_by_category"].get("plugin_gap", 0) + print( + f" {lang}: {plugin_gap} plugin_gap edge(s), " + f"{sum(data['kind_mismatches'].values())} kind mismatch(es), " + f"{data['missed_component_count']} missed component(s)" + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/graph_audit/fetch_corpus.py b/scripts/graph_audit/fetch_corpus.py new file mode 100644 index 0000000..01c9cd0 --- /dev/null +++ b/scripts/graph_audit/fetch_corpus.py @@ -0,0 +1,103 @@ +"""Fetch a stratified corpus of GitHub repos for graph auditing. + +Uses GitHub's search API (requires `gh` CLI authenticated) to pull +popular + recently active repos per language. Writes corpus.json with +clone URLs; actual cloning happens in run_discovery.py. +""" + +from __future__ import annotations + +import argparse +import json +import subprocess +from pathlib import Path + + +LANGUAGES = { + "rust": "Rust", + "go": "Go", + "typescript": "TypeScript", + "python": "Python", + "solidity": "Solidity", +} + + +def fetch_top_repos(language: str, per_lang: int) -> list[dict]: + """Popular + recently active repos for one language via `gh api`.""" + query = f"language:{language} stars:>500 pushed:>2026-01-01" + cmd = [ + "gh", + "api", + "-X", + "GET", + "search/repositories", + "-f", + f"q={query}", + "-f", + "sort=stars", + "-f", + "order=desc", + "-f", + f"per_page={per_lang}", + ] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + data = json.loads(result.stdout) + return [ + { + "language": language, + "full_name": item["full_name"], + "clone_url": item["clone_url"], + "stars": item["stargazers_count"], + "size_kb": item["size"], + } + for item in data["items"] + ] + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument( + "--per-lang", + type=int, + default=4, + help="Repos to fetch per language (default 4, yielding 20 total across 5 langs)", + ) + parser.add_argument( + "--output", + type=Path, + default=Path("scripts/graph_audit/corpus.json"), + ) + parser.add_argument( + "--max-size-kb", + type=int, + default=200_000, + help="Skip repos larger than this (avoid multi-GB monorepos)", + ) + args = parser.parse_args() + + corpus: list[dict] = [] + for lang_key, lang_name in LANGUAGES.items(): + print(f"Fetching {args.per_lang} repos for {lang_name}...") + try: + repos = fetch_top_repos(lang_name, args.per_lang * 2) + except subprocess.CalledProcessError as e: + print(f" FAILED for {lang_name}: {e.stderr}") + continue + + kept = 0 + for repo in repos: + if repo["size_kb"] > args.max_size_kb: + print(f" SKIP (too large): {repo['full_name']} ({repo['size_kb']} KB)") + continue + corpus.append(repo) + kept += 1 + if kept >= args.per_lang: + break + + args.output.parent.mkdir(parents=True, exist_ok=True) + args.output.write_text(json.dumps(corpus, indent=2)) + print(f"\nWrote {len(corpus)} repos to {args.output}") + + +if __name__ == "__main__": + main() diff --git a/scripts/graph_audit/run_auditor.py b/scripts/graph_audit/run_auditor.py new file mode 100644 index 0000000..3fa1959 --- /dev/null +++ b/scripts/graph_audit/run_auditor.py @@ -0,0 +1,383 @@ +"""Run the graph-auditor subagent against each repo and diff vs deterministic graph. + +For each audit_out/{slug}/ directory produced by run_discovery.py, this: + 1. Loads components.json + edges.json + 2. Feeds them into a graph-auditor LLM loop (reuses flashlight tools) + 3. Parses the auditor's JSON proposal + 4. Diffs proposed vs deterministic, categorizes each discrepancy + 5. Writes audit.json + auditor_raw.txt + +Enforces a corpus-wide USD budget and halts before exceeding it. +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import re +import sys +from pathlib import Path +from typing import Any + +# Add repo root to sys.path so we can import agent.* +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + +from agent.burr_app import ( # noqa: E402 + AVAILABLE_TOOLS, + SUBAGENT_TOOL_FUNCTIONS, + _chat_completion, +) + +logger = logging.getLogger(__name__) + + +# Rough per-model pricing (USD per 1M tokens). Used only for budget enforcement; +# overstate slightly so we stop before the actual bill catches up. +MODEL_PRICING = { + "anthropic/claude-sonnet-4-6": {"input": 3.0, "output": 15.0}, + "anthropic/claude-sonnet-4.6": {"input": 3.0, "output": 15.0}, + "anthropic/claude-sonnet-4": {"input": 3.0, "output": 15.0}, + "gpt-4o": {"input": 2.5, "output": 10.0}, + "gpt-4o-mini": {"input": 0.15, "output": 0.6}, +} + + +def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float: + pricing = MODEL_PRICING.get(model) + if not pricing: + # Unknown model — assume Sonnet-tier to avoid underestimating + pricing = MODEL_PRICING["anthropic/claude-sonnet-4-6"] + return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000 + + +def build_audit_prompt( + components: list[dict], + edges: list[dict], + service_name: str, +) -> str: + template = ( + Path(__file__).resolve().parents[2] + / "agent/prompts/subagents/graph_auditor.txt" + ).read_text() + return ( + template.replace("{SERVICE_NAME}", service_name) + .replace("{components_json}", json.dumps(components, indent=2)) + .replace("{edges_json}", json.dumps(edges, indent=2)) + ) + + +FINALIZE_DIRECTIVE = ( + "You have reached the exploration budget. Stop calling tools. " + "Produce your final JSON object now, based on what you've already read. " + "A partial, honest answer is better than more exploration. " + "Respond with the JSON object only, no preamble or code fence." +) + + +def run_auditor_loop( + user_prompt: str, + system_prompt: str, + model: str, + max_iterations: int = 40, +) -> tuple[str, dict[str, int]]: + """Lightweight ReAct loop that returns (final_content, token_counts). + + Mirrors _run_subagent_loop in burr_app.py but exposes token usage so we + can enforce the corpus-wide budget. If the model runs out of exploration + budget, we send a finalize directive and give it one more turn with no + tools available to produce the final JSON. + """ + subagent_tools = [ + t for t in AVAILABLE_TOOLS if t["function"]["name"] != "spawn_subagent" + ] + + messages: list[dict[str, Any]] = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] + + total_input = 0 + total_output = 0 + + def run_one(tools: list | None) -> tuple[str, list]: + nonlocal total_input, total_output + response = _chat_completion(messages=messages, model=model, tools=tools) + usage = response.get("usage", {}) + total_input += usage.get("prompt_tokens", 0) + total_output += usage.get("completion_tokens", 0) + content = response.get("content", "") + tool_calls = response.get("tool_calls", []) + msg: dict[str, Any] = {"role": "assistant", "content": content} + if tool_calls: + msg["tool_calls"] = tool_calls + messages.append(msg) + return content, tool_calls + + for iteration in range(max_iterations): + content, tool_calls = run_one(subagent_tools) + + if not tool_calls: + return content or "(no response)", {"input": total_input, "output": total_output} + + for tool_call in tool_calls: + tool_name = tool_call["function"]["name"] + tool_id = tool_call["id"] + try: + tool_args = json.loads(tool_call["function"]["arguments"]) + except json.JSONDecodeError as e: + messages.append( + {"role": "tool", "tool_call_id": tool_id, "content": f"Error parsing args: {e}"} + ) + continue + + if tool_name in SUBAGENT_TOOL_FUNCTIONS: + try: + result = SUBAGENT_TOOL_FUNCTIONS[tool_name](**tool_args) + except Exception as e: + result = f"Error: {e}" + else: + result = f"Unknown tool: {tool_name}" + + messages.append({"role": "tool", "tool_call_id": tool_id, "content": result}) + + messages.append({"role": "user", "content": FINALIZE_DIRECTIVE}) + content, _ = run_one(tools=None) + return content or "(no response)", {"input": total_input, "output": total_output} + + +def extract_json_object(text: str) -> dict | None: + """Find the first top-level JSON object in the auditor's response.""" + # Try fenced first + fence = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL) + if fence: + try: + return json.loads(fence.group(1)) + except json.JSONDecodeError: + pass + + # Fall back to first { ... } that balances + start = text.find("{") + if start < 0: + return None + depth = 0 + for i, ch in enumerate(text[start:], start=start): + if ch == "{": + depth += 1 + elif ch == "}": + depth -= 1 + if depth == 0: + try: + return json.loads(text[start : i + 1]) + except json.JSONDecodeError: + return None + return None + + +def categorize_edge( + source: str, + target: str, + evidence_type: str, + in_deterministic: bool, +) -> str: + """Classify a proposed vs deterministic edge discrepancy.""" + if in_deterministic: + return "agrees" + if evidence_type in ("http_call", "shared_db", "other"): + return "dynamic_edge" # Not expected from manifest — no regression + if evidence_type in ("import", "path_dep"): + return "plugin_gap" # Manifest-level edge the parser should have caught + return "ambiguous" + + +def diff_graphs( + deterministic_components: list[dict], + deterministic_edges: list[dict], + audit: dict, +) -> dict: + det_component_names = {c["name"] for c in deterministic_components} + det_edges = { + (e["source"], e["target"]) + for e in deterministic_edges + if e.get("type", "depends_on") == "depends_on" + } + det_kinds = {c["name"]: c.get("kind") for c in deterministic_components} + + findings = { + "kind_mismatches": [], + "edge_discrepancies": [], + "missed_components": audit.get("missed_components", []), + } + + for cls in audit.get("classifications", []): + name = cls.get("component_name") + proposed = cls.get("proposed_kind") + det = det_kinds.get(name) + if det is None or proposed is None: + continue + if det != proposed: + findings["kind_mismatches"].append( + { + "component": name, + "deterministic": det, + "proposed": proposed, + "evidence_file": cls.get("evidence_file"), + "reasoning": cls.get("reasoning"), + } + ) + + for edge in audit.get("proposed_edges", []): + source = edge.get("source") + target = edge.get("target") + if source not in det_component_names and source not in { + m.get("proposed_name") for m in findings["missed_components"] + }: + continue + evidence_type = edge.get("evidence_type", "unknown") + in_det = (source, target) in det_edges + category = categorize_edge(source, target, evidence_type, in_det) + if category == "agrees": + continue + findings["edge_discrepancies"].append( + { + "source": source, + "target": target, + "evidence_type": evidence_type, + "evidence_file": edge.get("evidence_file"), + "evidence_lines": edge.get("evidence_lines"), + "evidence_snippet": edge.get("evidence_snippet"), + "reasoning": edge.get("reasoning"), + "category": category, + } + ) + + return findings + + +def audit_repo( + repo_dir: Path, + service_name: str, + model: str, +) -> tuple[dict, dict[str, int]]: + components = json.loads((repo_dir / "components.json").read_text()) + edges = json.loads((repo_dir / "edges.json").read_text()) + + if not components: + return {"status": "empty_discovery", "components": 0}, {"input": 0, "output": 0} + + prompt = build_audit_prompt(components, edges, service_name) + system = ( + "You are a graph auditor for a deterministic code-component discovery " + "engine. You respond with a single JSON object, no prose." + ) + + raw, tokens = run_auditor_loop(prompt, system, model) + (repo_dir / "auditor_raw.txt").write_text(raw) + + audit = extract_json_object(raw) + if audit is None: + return ( + { + "status": "parse_failed", + "components": len(components), + "edges": len(edges), + "tokens": tokens, + }, + tokens, + ) + + findings = diff_graphs(components, edges, audit) + return ( + { + "status": "ok", + "service_name": service_name, + "components": len(components), + "edges": len(edges), + "tokens": tokens, + "findings": findings, + "raw_audit": audit, + }, + tokens, + ) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--out-root", type=Path, default=Path("audit_out")) + parser.add_argument( + "--model", + default=os.environ.get("AUDITOR_MODEL", "anthropic/claude-sonnet-4-6"), + ) + parser.add_argument("--budget-usd", type=float, default=45.0) + parser.add_argument("--limit", type=int, help="Only process first N repos") + parser.add_argument("--only", help="Only process this slug (for dry runs)") + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + + summary_path = args.out_root / "discovery_summary.json" + discovery_summary = json.loads(summary_path.read_text()) + + eligible = [r for r in discovery_summary if r.get("status") == "ok"] + if args.only: + eligible = [r for r in eligible if r["slug"] == args.only] + if args.limit: + eligible = eligible[: args.limit] + + total_input = 0 + total_output = 0 + audit_summaries = [] + + for repo in eligible: + slug = repo["slug"] + repo_dir = args.out_root / slug + print(f"\n[{slug}] auditing ({repo['component_count']} components, {repo['edge_count']} edges)") + + projected = estimate_cost(args.model, total_input, total_output) + if projected >= args.budget_usd: + print(f" BUDGET EXHAUSTED (${projected:.2f} / ${args.budget_usd:.2f}) — halting") + break + + try: + result, tokens = audit_repo(repo_dir, slug, args.model) + except Exception as e: + logger.exception("Audit failed for %s", slug) + result = {"status": "error", "error": str(e)} + tokens = {"input": 0, "output": 0} + + total_input += tokens["input"] + total_output += tokens["output"] + running_cost = estimate_cost(args.model, total_input, total_output) + print( + f" -> {result.get('status')} (tokens: {tokens['input']} in / " + f"{tokens['output']} out, running ${running_cost:.2f})" + ) + + (repo_dir / "audit.json").write_text(json.dumps(result, indent=2)) + audit_summaries.append( + { + "slug": slug, + "status": result.get("status"), + "tokens": tokens, + "cumulative_cost_usd": running_cost, + } + ) + + (args.out_root / "audit_summary.json").write_text( + json.dumps( + { + "model": args.model, + "total_input_tokens": total_input, + "total_output_tokens": total_output, + "total_cost_usd": estimate_cost(args.model, total_input, total_output), + "per_repo": audit_summaries, + }, + indent=2, + ) + ) + print(f"\nFinal cost: ${estimate_cost(args.model, total_input, total_output):.2f}") + + +if __name__ == "__main__": + main() diff --git a/scripts/graph_audit/run_discovery.py b/scripts/graph_audit/run_discovery.py new file mode 100644 index 0000000..90f8a85 --- /dev/null +++ b/scripts/graph_audit/run_discovery.py @@ -0,0 +1,115 @@ +"""Clone each repo in corpus.json and run Flashlight deterministic discovery. + +Writes per-repo artifacts under audit_out/{slug}/: + - components.json + - edges.json + - analysis_order.json + - project/ (shallow clone of the repo) + +No LLM calls. +""" + +from __future__ import annotations + +import argparse +import json +import shutil +import subprocess +from pathlib import Path + +from agent.discovery.engine import discover_components +from agent.schemas.knowledge_graph import KnowledgeGraphBuilder + + +def slugify(full_name: str) -> str: + return full_name.replace("/", "__") + + +def clone_shallow(clone_url: str, dest: Path) -> bool: + if dest.exists(): + return True + dest.parent.mkdir(parents=True, exist_ok=True) + try: + subprocess.run( + ["git", "clone", "--depth", "1", clone_url, str(dest)], + check=True, + capture_output=True, + timeout=300, + ) + return True + except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e: + print(f" CLONE FAILED: {e}") + if dest.exists(): + shutil.rmtree(dest, ignore_errors=True) + return False + + +def run_for_repo(repo: dict, out_root: Path) -> dict: + slug = slugify(repo["full_name"]) + repo_dir = out_root / slug + project_dir = repo_dir / "project" + + result = {"full_name": repo["full_name"], "slug": slug, "language": repo["language"]} + + if not clone_shallow(repo["clone_url"], project_dir): + result["status"] = "clone_failed" + return result + + try: + components = discover_components(project_dir) + except Exception as e: + result["status"] = "discovery_failed" + result["error"] = str(e) + return result + + graph = KnowledgeGraphBuilder(components).build(source_repo=repo["full_name"]) + analysis_order = graph.get_depth_order() + + (repo_dir / "components.json").write_text( + json.dumps([c.to_dict() for c in components], indent=2) + ) + (repo_dir / "edges.json").write_text( + json.dumps([e.to_dict() for e in graph.edges], indent=2) + ) + (repo_dir / "analysis_order.json").write_text(json.dumps(analysis_order, indent=2)) + + result["status"] = "ok" + result["component_count"] = len(components) + result["edge_count"] = len(graph.edges) + result["depth_levels"] = len(analysis_order) + return result + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--corpus", type=Path, default=Path("scripts/graph_audit/corpus.json")) + parser.add_argument("--out-root", type=Path, default=Path("audit_out")) + parser.add_argument("--limit", type=int, help="Only process first N repos (for dry runs)") + parser.add_argument( + "--max-components", + type=int, + default=100, + help="Skip repos with more than this many components (too expensive to audit)", + ) + args = parser.parse_args() + + corpus = json.loads(args.corpus.read_text()) + if args.limit: + corpus = corpus[: args.limit] + + args.out_root.mkdir(parents=True, exist_ok=True) + summary = [] + for repo in corpus: + print(f"\n[{repo['full_name']}] ({repo['language']})") + result = run_for_repo(repo, args.out_root) + if result.get("component_count", 0) > args.max_components: + result["status"] = "too_many_components" + print(f" -> {result.get('status')} (components={result.get('component_count')}, edges={result.get('edge_count')})") + summary.append(result) + + (args.out_root / "discovery_summary.json").write_text(json.dumps(summary, indent=2)) + print(f"\nWrote summary to {args.out_root / 'discovery_summary.json'}") + + +if __name__ == "__main__": + main() diff --git a/tests/discovery/test_rust_plugin.py b/tests/discovery/test_rust_plugin.py index 91cf5be..6e6fbd7 100644 --- a/tests/discovery/test_rust_plugin.py +++ b/tests/discovery/test_rust_plugin.py @@ -157,4 +157,103 @@ def test_hybrid_lib_and_bin(self, plugin, repo): repo.write("src/main.rs", "fn main() {}\n") comps = plugin.parse_manifest(repo.root / "Cargo.toml", repo.root) - assert comps[0].kind == ComponentKind.LIBRARY # hybrid defaults to library + assert comps[0].kind == ComponentKind.LIBRARY # same-name bin → library + + +class TestHybridAndExecutableClassification: + """Corpus findings: [[bin]] with distinct name indicates executable, + bare executables default to CLI not SERVICE.""" + + def test_hybrid_with_distinct_bin_name_is_executable(self, plugin, repo): + # tauri-cli pattern: lib + bin named cargo-tauri + repo.write("Cargo.toml", """ +[package] +name = "tauri-cli" +[lib] +[[bin]] +name = "cargo-tauri" +path = "src/main.rs" + +[dependencies] +clap = "4.0" +""") + repo.write("src/lib.rs", "") + repo.write("src/main.rs", "fn main() {}\n") + + comps = plugin.parse_manifest(repo.root / "Cargo.toml", repo.root) + assert comps[0].kind == ComponentKind.CLI + + def test_bare_executable_defaults_to_cli(self, plugin, repo): + # Bare build-time binary with no framework hints — should be CLI + repo.write("Cargo.toml", """ +[package] +name = "my-packer" +""") + repo.write("src/main.rs", "fn main() {}\n") + + comps = plugin.parse_manifest(repo.root / "Cargo.toml", repo.root) + assert comps[0].kind == ComponentKind.CLI + + +class TestNonDefaultDependencyTables: + """Corpus findings: path-based deps in [dev-dependencies], + [build-dependencies], and [target.*.dependencies] must produce + internal edges.""" + + def test_dev_dependency_path_is_internal(self, plugin, repo): + repo.write("Cargo.toml", """ +[package] +name = "my-app" + +[dev-dependencies] +mock-service = { path = "../mock-service" } +""") + repo.write("src/main.rs", "fn main() {}\n") + + comps = plugin.parse_manifest(repo.root / "Cargo.toml", repo.root) + assert "mock-service" in comps[0].internal_dependencies + + def test_build_dependency_path_is_internal(self, plugin, repo): + repo.write("Cargo.toml", """ +[package] +name = "my-app" + +[build-dependencies] +hbb_common = { path = "libs/hbb_common" } +""") + repo.write("src/main.rs", "fn main() {}\n") + + comps = plugin.parse_manifest(repo.root / "Cargo.toml", repo.root) + assert "hbb_common" in comps[0].internal_dependencies + + def test_target_specific_dependency_path_is_internal(self, plugin, repo): + repo.write("Cargo.toml", """ +[package] +name = "my-app" + +[target.'cfg(target_os = "linux")'.dependencies] +linux-shim = { path = "../linux-shim" } +""") + repo.write("src/main.rs", "fn main() {}\n") + + comps = plugin.parse_manifest(repo.root / "Cargo.toml", repo.root) + assert "linux-shim" in comps[0].internal_dependencies + + def test_dev_dep_external_not_in_runtime_externals(self, plugin, repo): + # dev/build deps are test-time; they shouldn't leak into runtime externals + repo.write("Cargo.toml", """ +[package] +name = "my-lib" + +[dependencies] +serde = "1.0" + +[dev-dependencies] +criterion = "0.5" +""") + repo.write("src/lib.rs", "") + + comps = plugin.parse_manifest(repo.root / "Cargo.toml", repo.root) + ext_names = {d.name for d in comps[0].external_dependencies} + assert "serde" in ext_names + assert "criterion" not in ext_names diff --git a/tests/discovery/test_typescript_plugin.py b/tests/discovery/test_typescript_plugin.py index 504cb0b..1af1df6 100644 --- a/tests/discovery/test_typescript_plugin.py +++ b/tests/discovery/test_typescript_plugin.py @@ -150,3 +150,25 @@ def test_external_deps(self, plugin, repo): assert "lodash" in dep_names # devDependencies should not be in external_dependencies assert "jest" not in dep_names + + +class TestCloudflareWorker: + """Corpus finding: packages with wrangler.toml are deployed services.""" + + def test_wrangler_toml_sets_service_kind(self, plugin, repo): + repo.write_json("package.json", { + "name": "my-worker", + "scripts": {"deploy": "wrangler deploy", "dev": "wrangler dev"}, + }) + repo.write("wrangler.toml", 'name = "my-worker"\nmain = "src/index.ts"\n') + repo.write("src/index.ts", "export default {}") + + comps = plugin.parse_manifest(repo.root / "package.json", repo.root) + assert comps[0].kind == ComponentKind.SERVICE + + def test_wrangler_jsonc_also_detected(self, plugin, repo): + repo.write_json("package.json", {"name": "my-worker"}) + repo.write("wrangler.jsonc", '{"name": "my-worker"}') + + comps = plugin.parse_manifest(repo.root / "package.json", repo.root) + assert comps[0].kind == ComponentKind.SERVICE diff --git a/tests/test_analyze_current_depth.py b/tests/test_analyze_current_depth.py new file mode 100644 index 0000000..4b32fdc --- /dev/null +++ b/tests/test_analyze_current_depth.py @@ -0,0 +1,550 @@ +"""Tests for analyze_current_depth's parallel execution and error handling. + +The action runs one ThreadPoolExecutor per depth level, bounded by +FLASHLIGHT_MAX_PARALLEL. These tests exercise that behavior without +touching any real LLM by monkeypatching _run_component_analyzer. +""" + +import os +import threading +import time +from unittest.mock import MagicMock + +import pytest +from burr.core import State + +import agent.burr_app as burr_app +from agent.burr_app import ( + _build_upstream_context, + _get_max_parallel, + _run_component_analyzer, + analyze_current_depth, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _component(name, *, deps=None, kind="library"): + """Build a minimal component dict matching the structure analyze_current_depth expects.""" + return { + "name": name, + "kind": kind, + "type": "python-package", + "root_path": f"path/{name}", + "description": "", + "internal_dependencies": deps or [], + } + + +def _state(components, depth_order, current_depth=0, prior_analyses=None): + """Construct the Burr State slice analyze_current_depth needs.""" + return State( + { + "components": {c["name"]: c for c in components}, + "depth_order": depth_order, + "current_depth": current_depth, + "component_analyses": prior_analyses or {}, + "service_name": "test-svc", + } + ) + + +@pytest.fixture(autouse=True) +def _reset_max_parallel(monkeypatch): + """Ensure FLASHLIGHT_MAX_PARALLEL never leaks between tests.""" + monkeypatch.delenv("FLASHLIGHT_MAX_PARALLEL", raising=False) + yield + + +# --------------------------------------------------------------------------- +# _get_max_parallel: env-var parsing +# --------------------------------------------------------------------------- + + +class TestGetMaxParallel: + def test_default_is_four(self): + assert _get_max_parallel() == 4 + + def test_valid_integer(self, monkeypatch): + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "8") + assert _get_max_parallel() == 8 + + def test_one_opts_out_of_parallelism(self, monkeypatch): + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "1") + assert _get_max_parallel() == 1 + + def test_negative_clamped_to_one(self, monkeypatch): + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "-5") + assert _get_max_parallel() == 1 + + def test_zero_clamped_to_one(self, monkeypatch): + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "0") + assert _get_max_parallel() == 1 + + def test_garbage_falls_back_to_default(self, monkeypatch): + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "not-a-number") + assert _get_max_parallel() == 4 + + def test_empty_string_falls_back_to_default(self, monkeypatch): + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "") + assert _get_max_parallel() == 4 + + +# --------------------------------------------------------------------------- +# _build_upstream_context: dependency summary assembly +# --------------------------------------------------------------------------- + + +class TestBuildUpstreamContext: + def test_no_deps_returns_empty(self): + assert _build_upstream_context([], {"other": "analysis"}) == "" + + def test_missing_analyses_silently_skipped(self): + out = _build_upstream_context( + ["dep_a", "dep_b"], {"dep_b": "## Summary\nDep B summary\n"} + ) + assert "dep_a" not in out + assert "dep_b" in out + assert "Dep B summary" in out + + def test_extracts_summary_section(self): + analysis = "## Summary\nI do a thing.\nAnd another.\n\n## Other\nIgnored" + out = _build_upstream_context(["dep"], {"dep": analysis}) + assert "### dep" in out + assert "I do a thing." in out + assert "Ignored" not in out + + def test_preserves_dep_order(self): + analyses = { + "b": "## Summary\nB summary\n", + "a": "## Summary\nA summary\n", + } + # Caller-supplied dependency order is preserved. + out = _build_upstream_context(["b", "a"], analyses) + assert out.index("### b") < out.index("### a") + + +# --------------------------------------------------------------------------- +# analyze_current_depth: core behavior +# --------------------------------------------------------------------------- + + +class TestAnalyzeCurrentDepth: + def test_single_component(self, monkeypatch): + calls = [] + + def fake_runner(**kwargs): + calls.append(kwargs["component"]["name"]) + return f"## Summary\n{kwargs['component']['name']} done\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + + state = _state( + components=[_component("solo")], + depth_order=[["solo"]], + ) + out = analyze_current_depth(state, MagicMock()) + + assert calls == ["solo"] + assert out.get("component_analyses") == {"solo": "## Summary\nsolo done\n"} + assert out.get("current_depth") == 1 + + def test_multiple_components_all_analyzed(self, monkeypatch): + runs = [] + + def fake_runner(**kwargs): + runs.append(kwargs["component"]["name"]) + return f"## Summary\n{kwargs['component']['name']}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + + state = _state( + components=[_component("a"), _component("b"), _component("c")], + depth_order=[["a", "b", "c"]], + ) + out = analyze_current_depth(state, MagicMock()) + + assert sorted(runs) == ["a", "b", "c"] + assert set(out.get("component_analyses").keys()) == {"a", "b", "c"} + + def test_results_merged_in_deterministic_order(self, monkeypatch): + """Even if futures finish in reverse order, the merge order matches + depth_order so downstream synthesis is reproducible.""" + finish_delays = {"a": 0.06, "b": 0.03, "c": 0.01} + + def fake_runner(**kwargs): + name = kwargs["component"]["name"] + time.sleep(finish_delays[name]) + return f"## Summary\n{name}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + + state = _state( + components=[_component("a"), _component("b"), _component("c")], + depth_order=[["a", "b", "c"]], + ) + out = analyze_current_depth(state, MagicMock()) + + # c finishes first, b second, a last — but the dict key order is + # the depth_order order (a, b, c). + assert list(out.get("component_analyses").keys()) == ["a", "b", "c"] + + def test_components_actually_run_in_parallel(self, monkeypatch): + """Wall time for N 100ms tasks with MAX_PARALLEL=N should be ~100ms, + not N*100ms. Tolerance is generous so flakes are unlikely.""" + per_task = 0.10 + n = 4 + + def fake_runner(**kwargs): + time.sleep(per_task) + return f"## Summary\n{kwargs['component']['name']}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", str(n)) + + state = _state( + components=[_component(f"c{i}") for i in range(n)], + depth_order=[[f"c{i}" for i in range(n)]], + ) + t0 = time.perf_counter() + analyze_current_depth(state, MagicMock()) + elapsed = time.perf_counter() - t0 + + sequential_lower_bound = per_task * n # 0.40s + # If parallel, we should be well below sequential_lower_bound. + # Allow a fat safety margin (75% of sequential) for CI noise. + assert elapsed < sequential_lower_bound * 0.75, ( + f"Expected parallel run much faster than {sequential_lower_bound}s, " + f"got {elapsed:.3f}s" + ) + + def test_max_parallel_one_forces_serial(self, monkeypatch): + """MAX_PARALLEL=1 should serialize: per-task sleep seen N times + end-to-end, not once.""" + per_task = 0.10 + n = 3 + + def fake_runner(**kwargs): + time.sleep(per_task) + return f"## Summary\n{kwargs['component']['name']}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "1") + + state = _state( + components=[_component(f"c{i}") for i in range(n)], + depth_order=[[f"c{i}" for i in range(n)]], + ) + t0 = time.perf_counter() + analyze_current_depth(state, MagicMock()) + elapsed = time.perf_counter() - t0 + + # Should take at least (n-1)*per_task — if parallel, would be ~per_task. + assert elapsed >= per_task * (n - 1), ( + f"Expected serial run ≥ {per_task * (n - 1)}s, got {elapsed:.3f}s" + ) + + def test_max_parallel_respects_pool_cap(self, monkeypatch): + """With MAX_PARALLEL=2 and 4 tasks @ 100ms each, wall time should be + ~200ms (2 waves of 2), not 100ms (all four at once) or 400ms (serial).""" + per_task = 0.10 + n = 4 + active = {"count": 0, "peak": 0} + lock = threading.Lock() + + def fake_runner(**kwargs): + with lock: + active["count"] += 1 + active["peak"] = max(active["peak"], active["count"]) + time.sleep(per_task) + with lock: + active["count"] -= 1 + return f"## Summary\n{kwargs['component']['name']}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + monkeypatch.setenv("FLASHLIGHT_MAX_PARALLEL", "2") + + state = _state( + components=[_component(f"c{i}") for i in range(n)], + depth_order=[[f"c{i}" for i in range(n)]], + ) + analyze_current_depth(state, MagicMock()) + + # Peak concurrency should equal the cap, not exceed it. + assert active["peak"] == 2, f"expected peak=2, got {active['peak']}" + + def test_worker_exception_captured_as_error_string(self, monkeypatch): + """A crashing subagent shouldn't kill the whole depth — its result + becomes an Error-prefixed string and peers still run.""" + + def fake_runner(**kwargs): + if kwargs["component"]["name"] == "bad": + raise RuntimeError("boom") + return f"## Summary\n{kwargs['component']['name']}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + + state = _state( + components=[_component("good"), _component("bad"), _component("also_good")], + depth_order=[["good", "bad", "also_good"]], + ) + out = analyze_current_depth(state, MagicMock()) + analyses = out.get("component_analyses") + + assert analyses["good"].startswith("## Summary") + assert analyses["also_good"].startswith("## Summary") + assert analyses["bad"].startswith("Error:") + assert "boom" in analyses["bad"] + # Depth still advances. + assert out.get("current_depth") == 1 + + def test_missing_component_is_warned_and_skipped(self, monkeypatch, caplog): + """A name in depth_order that isn't in the components inventory should + log a warning but not stop the run.""" + calls = [] + + def fake_runner(**kwargs): + calls.append(kwargs["component"]["name"]) + return "## Summary\nok\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + + state = _state( + components=[_component("real")], + depth_order=[["real", "ghost"]], + ) + import logging + + with caplog.at_level(logging.WARNING, logger="agent.burr_app"): + out = analyze_current_depth(state, MagicMock()) + + assert calls == ["real"] + assert "ghost" not in out.get("component_analyses") + assert any("ghost" in rec.message for rec in caplog.records) + + def test_upstream_context_built_from_prior_analyses(self, monkeypatch): + """A depth>0 component should receive the summaries of its deps.""" + captured_context = {} + + def fake_runner(**kwargs): + captured_context[kwargs["component"]["name"]] = kwargs["upstream_context"] + return f"## Summary\n{kwargs['component']['name']}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + + prior = { + "dep_a": "## Summary\nDep A does stuff\n", + "dep_b": "## Summary\nDep B does other stuff\n", + } + state = _state( + components=[ + _component("consumer", deps=["dep_a", "dep_b"]), + ], + depth_order=[["consumer"]], + prior_analyses=prior, + ) + analyze_current_depth(state, MagicMock()) + + ctx = captured_context["consumer"] + assert "### dep_a" in ctx + assert "Dep A does stuff" in ctx + assert "### dep_b" in ctx + assert "Dep B does other stuff" in ctx + + def test_depth_already_exhausted_noop(self, monkeypatch): + """If current_depth >= len(depth_order), no work is attempted.""" + monkeypatch.setattr( + burr_app, + "_run_component_analyzer", + lambda **kw: pytest.fail("should not be called"), + ) + + state = _state( + components=[_component("a")], + depth_order=[["a"]], + current_depth=5, + ) + out = analyze_current_depth(state, MagicMock()) + assert out.get("component_analyses") == {} + + def test_empty_depth_still_advances(self, monkeypatch): + """An empty depth bucket is legal (e.g. diff-driven runs) — advance + without running anything.""" + monkeypatch.setattr( + burr_app, + "_run_component_analyzer", + lambda **kw: pytest.fail("should not be called"), + ) + + state = _state(components=[], depth_order=[[]]) + out = analyze_current_depth(state, MagicMock()) + assert out.get("current_depth") == 1 + assert out.get("component_analyses") == {} + + def test_prior_analyses_preserved(self, monkeypatch): + """analyses accumulated across earlier depths aren't clobbered.""" + + def fake_runner(**kwargs): + return f"## Summary\nnew: {kwargs['component']['name']}\n" + + monkeypatch.setattr(burr_app, "_run_component_analyzer", fake_runner) + + state = _state( + components=[_component("new")], + depth_order=[["new"]], + prior_analyses={"old": "## Summary\nold result\n"}, + ) + out = analyze_current_depth(state, MagicMock()) + + analyses = out.get("component_analyses") + assert analyses["old"] == "## Summary\nold result\n" + assert analyses["new"].startswith("## Summary\nnew: new") + + +# --------------------------------------------------------------------------- +# _run_component_analyzer: orchestrator-writes-file behavior +# +# LLMs drop the write_file tool call ~75% of the time, especially on +# smaller models. The orchestrator persists the final_response to disk +# itself to guarantee every completed subagent produces a .md file. +# --------------------------------------------------------------------------- + + +class TestOrchestratorWritesAnalysis: + def test_writes_final_response_to_expected_path(self, tmp_path, monkeypatch): + """The runner's return value should land in service_analyses/{name}.md.""" + # Redirect /tmp/{service_name}/ at a tmp_path so the test is hermetic. + service_name = "hermetic-svc" + svc_root = tmp_path / service_name + svc_root.mkdir() + + monkeypatch.setattr( + burr_app, + "_run_subagent_as_app", + lambda **kwargs: "# Mock Analysis\n\nSubstantive content.", + ) + + # Patch the Path construction inside _run_component_analyzer + import pathlib + + real_path = pathlib.Path + + def patched_path(arg): + if isinstance(arg, str) and arg.startswith(f"/tmp/{service_name}/"): + return real_path(str(tmp_path) + arg[len("/tmp"):]) + return real_path(arg) + + # Easier: patch the /tmp prefix by wrapping the out_path logic. + # We do that by monkey-patching Path inside burr_app; simpler is to + # run with the real /tmp and clean up. + out_file = pathlib.Path(f"/tmp/{service_name}/service_analyses/comp.md") + if out_file.exists(): + out_file.unlink() + try: + result = _run_component_analyzer( + component={ + "name": "comp", + "kind": "library", + "type": "python-package", + "root_path": "path/comp", + }, + service_name=service_name, + upstream_context="", + tracer=None, + ) + assert result.startswith("# Mock Analysis") + assert out_file.exists(), "expected orchestrator to persist analysis" + assert out_file.read_text() == "# Mock Analysis\n\nSubstantive content." + finally: + # Clean up the /tmp side-effect. + if out_file.exists(): + out_file.unlink() + try: + out_file.parent.rmdir() + out_file.parent.parent.rmdir() + except OSError: + pass # directory not empty or already gone — that's fine + + def test_does_not_write_when_subagent_errored(self, monkeypatch): + """Error-prefixed results shouldn't land on disk — they'd pollute + the analyses directory with garbage.""" + service_name = "error-svc" + monkeypatch.setattr( + burr_app, + "_run_subagent_as_app", + lambda **kwargs: "Error: LLM call failed after 4 attempts: timeout", + ) + + import pathlib + + out_file = pathlib.Path( + f"/tmp/{service_name}/service_analyses/broken.md" + ) + if out_file.exists(): + out_file.unlink() + try: + result = _run_component_analyzer( + component={ + "name": "broken", + "kind": "library", + "type": "python-package", + "root_path": "path/broken", + }, + service_name=service_name, + upstream_context="", + tracer=None, + ) + assert result.startswith("Error:") + assert not out_file.exists(), ( + f"error response should not be persisted, but {out_file} exists" + ) + finally: + if out_file.exists(): + out_file.unlink() + try: + out_file.parent.rmdir() + out_file.parent.parent.rmdir() + except OSError: + pass + + def test_write_failure_doesnt_kill_analyzer(self, monkeypatch, caplog): + """If disk write fails (permission error, whatever), the analysis + is still returned to the caller — just logged as an error.""" + monkeypatch.setattr( + burr_app, + "_run_subagent_as_app", + lambda **kwargs: "# Good analysis", + ) + + # Force the write step to raise by patching Path.write_text. + import pathlib + + real_write = pathlib.Path.write_text + + def failing_write(self, *args, **kwargs): + if "service_analyses" in str(self): + raise OSError("disk full") + return real_write(self, *args, **kwargs) + + monkeypatch.setattr(pathlib.Path, "write_text", failing_write) + + import logging + + with caplog.at_level(logging.ERROR, logger="agent.burr_app"): + result = _run_component_analyzer( + component={ + "name": "comp", + "kind": "library", + "type": "python-package", + "root_path": "path/comp", + }, + service_name="write-fail-svc", + upstream_context="", + tracer=None, + ) + + assert result == "# Good analysis" + assert any("Failed to persist analysis" in r.message for r in caplog.records)