diff --git a/agent/burr_app.py b/agent/burr_app.py index bcfe355..820c1c2 100644 --- a/agent/burr_app.py +++ b/agent/burr_app.py @@ -1206,7 +1206,7 @@ def respond(state: State) -> State: @action( - reads=["service_name"], + reads=["service_name", "component_filter"], writes=[ "components", "depth_order", @@ -1228,10 +1228,13 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State: - /tmp/{service_name}/dependency_graphs/analysis_order.json Populates state with components and depth ordering for analysis. + If component_filter is set, only includes those components (plus + transitive deps) in the depth order. """ from pathlib import Path service_name = state.get("service_name", "unknown") + component_filter = state.get("component_filter") work_dir = Path(f"/tmp/{service_name}") components_file = work_dir / "service_discovery" / "components.json" analysis_order_file = work_dir / "dependency_graphs" / "analysis_order.json" @@ -1266,6 +1269,20 @@ def read_discovery(state: State, __tracer: "TracerFactory") -> State: total_components=sum(len(level) for level in depth_order), ) + # Apply component filter: prune depth_order to only include filtered components + if component_filter: + filtered_depth_order = [] + for level in depth_order: + filtered_level = [name for name in level if name in component_filter] + if filtered_level: + filtered_depth_order.append(filtered_level) + depth_order = filtered_depth_order + logger.info( + "Component filter active: %d components across %d depth levels", + sum(len(l) for l in depth_order), + len(depth_order), + ) + # Build component lookup by name component_map = {c["name"]: c for c in components} @@ -1717,6 +1734,7 @@ def analysis_respond(state: State) -> State: def build_analysis_pipeline( service_name: str, project_name: str = "flashlight-analysis", + component_filter: set[str] | None = None, ) -> Application: """Build the analysis pipeline for headless codebase analysis. @@ -1732,6 +1750,8 @@ def build_analysis_pipeline( Args: service_name: Name of the service being analyzed (for /tmp/{service_name}/) project_name: Project name for Burr tracking UI + component_filter: If set, only these components will be analyzed. + The filter is passed to read_discovery which prunes the depth order. Returns: Compiled Burr Application @@ -1769,6 +1789,7 @@ def receive_analysis_input(state: State, task: str) -> State: service_name=service_name, synthesis_result="", final_response="", + component_filter=component_filter, ) .with_tracker(project=project_name) .build() diff --git a/agent/cli.py b/agent/cli.py index 9bdc4b3..1ed433d 100644 --- a/agent/cli.py +++ b/agent/cli.py @@ -14,6 +14,10 @@ # Incremental with auto-detected SHAs (reads manifest.json from output dir) flashlight --repo /path/to/repo --output ./artifacts/company/eigenda \ --head-sha def5678 + + # Targeted analysis (specific components only, requires existing artifacts) + flashlight --repo /path/to/repo --output ./artifacts/company/eigenda \ + --components coordinator,provider-api --head-sha def5678 """ import argparse @@ -236,11 +240,102 @@ def build_analysis_prompt( # --------------------------------------------------------------------------- +def resolve_component_filter( + component_names: str, + artifacts_dir: Path, +) -> set[str] | None: + """Resolve a comma-separated --components string to a set of names. + + Expands the given names to include their transitive dependencies so that + upstream context is available for each requested component. + + Returns None when no filter is requested (full analysis). + """ + if not component_names: + return None + + requested = {n.strip() for n in component_names.split(",") if n.strip()} + if not requested: + return None + + # Load graph to resolve transitive deps + graph_file = artifacts_dir / "dependency_graphs" / "graph.json" + components_file = artifacts_dir / "service_discovery" / "components.json" + + if not graph_file.exists() or not components_file.exists(): + logger.warning( + "--components requires existing artifacts (components.json + graph.json). " + "Falling back to full analysis." + ) + return None + + with open(components_file) as f: + components_data = json.load(f) + all_components = ( + components_data.get("components", []) + if isinstance(components_data, dict) + else components_data + ) + known_names = {c["name"] for c in all_components if "name" in c} + + # Validate requested names + unknown = requested - known_names + if unknown: + logger.warning( + "Unknown component(s) in --components: %s. Known: %s", + sorted(unknown), + sorted(known_names), + ) + requested &= known_names + if not requested: + logger.warning("No valid components remaining — running full analysis") + return None + + # Expand to include transitive dependencies via graph edges + try: + with open(graph_file) as f: + graph_data = json.load(f) + edges = graph_data.get("edges", []) + # Build adjacency: target -> sources (i.e. who depends on whom) + # An edge from A to B means A depends on B, so if we want A we also need B + dep_map: dict[str, set[str]] = {} + for edge in edges: + source = edge.get("source", "") + target = edge.get("target", "") + if source and target: + dep_map.setdefault(source, set()).add(target) + + # BFS to find all transitive deps of requested components + expanded = set(requested) + queue = list(requested) + while queue: + current = queue.pop(0) + for dep in dep_map.get(current, set()): + if dep not in expanded: + expanded.add(dep) + queue.append(dep) + + added_deps = expanded - requested + if added_deps: + logger.info( + "--components: added %d transitive dep(s): %s", + len(added_deps), + sorted(added_deps), + ) + + return expanded + + except (OSError, json.JSONDecodeError) as exc: + logger.warning("Could not load graph for dep resolution: %s", exc) + return requested + + def analyze( repo_path: str, output_dir: str, last_sha: str = "", head_sha: str = "", + component_filter: set[str] | None = None, ): """Run headless (non-interactive) codebase analysis. @@ -249,6 +344,8 @@ def analyze( output_dir: Where to write final artifacts. last_sha: Previous commit SHA (from manifest). Empty for full analysis. head_sha: Current commit SHA being analyzed. + component_filter: If set, only analyze these components (and their + transitive deps). Requires existing artifacts. Discovery is skipped. """ # Late imports for heavy deps from dotenv import load_dotenv @@ -272,6 +369,14 @@ def analyze( output = Path(output_dir).resolve() service_name = repo.name + # Guard against work_dir collision with repo path + # (e.g. repo at /tmp/d-inference → work_dir at /tmp/d-inference) + work_dir = Path(f"/tmp/{service_name}") + if work_dir.resolve() == repo or repo in work_dir.resolve().parents or work_dir.resolve() in repo.parents: + service_name = f"{repo.name}-flashlight" + work_dir = Path(f"/tmp/{service_name}") + logger.info("Adjusted service_name to %s to avoid work_dir collision", service_name) + if not repo.exists(): print(f"Error: repo path does not exist: {repo}", file=sys.stderr) sys.exit(1) @@ -303,7 +408,6 @@ def analyze( # Phase 0: Deterministic discovery (zero LLM calls) # --------------------------------------------------------------- print("\nRunning deterministic discovery...") - work_dir = Path(f"/tmp/{service_name}") work_dir.mkdir(parents=True, exist_ok=True) discovery_dir = work_dir / "service_discovery" graph_dir = work_dir / "dependency_graphs" @@ -314,75 +418,94 @@ def analyze( # repos stay cheap and the view always tracks the current clone. _setup_project_dir(repo, work_dir) - # Discover components from source - components = discover_components(repo, output_dir=discovery_dir) - print(f" Discovered {len(components)} components") + if component_filter: + # Skip discovery: copy existing artifacts from output_dir into work_dir + # so the analysis pipeline can read them at the expected paths. + print(f" --components filter active: {len(component_filter)} component(s)") + print(f" Skipping discovery (using existing artifacts)") + + for subdir in ("service_discovery", "dependency_graphs"): + src = output / subdir + dst = work_dir / subdir + if src.exists(): + if dst.exists(): + shutil.rmtree(dst) + shutil.copytree(src, dst) + else: + logger.warning("Missing %s in %s — analysis may fail", subdir, output) + else: + # Discover components from source + components = discover_components(repo, output_dir=discovery_dir) + print(f" Discovered {len(components)} components") - # Log component summary by kind - by_kind: dict[str, list[str]] = {} - for comp in components: - by_kind.setdefault(comp.kind.value, []).append(comp.name) - for kind, names in sorted(by_kind.items()): - print(f" {kind}: {', '.join(sorted(names)[:8])}", end="") + # Log component summary by kind + by_kind: dict[str, list[str]] = {} + for comp in components: + by_kind.setdefault(comp.kind.value, []).append(comp.name) + for kind, names in sorted(by_kind.items()): + print(f" {kind}: {', '.join(sorted(names)[:8])}", end="") if len(names) > 8: print(f" (+{len(names) - 8} more)") else: print() - # Validate discovery output - errors = validate_discovery(components, repo) - if errors: - print(f" Discovery warnings: {len(errors)}") - for err in errors[:5]: - print(f" - {err}") - - # Build unified knowledge graph (all components, not just libraries) - print("\n Building unified knowledge graph...") - graph_builder = KnowledgeGraphBuilder(components) - knowledge_graph = graph_builder.build( - source_repo=str(repo), - source_commit=head_sha, - ) - depth_order = graph_builder.get_analysis_order() - - # Validate graph - graph_errors = validate_graph(components, depth_order) - if graph_errors: - print(f" Graph warnings: {len(graph_errors)}") - - # Write graph - graph_dir.mkdir(parents=True, exist_ok=True) - - # Write unified graph.json - with open(graph_dir / "graph.json", "w") as f: - json.dump(knowledge_graph.to_dict(), f, indent=2) - - # Also write analysis_order.json for the lead agent - analysis_order = { - "depth_levels": depth_order, - "total_components": len(components), - "level_count": len(depth_order), - "by_kind": { - kind.value: sum(1 for c in components if c.kind == kind) - for kind in ComponentKind - if any(c.kind == kind for c in components) - }, - } - with open(graph_dir / "analysis_order.json", "w") as f: - json.dump(analysis_order, f, indent=2) + if not component_filter: + # Validate discovery output + errors = validate_discovery(components, repo) + if errors: + print(f" Discovery warnings: {len(errors)}") + for err in errors[:5]: + print(f" - {err}") + + # Build unified knowledge graph (all components, not just libraries) + print("\n Building unified knowledge graph...") + graph_builder = KnowledgeGraphBuilder(components) + knowledge_graph = graph_builder.build( + source_repo=str(repo), + source_commit=head_sha, + ) + depth_order = graph_builder.get_analysis_order() + + # Validate graph + graph_errors = validate_graph(components, depth_order) + if graph_errors: + print(f" Graph warnings: {len(graph_errors)}") + + # Write graph + graph_dir.mkdir(parents=True, exist_ok=True) + + # Write unified graph.json + with open(graph_dir / "graph.json", "w") as f: + json.dump(knowledge_graph.to_dict(), f, indent=2) + + # Also write analysis_order.json for the lead agent + analysis_order = { + "depth_levels": depth_order, + "total_components": len(components), + "level_count": len(depth_order), + "by_kind": { + kind.value: sum(1 for c in components if c.kind == kind) + for kind in ComponentKind + if any(c.kind == kind for c in components) + }, + } + with open(graph_dir / "analysis_order.json", "w") as f: + json.dump(analysis_order, f, indent=2) - print( - f" Knowledge graph: {len(components)} components, {len(knowledge_graph.edges)} edges" - ) - print(f" Analysis order: {len(depth_order)} depth levels") - for i, level in enumerate(depth_order): - by_kind = {} - for name in level: - comp = knowledge_graph.components.get(name) - if comp: - by_kind.setdefault(comp.kind.value, []).append(name) - kind_summary = ", ".join(f"{k}:{len(v)}" for k, v in sorted(by_kind.items())) - print(f" depth {i}: {len(level)} components ({kind_summary})") + print( + f" Knowledge graph: {len(components)} components, {len(knowledge_graph.edges)} edges" + ) + print(f" Analysis order: {len(depth_order)} depth levels") + for i, level in enumerate(depth_order): + by_kind = {} + for name in level: + comp = knowledge_graph.components.get(name) + if comp: + by_kind.setdefault(comp.kind.value, []).append(name) + kind_summary = ", ".join(f"{k}:{len(v)}" for k, v in sorted(by_kind.items())) + print(f" depth {i}: {len(level)} components ({kind_summary})") + else: + print(f" Reusing existing graph from {output}") print() @@ -395,6 +518,7 @@ def analyze( app = build_analysis_pipeline( service_name=service_name, project_name=f"flashlight-{service_name}", + component_filter=component_filter, ) print(f"\nStarting {mode_label} analysis of {service_name}...") @@ -671,6 +795,16 @@ def main(): action="store_true", help="Enable verbose logging", ) + parser.add_argument( + "--components", + default="", + help=( + "Comma-separated list of component names to analyze. " + "Only these components (and their transitive dependencies) " + "will be analyzed; all others are skipped. " + "Requires existing artifacts with components.json." + ), + ) parser.add_argument( "--debug", action="store_true", @@ -706,11 +840,16 @@ def main(): clone_dir = Path("/tmp/flashlight-repos") repo_path = str(clone_repo(repo_path, clone_dir)) + # Resolve --components filter (expands transitive deps) + output_dir = Path(args.output) + component_filter = resolve_component_filter(args.components, output_dir) + analyze( repo_path=repo_path, output_dir=args.output, last_sha=args.last_sha, head_sha=args.head_sha, + component_filter=component_filter, )