diff --git a/comfy_cli/cmdline.py b/comfy_cli/cmdline.py index 38910190..659440d9 100644 --- a/comfy_cli/cmdline.py +++ b/comfy_cli/cmdline.py @@ -423,10 +423,25 @@ def update( rprint(f"[yellow]Failed to update node id cache: {e}[/yellow]") -@app.command(help="Run API workflow file using the ComfyUI launched by `comfy launch --background`") +@app.command( + help=( + "Run a workflow on the ComfyUI launched by `comfy launch --background`. " + "Accepts both ComfyUI API format and exported UI workflow JSON; " + "UI workflows are converted to API format client-side via /object_info." + ) +) @tracking.track_command() def run( - workflow: Annotated[str, typer.Option(help="Path to the workflow API json file.")], + workflow: Annotated[ + str, + typer.Option( + help=( + "Path to the workflow JSON file. Both ComfyUI API format and " + "exported UI format are accepted; UI workflows are converted " + "to API format client-side." + ) + ), + ], wait: Annotated[ bool, typer.Option(help="If the command should wait until execution completes."), @@ -444,9 +459,17 @@ def run( typer.Option(help="The port where the ComfyUI instance is running, e.g. 8188."), ] = None, timeout: Annotated[ - int | None, - typer.Option(help="The timeout in seconds for the workflow execution."), - ] = 30, + int, + typer.Option( + help=( + "Per-event timeout in seconds: bails out if the server is silent " + "for this long. Also caps HTTP connect, /prompt POST, and websocket " + "handshake. NOT a wall-clock execution deadline — a workflow that " + "streams progress events faster than the timeout can run " + "indefinitely." + ), + ), + ] = 120, api_key: Annotated[ str | None, typer.Option( @@ -454,12 +477,42 @@ def run( envvar="COMFY_API_KEY", help=( "Comfy API key for API Nodes (Partner Nodes). " - "Embedded in the prompt body as extra_data.api_key_comfy_org on POST /prompt. " + "Embedded in the POST /prompt request body as extra_data.api_key_comfy_org. " "For scripting, prefer the COMFY_API_KEY environment variable so the secret " "stays out of shell history." ), ), ] = None, + json_output: Annotated[ + bool, + typer.Option( + "--json", + help=( + "Emit NDJSON events to stdout instead of human-readable output. " + "One JSON object per line, terminated by \\n. See docs/json-output.md " + "for the event reference and stability contract. In this mode, " + "--verbose has no effect and Rich progress is suppressed. " + "Workflow input accepts both API and UI format JSON (UI input " + "triggers a `converted` event before `queued`). The converted " + "workflow graph is always emitted as a `prompt_preview` event " + "before `queued`, so agents have a full audit trail of what " + "the CLI submitted." + ), + ), + ] = False, + print_prompt: Annotated[ + bool, + typer.Option( + "--print-prompt", + help=( + "Print the API-format workflow graph that WOULD be sent to /prompt and exit. " + "Does not POST and does not execute. For UI-format input the workflow is " + "converted first (requires a reachable ComfyUI for /object_info); API input " + "is printed as-is with no server hit. In --json mode emits a `prompt_preview` " + "event; otherwise pretty-prints to stdout." + ), + ), + ] = False, ): if api_key: api_key = api_key.strip() or None @@ -472,22 +525,29 @@ def run( if not port and len(s) == 2: port = int(s[1]) - local_paths = False if config.background: + bg_host, bg_port = config.background[0], config.background[1] if not host: - host = config.background[0] - local_paths = True - if port: - local_paths = False - else: - port = config.background[1] + host = bg_host + if not port: + port = bg_port if not host: host = "127.0.0.1" if not port: port = 8188 - run_inner.execute(workflow, host, port, wait, verbose, local_paths, timeout, api_key=api_key) + run_inner.execute( + workflow, + host, + port, + wait, + verbose, + timeout, + api_key=api_key, + json_mode=json_output, + print_prompt=print_prompt, + ) def validate_comfyui(_env_checker): diff --git a/comfy_cli/command/run.py b/comfy_cli/command/run.py index f8b46857..f74810e3 100644 --- a/comfy_cli/command/run.py +++ b/comfy_cli/command/run.py @@ -19,6 +19,29 @@ workspace_manager = WorkspaceManager() +# JSON output schema version. Bumped only for breaking changes per docs/json-output.md. +SCHEMA_VERSION = 1 + +# Maximum bytes of a server response body we surface to the user (or +# embed in a `failed.error.body` field). Anything longer is truncated. +_MAX_BODY_PREVIEW = 500 + + +def _node_errors_to_list(node_errors) -> list[dict]: + """Transform ComfyUI's dict-keyed `node_errors` payload into a list of self-contained records. + Each record carries `node_id` as a field, so agents can iterate the result + directly without indirecting through dict keys.""" + if not isinstance(node_errors, dict): + return [] + result = [] + for node_id, record in node_errors.items(): + if not isinstance(record, dict): + continue + entry = {"node_id": str(node_id)} + entry.update(record) + result.append(entry) + return result + def is_ui_workflow(workflow) -> bool: return ( @@ -28,40 +51,285 @@ def is_ui_workflow(workflow) -> bool: ) -def _validate_api_workflow(workflow): - """Return the workflow dict if it has the shape of API format, else None.""" - if not isinstance(workflow, dict) or not workflow: - return None - node = workflow[next(iter(workflow))] +def _classify_api_workflow(workflow): + """Classify a parsed JSON object as API workflow / empty / invalid. + + Returns one of: + ("ok", workflow_dict) — well-formed API workflow with ≥1 node + ("empty", None) — empty dict (caller routes to workflow_empty) + ("invalid", None) — not a dict, or first node lacks class_type + """ + if not isinstance(workflow, dict): + return ("invalid", None) + if not workflow: + return ("empty", None) + first_key = next(iter(workflow)) + node = workflow[first_key] if not isinstance(node, dict) or "class_type" not in node: - return None - return workflow + return ("invalid", None) + return ("ok", workflow) + + +class JsonEmitter: + """NDJSON event emitter for ``comfy run --json``. + + Every ``emit_*`` method is a no-op when ``json_mode=False``, so the + same call sites work for both modes. See ``docs/json-output.md``. + """ + + def __init__(self, json_mode: bool): + self.json_mode = json_mode + self.start_time = time.monotonic() + self.client_id: str | None = None + self.prompt_id: str | None = None + self.workflow: dict | None = None + self.cached_node_ids: list[str] = [] + self.executed_node_ids: list[str] = [] + self.outputs: list[dict] = [] + + def set_workflow(self, workflow): + self.workflow = workflow + + def set_client_id(self, client_id): + self.client_id = client_id + + def _elapsed(self) -> float: + return time.monotonic() - self.start_time + + def get_title(self, node_id): + if not isinstance(self.workflow, dict): + return str(node_id) + node = self.workflow.get(node_id) + if not isinstance(node, dict): + return str(node_id) + meta = node.get("_meta") + if isinstance(meta, dict): + title = meta.get("title") + if isinstance(title, str) and title: + return title + class_type = node.get("class_type") + return class_type if isinstance(class_type, str) and class_type else str(node_id) + + def get_class_type(self, node_id): + if not isinstance(self.workflow, dict): + return "" + node = self.workflow.get(node_id) + if not isinstance(node, dict): + return "" + return node.get("class_type", "") + + def _emit(self, event: dict) -> None: + if not self.json_mode: + return + line = json.dumps(event, ensure_ascii=True) + print(line, flush=True) + + def emit_converted(self, node_count: int) -> None: + self._emit( + { + "event": "converted", + "schema_version": SCHEMA_VERSION, + "node_count": node_count, + } + ) + + def workflow_manifest(self) -> list[dict]: + """Build the `nodes` array for the `queued` event — one entry per + node in the submitted (post-conversion) workflow.""" + if not isinstance(self.workflow, dict): + return [] + manifest: list[dict] = [] + for node_id, node in self.workflow.items(): + if not isinstance(node, dict): + continue + class_type = node.get("class_type", "") + class_type = class_type if isinstance(class_type, str) else "" + manifest.append( + { + "node_id": str(node_id), + "class_type": class_type, + "title": self.get_title(node_id), + } + ) + return manifest + + def emit_prompt_preview(self, prompt: dict) -> None: + self._emit( + { + "event": "prompt_preview", + "schema_version": SCHEMA_VERSION, + "prompt": prompt, + } + ) + + def emit_queued(self, prompt_id: str, validation_warnings: list[dict]) -> None: + self.prompt_id = prompt_id + self._emit( + { + "event": "queued", + "schema_version": SCHEMA_VERSION, + "prompt_id": prompt_id, + "client_id": self.client_id, + "validation_warnings": validation_warnings, + "nodes": self.workflow_manifest(), + } + ) + + def emit_node_cached(self, node_id) -> None: + node_id = str(node_id) + self.cached_node_ids.append(node_id) + self._emit( + { + "event": "node_cached", + "schema_version": SCHEMA_VERSION, + "node_id": node_id, + "class_type": self.get_class_type(node_id), + "title": self.get_title(node_id), + } + ) + + def emit_node_executing(self, node_id) -> None: + node_id = str(node_id) + # `executed_node_ids` aggregates everything the executor touched — + # including intermediate nodes that never fire a server-side `executed` WS event. + if node_id not in self.executed_node_ids: + self.executed_node_ids.append(node_id) + self._emit( + { + "event": "node_executing", + "schema_version": SCHEMA_VERSION, + "node_id": node_id, + "class_type": self.get_class_type(node_id), + "title": self.get_title(node_id), + } + ) + + def emit_node_progress(self, node_id, value, max_val) -> None: + node_id = str(node_id) + self._emit( + { + "event": "node_progress", + "schema_version": SCHEMA_VERSION, + "node_id": node_id, + "class_type": self.get_class_type(node_id), + "title": self.get_title(node_id), + "value": value, + "max": max_val, + } + ) + + def emit_node_executed(self, node_id, outputs: list[dict]) -> None: + node_id = str(node_id) + if node_id not in self.executed_node_ids: + self.executed_node_ids.append(node_id) + self.outputs.extend(outputs) + self._emit( + { + "event": "node_executed", + "schema_version": SCHEMA_VERSION, + "node_id": node_id, + "class_type": self.get_class_type(node_id), + "title": self.get_title(node_id), + "outputs": outputs, + } + ) + + def emit_completed(self) -> None: + self._emit( + { + "event": "completed", + "schema_version": SCHEMA_VERSION, + "prompt_id": self.prompt_id, + "client_id": self.client_id, + "elapsed_seconds": self._elapsed(), + "outputs": self.outputs, + "cached_node_ids": self.cached_node_ids, + "executed_node_ids": self.executed_node_ids, + } + ) + + def fail(self, kind: str, message: str, *, rich_message: str | None = None, **extras) -> typer.Exit: + """Emit a `failed` event (in JSON mode) or print a red text message + (otherwise), then return the `typer.Exit(code=1)` for the caller to + raise. Returning rather than raising keeps `raise ... from e` + chaining clean at call sites. `rich_message` overrides `message` + for the human-readable text only — it is auto-wrapped in + `[bold red]...[/bold red]`. Sites that need multi-colour Rich + markup should emit the failure event explicitly.""" + self.emit_failed(kind, message, **extras) + if not self.json_mode: + pprint(f"[bold red]{rich_message if rich_message is not None else message}[/bold red]") + return typer.Exit(code=1) + + def emit_failed(self, kind: str, message: str, **extras) -> None: + error = {"kind": kind, "message": message} + error.update(extras) + self._emit( + { + "event": "failed", + "schema_version": SCHEMA_VERSION, + "prompt_id": self.prompt_id, + "client_id": self.client_id, + "elapsed_seconds": self._elapsed(), + "error": error, + } + ) -def fetch_object_info(host: str, port: int, timeout: int) -> dict: +def fetch_object_info(host, port, timeout, emitter=None): """GET ``/object_info`` from the running ComfyUI server. The response describes every loaded node class's input schema and is what the converter uses to map widget values to input names, fill defaults, etc. + + In JSON mode, failures emit a structured ``failed`` event via ``emitter``. + Either way, a ``typer.Exit(code=1)`` is raised. """ url = f"http://{host}:{port}/object_info" + json_mode = bool(emitter and emitter.json_mode) try: with request.urlopen(url, timeout=timeout) as resp: body = resp.read() except urllib.error.HTTPError as e: - body = e.read().decode("utf-8", errors="replace").strip() - pprint(f"[bold red]Failed to fetch /object_info (HTTP {e.code}): {body[:500]}[/bold red]") + body_text = e.read().decode("utf-8", errors="replace").strip() + if json_mode: + emitter.emit_failed( + "object_info_unavailable", + f"Failed to fetch /object_info (HTTP {e.code})", + status_code=e.code, + body=body_text[:_MAX_BODY_PREVIEW], + ) + else: + pprint( + f"[bold red]Failed to fetch /object_info (HTTP {e.code}): {body_text[:_MAX_BODY_PREVIEW]}[/bold red]" + ) raise typer.Exit(code=1) from e except urllib.error.URLError as e: - pprint(f"[bold red]Failed to fetch /object_info: {e.reason}[/bold red]") + msg = f"Failed to fetch /object_info from {host}:{port}: {e.reason} (override with --host / --port)" + if json_mode: + emitter.emit_failed("connection_error", msg) + else: + pprint(f"[bold red]{msg}[/bold red]") raise typer.Exit(code=1) from e except TimeoutError as e: - pprint(f"[bold red]Failed to fetch /object_info: timed out after {timeout}s[/bold red]") + msg = f"Failed to fetch /object_info from {host}:{port}: timed out after {timeout}s (override with --host / --port)" + if json_mode: + emitter.emit_failed("connection_error", msg) + else: + pprint(f"[bold red]{msg}[/bold red]") raise typer.Exit(code=1) from e try: return json.loads(body) except json.JSONDecodeError as e: - pprint("[bold red]Failed to fetch /object_info: server returned invalid JSON[/bold red]") + if json_mode: + emitter.emit_failed( + "object_info_unavailable", + "Server returned invalid JSON for /object_info", + status_code=200, + body=body.decode("utf-8", errors="replace")[:_MAX_BODY_PREVIEW], + ) + else: + pprint("[bold red]Failed to fetch /object_info: server returned invalid JSON[/bold red]") raise typer.Exit(code=1) from e @@ -71,73 +339,133 @@ def execute( port, wait=True, verbose=False, - local_paths=False, - timeout=30, + timeout=120, api_key: str | None = None, + json_mode: bool = False, + print_prompt: bool = False, ): + # `0.0.0.0` is a wildcard bind, not a connect address. macOS / Windows + # clients can't reach it; on Linux it happens to resolve to a loopback. + # Substitute the canonical loopback so every downstream use (server + # probe, /prompt POST, emitted /view URLs) is portable. + if host == "0.0.0.0": + host = "127.0.0.1" + + emitter = JsonEmitter(json_mode=json_mode) workflow_name = os.path.abspath(os.path.expanduser(workflow)) - if not os.path.isfile(workflow): - pprint( - f"[bold red]Specified workflow file not found: {workflow}[/bold red]", - file=sys.stderr, - ) - raise typer.Exit(code=1) - if not check_comfy_server_running(port, host): - pprint(f"[bold red]ComfyUI not running on specified address ({host}:{port})[/bold red]") + if not os.path.isfile(workflow_name): + if json_mode: + emitter.emit_failed("workflow_not_found", f"Workflow file not found: {workflow_name}") + else: + pprint( + f"[bold red]Specified workflow file not found: {workflow_name}[/bold red]", + file=sys.stderr, + ) raise typer.Exit(code=1) + # Under --print-prompt we skip this pre-flight probe. API-format input + # makes no server calls downstream so it works fully offline; UI-format + # input still needs /object_info for the converter, but if it's + # unreachable, fetch_object_info() surfaces the same connection_error + # kind a few lines later. + if not print_prompt and not check_comfy_server_running(port, host, timeout=timeout): + raise emitter.fail( + "connection_error", + f"ComfyUI not running at {host}:{port} (override with --host / --port)", + ) + try: with open(workflow_name, encoding="utf-8") as f: raw_workflow = json.load(f) - except OSError as e: - pprint(f"[bold red]Unable to read workflow file: {e}[/bold red]") - raise typer.Exit(code=1) from e + except (OSError, UnicodeDecodeError) as e: + raise emitter.fail("workflow_read_error", f"Unable to read workflow file: {e}") from e except json.JSONDecodeError as e: - pprint(f"[bold red]Specified workflow file is not valid JSON: {e}[/bold red]") - raise typer.Exit(code=1) from e + raise emitter.fail( + "workflow_invalid_json", + f"Specified workflow file is not valid JSON: {e}", + ) from e if is_ui_workflow(raw_workflow): - pprint("[yellow]Detected UI-format workflow, converting to API format...[/yellow]") - object_info = fetch_object_info(host, port, timeout) + if not json_mode: + pprint("[yellow]Detected UI-format workflow, converting to API format...[/yellow]") + object_info = fetch_object_info(host, port, timeout, emitter=emitter) try: workflow = convert_ui_to_api(raw_workflow, object_info) except WorkflowConversionError as e: - pprint(f"[bold red]Workflow conversion failed: {e}[/bold red]") - raise typer.Exit(code=1) from e + raise emitter.fail("conversion_error", f"Workflow conversion failed: {e}") from e except Exception as e: - # The converter is experimental; an unexpected crash here is a bug - # in our code, not user error. Show a clean message and a pointer. - pprint( - f"[bold red]Workflow conversion crashed unexpectedly: {type(e).__name__}: {e}[/bold red]\n" - "[yellow]The UI-to-API converter is experimental. Please report this at[/yellow]\n" - "[yellow] https://github.com/Comfy-Org/comfy-cli/issues[/yellow]\n" - "[yellow]and attach the workflow file if possible.[/yellow]" - ) - if verbose: - import traceback - - traceback.print_exc() + if json_mode: + emitter.emit_failed( + "conversion_crash", + f"Workflow conversion crashed unexpectedly: {type(e).__name__}: {e}", + exception_type=type(e).__name__, + ) + else: + pprint( + f"[bold red]Workflow conversion crashed unexpectedly: {type(e).__name__}: {e}[/bold red]\n" + "[yellow]The UI-to-API converter is experimental. Please report this at[/yellow]\n" + "[yellow] https://github.com/Comfy-Org/comfy-cli/issues[/yellow]\n" + "[yellow]and attach the workflow file if possible.[/yellow]" + ) + if verbose: + import traceback as _tb + + _tb.print_exc() raise typer.Exit(code=1) from e if not workflow: - pprint("[bold red]Workflow conversion produced no executable nodes[/bold red]") - raise typer.Exit(code=1) + raise emitter.fail("workflow_empty", "Workflow conversion produced no executable nodes") + emitter.set_workflow(workflow) + if json_mode: + emitter.emit_converted(len(workflow)) else: - workflow = _validate_api_workflow(raw_workflow) - if not workflow: - pprint("[bold red]Specified workflow does not appear to be an API workflow json file[/bold red]") - raise typer.Exit(code=1) + kind, validated = _classify_api_workflow(raw_workflow) + if kind == "empty": + raise emitter.fail( + "workflow_empty", + "API workflow contains no nodes", + rich_message="Specified API workflow has no nodes", + ) + if kind == "invalid": + raise emitter.fail( + "workflow_format_invalid", + "Workflow file is neither a ComfyUI API workflow nor an exported UI workflow", + rich_message=("Specified workflow is neither a ComfyUI API workflow nor an exported UI workflow"), + ) + workflow = validated + emitter.set_workflow(workflow) + + # In JSON mode, always emit the converted workflow graph so agents have + # a complete audit trail of what the CLI is about to submit. The event + # is non-terminal in normal flow and terminal under --print-prompt. + if json_mode: + emitter.emit_prompt_preview(workflow) + + if print_prompt: + if not json_mode: + print(json.dumps(workflow, indent=2, ensure_ascii=False)) + return progress = None start = time.time() - if wait: + if wait and not json_mode: pprint(f"Executing workflow: {workflow_name}") progress = ExecutionProgress() progress.start() - else: + elif not wait and not json_mode: print(f"Queuing workflow: {workflow_name}") - execution = WorkflowExecution(workflow, host, port, verbose, progress, local_paths, timeout, api_key=api_key) + execution = WorkflowExecution( + workflow, + host, + port, + verbose, + progress, + timeout, + api_key=api_key, + emitter=emitter, + ) + emitter.set_client_id(execution.client_id) try: if wait: @@ -146,30 +474,46 @@ def execute( if wait: execution.watch_execution() end = time.time() - progress.stop() - progress = None - - if len(execution.outputs) > 0: - pprint("[bold green]\nOutputs:[/bold green]") + if progress is not None: + progress.stop() + progress = None - for f in execution.outputs: - pprint(f) - - elapsed = timedelta(seconds=end - start) - pprint(f"[bold green]\nWorkflow execution completed ({elapsed})[/bold green]") + if json_mode: + emitter.emit_completed() + else: + if len(execution.outputs) > 0: + pprint("[bold green]\nOutputs:[/bold green]") + for f in execution.outputs: + pprint(f) + elapsed = timedelta(seconds=end - start) + pprint(f"[bold green]\nWorkflow execution completed ({elapsed})[/bold green]") else: - pprint("[bold green]Workflow queued[/bold green]") + # --no-wait: queued was already emitted by execution.queue(). + if not json_mode: + pprint("[bold green]Workflow queued[/bold green]") except WebSocketTimeoutException: - pprint( - f"[bold red]Error: WebSocket timed out after {timeout}s waiting for server response.[/bold red]\n" - "[yellow]For long-running workflows, increase the timeout: comfy run --workflow --timeout 300[/yellow]" - ) + # Not migrated to emitter.fail(): the text-mode message combines + # a red error line and a yellow remediation hint, which the + # single-colour auto-wrap in fail() can't express. + msg = f"WebSocket timed out after {timeout}s waiting for server response" + if json_mode: + emitter.emit_failed("timeout", msg, timeout_seconds=float(timeout)) + else: + pprint( + f"[bold red]Error: {msg}.[/bold red]\n" + "[yellow]For long-running workflows, increase the timeout: comfy run --workflow --timeout 300[/yellow]" + ) raise typer.Exit(code=1) except (WebSocketException, ConnectionError, OSError) as e: - pprint(f"[bold red]Error: Lost connection to ComfyUI server: {e}[/bold red]") - raise typer.Exit(code=1) + raise emitter.fail( + "connection_lost", + f"Lost connection to ComfyUI server: {e}", + rich_message=f"Error: Lost connection to ComfyUI server: {e}", + ) + except KeyboardInterrupt: + raise emitter.fail("execution_interrupted", "Interrupted by user") from None finally: - if progress: + if progress is not None: progress.stop() @@ -191,18 +535,27 @@ def get_renderables(self): class WorkflowExecution: - def __init__(self, workflow, host, port, verbose, progress, local_paths, timeout=30, api_key: str | None = None): + def __init__( + self, + workflow, + host, + port, + verbose, + progress, + timeout=120, + api_key: str | None = None, + emitter: JsonEmitter | None = None, + ): self.workflow = workflow self.host = host self.port = port self.verbose = verbose - self.local_paths = local_paths self.client_id = str(uuid.uuid4()) - self.outputs = [] + self.outputs: list = [] self.progress = progress self.remaining_nodes = set(self.workflow.keys()) self.total_nodes = len(self.remaining_nodes) - if progress: + if progress is not None: self.overall_task = self.progress.add_task("", total=self.total_nodes, progress_type="overall") self.current_node = None self.progress_task = None @@ -211,10 +564,18 @@ def __init__(self, workflow, host, port, verbose, progress, local_paths, timeout self.ws = None self.timeout = timeout self.api_key = api_key + # Default to a no-op emitter so internal call sites don't need to + # branch on whether json mode is active. + self.emitter = emitter if emitter is not None else JsonEmitter(json_mode=False) def connect(self): self.ws = WebSocket() - self.ws.connect(f"ws://{self.host}:{self.port}/ws?clientId={self.client_id}") + # Timeout on the handshake too: a server busy loading a model + # can otherwise leave the CLI hung with no terminal event. + self.ws.connect( + f"ws://{self.host}:{self.port}/ws?clientId={self.client_id}", + timeout=self.timeout, + ) def queue(self): data: dict = {"prompt": self.workflow, "client_id": self.client_id} @@ -225,56 +586,163 @@ def queue(self): json.dumps(data).encode("utf-8"), ) try: - resp = request.urlopen(req) - body = json.loads(resp.read()) - - self.prompt_id = body["prompt_id"] + resp = request.urlopen(req, timeout=self.timeout) + raw_body = resp.read() except urllib.error.HTTPError as e: - message = "An unknown error occurred" - if e.status == 500: - # This is normally just the generic internal server error - message = e.read().decode() - elif e.status == 400: - # Bad Request - workflow failed validation on the server - body = json.loads(e.read()) - if body["node_errors"].keys(): - message = json.dumps(body["node_errors"], indent=2) + self._handle_submit_http_error(e) + raise typer.Exit(code=1) from e + except urllib.error.URLError as e: + self._stop_progress() + raise self.emitter.fail( + "connection_error", + f"Cannot reach server at {self.host}:{self.port}: {e.reason}", + ) from e + except TimeoutError as e: + self._stop_progress() + raise self.emitter.fail( + "connection_error", + f"Connection to {self.host}:{self.port} timed out: {e}", + ) from e + except OSError as e: + self._stop_progress() + raise self.emitter.fail( + "connection_error", + f"Network error contacting {self.host}:{self.port}: {e}", + ) from e + + try: + body = json.loads(raw_body) if raw_body else None + except (json.JSONDecodeError, UnicodeDecodeError) as e: + self._stop_progress() + body_str = raw_body.decode("utf-8", errors="replace")[:_MAX_BODY_PREVIEW] + raise self.emitter.fail( + "invalid_response", + "Server returned HTTP 200 with unparseable body", + rich_message=f"Server returned HTTP 200 with unparseable body: {body_str}", + status_code=200, + body=body_str, + ) from e + + prompt_id = body.get("prompt_id") if isinstance(body, dict) else None + if not isinstance(prompt_id, str) or not prompt_id: + self._stop_progress() + body_str = json.dumps(body)[:_MAX_BODY_PREVIEW] if body is not None else "" + raise self.emitter.fail( + "invalid_response", + "Server returned HTTP 200 without a prompt_id", + rich_message=f"Server returned HTTP 200 without a prompt_id: {body_str}", + status_code=200, + body=body_str, + ) - self.progress.stop() + self.prompt_id = prompt_id + + # 200 may still carry node_errors if some output chains failed + # validation but others passed — surface as warnings, not a failure. + node_errors = body.get("node_errors") if isinstance(body, dict) else None + validation_warnings = _node_errors_to_list(node_errors) + + if self.emitter.json_mode: + self.emitter.emit_queued(prompt_id, validation_warnings) + + def _handle_submit_http_error(self, e: urllib.error.HTTPError) -> None: + raw = b"" + try: + raw = e.read() + except Exception: + pass + try: + body = json.loads(raw) if raw else None + except (json.JSONDecodeError, UnicodeDecodeError): + body = None + body_str = (raw or b"").decode("utf-8", errors="replace") + self._stop_progress() + + code = e.code + if code == 400 and isinstance(body, dict) and isinstance(body.get("node_errors"), dict) and body["node_errors"]: + self._emit_validation_error(body["node_errors"]) + return + if 400 <= code < 500: + kind = "client_error" + elif 500 <= code < 600: + kind = "server_error" + else: + kind = "client_error" + + if self.emitter.json_mode: + self.emitter.emit_failed( + kind, + f"Server returned HTTP {code}", + status_code=code, + body=body_str[:_MAX_BODY_PREVIEW], + ) + else: + if code == 500: + pprint(f"[bold red]Error running workflow\n{body_str}[/bold red]") + elif code == 400 and isinstance(body, dict): + pprint(f"[bold red]Error running workflow\n{json.dumps(body, indent=2)}[/bold red]") + else: + pprint(f"[bold red]Error running workflow (HTTP {code})\n{body_str[:_MAX_BODY_PREVIEW]}[/bold red]") + + def _emit_validation_error(self, node_errors: dict) -> None: + if self.emitter.json_mode: + message = "Workflow failed validation" + try: + first_node = next(iter(node_errors.values())) + errs = first_node.get("errors") if isinstance(first_node, dict) else None + if isinstance(errs, list) and errs: + first = errs[0] + if isinstance(first, dict) and isinstance(first.get("message"), str): + message = first["message"] + except StopIteration: + pass + self.emitter.emit_failed( + "validation_error", + message, + node_errors=_node_errors_to_list(node_errors), + ) + else: + pprint(f"[bold red]Error running workflow\n{json.dumps(node_errors, indent=2)}[/bold red]") - pprint(f"[bold red]Error running workflow\n{message}[/bold red]") - raise typer.Exit(code=1) + def _stop_progress(self) -> None: + if self.progress is not None: + try: + self.progress.stop() + except Exception: + pass def watch_execution(self): self.ws.settimeout(self.timeout) while True: message = self.ws.recv() - if isinstance(message, str): - message = json.loads(message) - if not self.on_message(message): - break + if not isinstance(message, str): + continue + try: + parsed = json.loads(message) + except json.JSONDecodeError: + # Tolerate malformed frames from misbehaving proxies. + continue + if not self.on_message(parsed): + break def update_overall_progress(self): + if self.progress is None: + return self.progress.update(self.overall_task, completed=self.total_nodes - len(self.remaining_nodes)) - def get_node_title(self, node_id): - node = self.workflow.get(node_id) - if node is None: - return str(node_id) - if "_meta" in node and "title" in node["_meta"]: - return node["_meta"]["title"] - return node["class_type"] - def log_node(self, type, node_id): if not self.verbose: return + if self.emitter.json_mode: + # --verbose is a no-op in JSON mode; Rich output would corrupt the stream. + return node = self.workflow.get(node_id) if node is None: pprint(f"{type} : [bright_black]({node_id})[/]") return class_type = node["class_type"] - title = self.get_node_title(node_id) + title = self.emitter.get_title(node_id) if title != class_type: title += f"[bright_black] - {class_type}[/]" @@ -283,87 +751,205 @@ def log_node(self, type, node_id): pprint(f"{type} : {title}") def format_image_path(self, img): + """Build a single human-readable path string for the legacy text + output. Prefers a clickable absolute filesystem path when the + host is a known loopback, the workspace resolves, the path stays + inside the workspace's per-type output dir, and the file exists + on disk. Otherwise falls back to a /view URL.""" filename = img["filename"] - subfolder = img["subfolder"] if "subfolder" in img else None - output_type = img["type"] or "output" - - if self.local_paths: - if subfolder: - filename = os.path.join(subfolder, filename) - - return os.path.join(workspace_manager.get_workspace_path()[0], output_type, filename) - - query = urllib.parse.urlencode(img) - return f"http://{self.host}:{self.port}/view?{query}" + subfolder = img.get("subfolder") or "" + output_type = img.get("type") or "output" + + if self.host in ("127.0.0.1", "localhost", "::1", "[::1]"): + ws_path = self._text_mode_workspace_path() + if ws_path: + parts = [subfolder, filename] if subfolder else [filename] + type_root = os.path.normpath(os.path.join(ws_path, output_type)) + candidate = os.path.normpath(os.path.join(type_root, *parts)) + if (candidate == type_root or candidate.startswith(type_root + os.sep)) and os.path.isfile(candidate): + return candidate + + return self._view_url(filename, subfolder, output_type) + + def _view_url(self, filename: str, subfolder: str, file_type: str) -> str: + params = {"filename": filename, "subfolder": subfolder, "type": file_type} + return f"http://{self.host}:{self.port}/view?{urllib.parse.urlencode(params)}" + + def _text_mode_workspace_path(self) -> str | None: + # workspace_manager.get_workspace_path() can print a warning and + # write config on the stale-recent path. Memoize so a workflow + # with N outputs doesn't repeat the side effects N times. + if not hasattr(self, "_ws_path_cached"): + try: + self._ws_path_cached = workspace_manager.get_workspace_path()[0] + except Exception: + self._ws_path_cached = None + return self._ws_path_cached + + def _build_output_object(self, node_id, category, item) -> dict: + """Construct a structured Output dict for the JSON contract.""" + node_id = str(node_id) + filename = item["filename"] + subfolder = item.get("subfolder") or "" + file_type = item.get("type") or "output" + + return { + "category": category, + "node_id": node_id, + "class_type": self.emitter.get_class_type(node_id), + "title": self.emitter.get_title(node_id), + "filename": filename, + "subfolder": subfolder, + "type": file_type, + "url": self._view_url(filename, subfolder, file_type), + } def on_message(self, message): - data = message["data"] if "data" in message else {} - # Skip any messages that aren't about our prompt - if "prompt_id" not in data or data["prompt_id"] != self.prompt_id: + # Defensive: a malformed (non-object) JSON frame from the server + # must not raise out of the recv loop — that would tear down the + # run without a terminal `failed` event and break the contract. + if not isinstance(message, dict): + return True + data = message.get("data") + if not isinstance(data, dict): + return True + if data.get("prompt_id") != self.prompt_id: return True - if message["type"] == "executing": + msg_type = message.get("type") + if msg_type == "executing": return self.on_executing(data) - elif message["type"] == "execution_cached": + elif msg_type == "execution_cached": self.on_cached(data) - elif message["type"] == "progress": + elif msg_type == "progress": self.on_progress(data) - elif message["type"] == "executed": + elif msg_type == "executed": self.on_executed(data) - elif message["type"] == "execution_error": + elif msg_type == "execution_error": self.on_error(data) + elif msg_type == "execution_interrupted": + self.on_interrupted(data) return True def on_executing(self, data): - if self.progress_task: + if self.progress_task is not None and self.progress is not None: self.progress.remove_task(self.progress_task) self.progress_task = None + # `node: null` is the documented "execution done" signal. A + # missing key is a protocol violation — skip the frame and keep + # listening rather than prematurely terminating. + if "node" not in data: + return True if data["node"] is None: return False - else: - if self.current_node: - self.remaining_nodes.discard(self.current_node) - self.update_overall_progress() - self.current_node = data["node"] - self.log_node("Executing", data["node"]) + + node_id = data["node"] + if self.current_node: + self.remaining_nodes.discard(self.current_node) + self.update_overall_progress() + self.current_node = node_id + self.log_node("Executing", node_id) + if self.emitter.json_mode: + self.emitter.emit_node_executing(node_id) return True def on_cached(self, data): - nodes = data["nodes"] + nodes = data.get("nodes") or [] for n in nodes: + if n is None: + continue self.remaining_nodes.discard(n) self.log_node("Cached", n) + if self.emitter.json_mode: + self.emitter.emit_node_cached(n) self.update_overall_progress() def on_progress(self, data): - node = data["node"] - if self.progress_node != node: - self.progress_node = node - if self.progress_task: - self.progress.remove_task(self.progress_task) - - self.progress_task = self.progress.add_task( - self.get_node_title(node), total=data["max"], progress_type="node" - ) - self.progress.update(self.progress_task, completed=data["value"]) - - def on_executed(self, data): - self.remaining_nodes.discard(data["node"]) - self.update_overall_progress() - - if "output" not in data: + node = data.get("node") + if node is None: return + value = data.get("value", 0) + max_val = data.get("max", 0) + if self.progress is not None: + if self.progress_node != node: + self.progress_node = node + if self.progress_task is not None: + self.progress.remove_task(self.progress_task) + self.progress_task = self.progress.add_task( + self.emitter.get_title(node), total=max_val, progress_type="node" + ) + self.progress.update(self.progress_task, completed=value) + if self.emitter.json_mode: + self.emitter.emit_node_progress(node, value, max_val) - output = data["output"] - - if output is None or "images" not in output: + def on_executed(self, data): + node_id = data.get("node") + if node_id is None: return + self.remaining_nodes.discard(node_id) + self.update_overall_progress() - for img in output["images"]: - self.outputs.append(self.format_image_path(img)) + # node_executed fires whenever the server emits `executed`, even + # when there are no file-shaped outputs (outputs=[] in that case). + structured_outputs: list[dict] = [] + output = data.get("output") + if isinstance(output, dict): + for category, items in output.items(): + if not isinstance(items, list): + continue + for item in items: + if not isinstance(item, dict) or "filename" not in item: + continue + obj = self._build_output_object(node_id, category, item) + structured_outputs.append(obj) + if not self.emitter.json_mode: + # Legacy string list, only consumed by the Rich path. + self.outputs.append(self.format_image_path(item)) + + if self.emitter.json_mode: + self.emitter.emit_node_executed(node_id, structured_outputs) def on_error(self, data): - pprint(f"[bold red]Error running workflow\n{json.dumps(data, indent=2)}[/bold red]") + raw_node_id = data.get("node_id", "") + node_id = str(raw_node_id) if raw_node_id is not None else "" + class_type = data.get("node_type") or data.get("class_type") or "" + exception_type = data.get("exception_type", "") + raw_tb = data.get("traceback", "") + if isinstance(raw_tb, list): + traceback_str = "".join(str(x) for x in raw_tb) + elif isinstance(raw_tb, str): + traceback_str = raw_tb + else: + traceback_str = "" + message = data.get("exception_message") or "Workflow execution failed" + + self._stop_progress() + if self.emitter.json_mode: + title = self.emitter.get_title(node_id) if node_id else "" + if not class_type and node_id: + class_type = self.emitter.get_class_type(node_id) + self.emitter.emit_failed( + "execution_error", + message, + node_id=node_id, + class_type=class_type, + title=title, + exception_type=exception_type, + traceback=traceback_str, + ) + else: + pprint(f"[bold red]Error running workflow\n{json.dumps(data, indent=2)}[/bold red]") + raise typer.Exit(code=1) + + def on_interrupted(self, data): + self._stop_progress() + if self.emitter.json_mode: + self.emitter.emit_failed( + "execution_interrupted", + "Workflow execution was interrupted", + ) + else: + pprint("[yellow]Workflow execution was interrupted[/yellow]") raise typer.Exit(code=1) diff --git a/comfy_cli/env_checker.py b/comfy_cli/env_checker.py index d5978b04..8fa51350 100644 --- a/comfy_cli/env_checker.py +++ b/comfy_cli/env_checker.py @@ -32,15 +32,18 @@ def format_python_version(version_info): return f"[bold red]{version_info.major}.{version_info.minor}.{version_info.micro}[/bold red]" -def check_comfy_server_running(port=8188, host="localhost"): +def check_comfy_server_running(port=8188, host="localhost", timeout: float = 5.0): """ Checks if the Comfy server is running by making a GET request to the /history endpoint. + `timeout` bounds the probe so a TCP-reachable but unresponsive server + (e.g., stuck in a CUDA kernel) doesn't hang the caller. + Returns: bool: True if the Comfy server is running, False otherwise. """ try: - response = requests.get(f"http://{host}:{port}/history") + response = requests.get(f"http://{host}:{port}/history", timeout=timeout) return response.status_code == 200 except requests.exceptions.RequestException: return False diff --git a/comfy_cli/tracking.py b/comfy_cli/tracking.py index 570dee46..106dd438 100644 --- a/comfy_cli/tracking.py +++ b/comfy_cli/tracking.py @@ -1,5 +1,6 @@ import functools import logging as logginglib +import sys import uuid import typer @@ -29,6 +30,13 @@ tracing_id = str(uuid.uuid4()) workspace_manager = WorkspaceManager() +# Process-scoped opt-in used when running non-interactively before the +# user has ever recorded a consent choice. Captures agentic usage without +# persisting the consent flag, so a later interactive run can still +# prompt the human. The anonymous user_id is persisted separately for +# stable agent identity in analytics. +_session_only_tracking = False + app = typer.Typer() @@ -50,7 +58,7 @@ def track_event(event_name: str, properties: any = None): properties = {} logging.debug(f"tracking event called with event_name: {event_name} and properties: {properties}") enable_tracking = config_manager.get_bool(constants.CONFIG_KEY_ENABLE_TRACKING) - if not enable_tracking: + if not enable_tracking and not _session_only_tracking: return try: @@ -90,15 +98,42 @@ def wrapper(*args, **kwargs): def prompt_tracking_consent(skip_prompt: bool = False, default_value: bool = False): + global _session_only_tracking, user_id + + if _session_only_tracking: + return + tracking_enabled = config_manager.get_bool(constants.CONFIG_KEY_ENABLE_TRACKING) if tracking_enabled is not None: return if skip_prompt: init_tracking(default_value) - else: - enable_tracking = ui.prompt_confirm_action("Do you agree to enable tracking to improve the application?", False) - init_tracking(enable_tracking) + return + + # When stdin or stdout is not a TTY (subprocess pipe, redirect, CI), + # blocking on the consent prompt would either hang the caller forever + # or corrupt their output stream. Enable tracking for this process and + # persist a stable anonymous user_id so repeat agentic usage from the + # same machine attributes to one identity. The consent flag itself + # stays unset so a later interactive run can still ask the human; if + # they consent, init_tracking will reuse this user_id. + if not sys.stdin.isatty() or not sys.stdout.isatty(): + _session_only_tracking = True + if user_id is None: + user_id = str(uuid.uuid4()) + # Best-effort persistence — a read-only config dir (fresh CI, + # restricted sandbox) must not crash the caller. If the write + # fails we keep the in-memory user_id so this process still + # tracks normally; the next run on a writable host will retry. + try: + config_manager.set(constants.CONFIG_KEY_USER_ID, user_id) + except OSError: + pass + return + + enable_tracking = ui.prompt_confirm_action("Do you agree to enable tracking to improve the application?", False) + init_tracking(enable_tracking) def init_tracking(enable_tracking: bool): diff --git a/docs/json-output.md b/docs/json-output.md new file mode 100644 index 00000000..c5f83bbb --- /dev/null +++ b/docs/json-output.md @@ -0,0 +1,702 @@ +# `comfy run --json`: Machine-Readable Output (NDJSON) + +This document specifies the output contract of `comfy run --json`. The intent +is to give agents and automation a stable, parseable view of a workflow +execution — independent of the human-readable Rich-formatted output that +`comfy run` emits by default. + +## Overview + +When `--json` is passed, `comfy run` switches into a strict +machine-readable mode: + +- **stdout** carries exclusively **NDJSON** (newline-delimited JSON): one + JSON object per line, each terminated by `\n`. No ANSI, no progress bar, + no headings. Each line is written and flushed to stdout as soon as the + underlying event is produced; agents may rely on read-as-emitted timing — + there is no batching. +- The stream is 7-bit ASCII clean. Non-ASCII characters in string fields + are emitted as `\uXXXX` JSON escapes (equivalent to Python's + `json.dumps(..., ensure_ascii=True)`). +- **stderr** is reserved for things the CLI cannot route through the JSON + contract: framework-level Python errors, uncaught exceptions, library + warnings. Agents should not parse stderr; they may discard it or + capture it for diagnostics. +- **Exit code** is `0` when the terminal event is `completed`, + `queued` (`--no-wait` mode), or `prompt_preview` (`--print-prompt` + mode); `1` on a `failed` terminal event. Error categorisation is + carried in the `error.kind` field of the `failed` event, not in the + exit code (see [Stability](#stability-and-exit-codes)). + +In `--json` mode, `--verbose` has no effect: agents receive the full event +stream regardless. + +**Workflow input format.** `--workflow` accepts both the ComfyUI **API +format** (the canonical `{node_id: {class_type, inputs, ...}}` graph +produced by "Save (API Format)") and the **exported UI format** (the +`{nodes: [...], links: [...]}` shape produced by "Save"). UI workflows +are converted to API format client-side via `/object_info` before +queuing; conversion is signalled by a [`converted`](#converted) event +emitted before [`queued`](#queued). API-format input does not produce a +`converted` event. + +All duration fields in this contract are floats representing seconds. +Numeric count fields (e.g., `node_progress.value` / `max`) are JSON +`number` and may be int or float depending on the underlying node. + +This contract targets ComfyUI servers reached via `--host` / `--port` +(typically local). Cloud-specific URL routing (e.g., `/api` prefix, +endpoint renames like `/history` → `/history_v2`) is not exercised in +v1 and may not work without additional flag wiring — agents talking to +Comfy Cloud should expect to use their own client code for now. + +## Stream shape + +Every line on stdout is a JSON object containing a non-empty string +`event` field acting as a discriminator. Agents must dispatch on this +field. In normal `--json` execution the stream ends with a terminal +event (`completed` or `failed`). In `--no-wait` mode the stream ends +at `queued`, and the agent is responsible for polling +`/history/{prompt_id}` to observe completion. In `--print-prompt` +mode the stream ends at `prompt_preview` (no execution happens, so +`completed`/`failed` would be category-error events about something +that was never started). See [Process-level termination](#process-level-termination) +for the edge case where the CLI is killed before emitting its terminal +event. + +### Stream archetypes + +The stream takes one of these shapes depending on the workflow format and +outcome: + +| Outcome | Stream | +| --------------------------------- | ------------------------------------------------------------------- | +| Success | `[converted]? + prompt_preview + queued + [node_*]* + completed` | +| `--no-wait` queued | `[converted]? + prompt_preview + queued` | +| `--print-prompt` | `[converted]? + prompt_preview` (terminal) | +| Failure mid-execution | `[converted]? + prompt_preview + queued + [node_*]* + failed` | +| Failure during submission | `[converted]? + prompt_preview + failed` | +| Failure pre-flight | `failed` | + +Where `[node_*]*` is zero or more interleaved `node_cached`, +`node_executing`, `node_progress`, and `node_executed` events. `[X]?` +means X may or may not appear. + +### Early-exit rule + +`failed` can replace any non-terminal event, terminating the stream +early. The archetypes table above is the canonical view of what shapes +of stream agents will actually see; the early-exit rule is the universal +quantifier behind it. + +### Schema version + +Every event in v1 streams carries `schema_version: 1`. The field is a +monotonically increasing integer; minor versions are not used. Future +schema versions will emit `schema_version: 2`, etc., on every event. +Agents may read the version from any line. Agents needing +feature-presence detection beyond the integer version should +feature-detect by field presence rather than version comparison. + +## Event reference + +| Event | When | Terminal | +| ----------------- | ------------------------------------------------- | -------- | +| `converted` | UI-format workflow was client-side converted | | +| `prompt_preview` | The API-format workflow graph about to be submitted | (✓ in `--print-prompt`) | +| `queued` | Server accepted the prompt (HTTP 200 on `/prompt`)| (✓ in `--no-wait`) | +| `node_cached` | Node hit the execution cache and was skipped | | +| `node_executing` | Node started execution | | +| `node_progress` | In-flight progress update for the running node | | +| `node_executed` | Node finished and reported its outputs | | +| `completed` | Workflow finished successfully | ✓ | +| `failed` | Workflow could not complete | ✓ | + +Agents must ignore events whose `event` value they do not recognise — +new event kinds may be added in a backward-compatible manner. Agents +must ignore unknown fields on known events for the same reason. + +A handful of fields carry values from a server-defined open set rather +than a fixed enumeration: `class_type`, `category`, `type`, and +`exception_type`. Each is flagged **Open set** at one canonical +description below; the same treatment applies wherever that field +appears across events. Agents must accept and pass through unknown +values without keying behaviour on specific strings. + +Every event that names a single node also carries a `title` field — +the human-readable label to show in a per-node UI. The contract for +`title` is the same wherever it appears: **`_meta.title` if present, +else `class_type`, else `node_id`**. Per-event field tables list it +simply as "display label" rather than repeating the chain. + +### `converted` + +Emitted once if the input workflow was in UI format and was converted to +API format client-side. Not emitted when the input was already in API +format. + +```json +{"event": "converted", "schema_version": 1, "node_count": 2} +``` + +| Field | Type | Description | +| ---------------- | ---- | ---------------------------------------------- | +| `event` | str | `"converted"` | +| `schema_version` | int | `1` | +| `node_count` | int | Number of nodes in the converted graph | + +### `prompt_preview` + +Emitted in `--json` mode once the workflow has been successfully +loaded, parsed, and (if UI-format) converted — i.e., in every stream +except the **Failure pre-flight** archetype (a `failed`-only stream +where the CLI bails out before it has a workflow to preview: file +errors, parse errors, server-probe / `/object_info` failures, UI +conversion failures, etc.). Fires immediately after the optional +`converted` event and immediately before `queued`. Carries the API-format workflow graph the CLI is +about to POST to `/prompt` — the same dict that would land in the +request's `prompt` field. Gives agents a complete audit trail of what +was submitted, useful for debugging conversions, logging, and +post-mortem analysis, without needing a separate run. + +Under `--print-prompt` this event is also the **terminal** event: +the CLI emits it and exits 0 without queuing. In normal flow it's +informational and execution continues with `queued`. + +```json +{"event": "prompt_preview", "schema_version": 1, "prompt": {"1": {"class_type": "EmptyLatentImage", "inputs": {"width": 512, "height": 512, "batch_size": 1}}}} +``` + +| Field | Type | Description | +| ---------------- | ---- | ---------------------------------------------------------------------- | +| `event` | str | `"prompt_preview"` | +| `schema_version` | int | `1` | +| `prompt` | dict | The API-format workflow graph keyed by node id. Same shape as the `prompt` field POSTed to `/prompt`. Does NOT include `client_id` or `extra_data` (those are runtime fields, not part of the workflow — so any `--api-key` value never appears here). | + +For UI-format input under `--print-prompt`, `/object_info` must still +be reachable (the converter consults it). For API-format input under +`--print-prompt`, no server requests are made and the command works +fully offline. In normal (non-`--print-prompt`) flow the server is +always contacted regardless, since execution follows. + +### `queued` + +Emitted after `POST /prompt` returns 200. Carries the server's prompt +handle, which can be used to correlate against `/history/{prompt_id}`. + +```json +{ + "event": "queued", + "schema_version": 1, + "prompt_id": "9b1c…", + "client_id": "fe2a…", + "validation_warnings": [], + "nodes": [ + {"node_id": "1", "class_type": "GeminiNanoBanana2", "title": "Nano Banana 2"}, + {"node_id": "2", "class_type": "SaveImage", "title": "Save Image"} + ] +} +``` + +| Field | Type | Description | +| --------------------- | -------------- | ------------------------------------------------------------ | +| `event` | str | `"queued"` | +| `schema_version` | int | `1` | +| `prompt_id` | str | Server-assigned prompt UUID | +| `client_id` | str | Client-generated UUID (sent with `/prompt`) | +| `validation_warnings` | array of dict | List of per-node validation issues that ComfyUI reported alongside a successful queue (some output chains validated, others didn't). Same record shape as `validation_error.node_errors` (see below). Empty (`[]`) in the common case. | +| `nodes` | array of dict | Manifest of every node in the submitted (post-conversion) workflow. Each entry has `node_id` (str), `class_type` (str), and `title` (str — display label, see canonical `title` rule). Lets piped consumers (who don't have the workflow file at hand) render a per-node UI immediately without waiting for `completed`. | + +The `validation_warnings` field exists specifically for the case where +ComfyUI's `validate_prompt` returns success because at least one output +chain validated, but other nodes failed validation and will not run. +This field is not a general warnings channel; other warning surfaces +would require separate spec decisions. + +### `node_cached` + +Emitted up front as a group, before any `node_executing` in the same +run, listing nodes whose outputs were retrieved from the execution +cache. Comes from ComfyUI's `execution_cached` websocket message. + +Note: for cached nodes that had prior UI output (e.g., a cached +`SaveImage`), ComfyUI emits both `execution_cached` AND a per-node +`executed` payload with the cached UI dict. The CLI surfaces both: a +single node may produce both `node_cached` and `node_executed` events. +See [completed](#completed) for the resulting semantics of +`cached_node_ids` / `executed_node_ids`. + +```json +{"event": "node_cached", "schema_version": 1, "node_id": "1", "class_type": "GeminiNanoBanana2", "title": "Nano Banana 2"} +``` + +| Field | Type | Description | +| ---------------- | ---- | ------------------------------------------- | +| `event` | str | `"node_cached"` | +| `schema_version` | int | `1` | +| `node_id` | str | Node key in the workflow dict | +| `class_type` | str | Node class name. **Open set** — current ComfyUI versions emit names like `KSampler`, `SaveImage`, plus arbitrary custom-node class names; agents must accept and pass through unknown values without keying behaviour on specific strings. | +| `title` | str | display label (see canonical `title` rule above) | + +### `node_executing` + +Emitted when a node starts execution. Subsequent `node_progress` events +(if any) refer to this node until either a different `node_executing` +arrives or the node's `node_executed` event is emitted. + +Two consecutive `node_executing` events with different `node_id` values +are normal: the first node finished but didn't surface a result the +server forwarded as `executed` (this happens for intermediate compute +nodes whose outputs aren't published to the client). Agents that track +a "current node" should treat a new `node_executing` as implicitly +closing the previous one. + +```json +{"event": "node_executing", "schema_version": 1, "node_id": "2", "class_type": "SaveImage", "title": "Save Image"} +``` + +Fields are identical to `node_cached`. + +### `node_progress` + +Per-step progress for samplers, video encoders, and any node that calls +`ProgressBar.update_absolute(...)`. Shares the `node_id` of the most +recently-emitted `node_executing` event. + +```json +{"event": "node_progress", "schema_version": 1, "node_id": "1", "class_type": "GeminiNanoBanana2", "title": "Nano Banana 2", "value": 14, "max": 30} +``` + +| Field | Type | Description | +| ---------------- | ------ | ----------------------------------------------------------- | +| `event` | str | `"node_progress"` | +| `schema_version` | int | `1` | +| `node_id` | str | Node currently running | +| `class_type` | str | Node class name (duplicated from the workflow so stateless consumers don't need to track the prior `node_executing`) | +| `title` | str | display label (see canonical `title` rule above) | +| `value` | number | Current progress. Typically int (step count); some custom nodes emit float (fractional progress). Defaults to `0` when the server omits the field. | +| `max` | number | Total progress; same caveat as `value`. Defaults to `0` when the server omits the field. | + +Some custom nodes may emit `value > max` near the end of execution. +Agents rendering a progress bar should clamp `value` to `max`. + +### `node_executed` + +Emitted when the server reports node completion via its `executed` +websocket message. **Not guaranteed for every executed node** — +intermediate compute nodes that don't surface output to the client may +finish without emitting this event. Output nodes (like `SaveImage`) and +some custom partner nodes that publish previews reliably emit it. A +cached output-bearing node also emits `node_executed` (in addition to +`node_cached`). + +```json +{ + "event": "node_executed", + "schema_version": 1, + "node_id": "2", + "class_type": "SaveImage", + "title": "Save Image", + "outputs": [ + { + "category": "images", + "node_id": "2", + "class_type": "SaveImage", + "title": "Save Image", + "filename": "banana_test_00001_.png", + "subfolder": "", + "type": "output", + "url": "http://127.0.0.1:8188/view?filename=banana_test_00001_.png&subfolder=&type=output" + } + ] +} +``` + +| Field | Type | Description | +| ---------------- | ---------------- | ------------------------------------------- | +| `event` | str | `"node_executed"` | +| `schema_version` | int | `1` | +| `node_id` | str | Node key | +| `class_type` | str | Node class name | +| `title` | str | display label (see canonical `title` rule above) | +| `outputs` | array of Output | File-like outputs (empty if none) | + +`outputs` is populated by iterating each key in ComfyUI's +`executed.output` dict and emitting any item that matches the +file-record shape (a dict containing a `filename` key). Items that are +not file-record-shaped (strings, booleans, mixed lists from nodes that +publish non-file data like text predictions or animation flags) are +silently skipped. See [Output object](#output-object) for the entry +shape. + +### `completed` + +Terminal event on success. Carries identifiers, timing, the aggregated +output list, and node-execution metadata. + +```json +{ + "event": "completed", + "schema_version": 1, + "prompt_id": "9b1c…", + "client_id": "fe2a…", + "elapsed_seconds": 8.342, + "outputs": [], + "cached_node_ids": ["1"], + "executed_node_ids": ["2"] +} +``` + +| Field | Type | Description | +| ------------------- | --------------- | ------------------------------------------------------------ | +| `event` | str | `"completed"` | +| `schema_version` | int | `1` | +| `prompt_id` | str | Server-assigned prompt UUID | +| `client_id` | str | Client-generated UUID | +| `elapsed_seconds` | float | Wall-clock duration from start of `comfy run` (same clock as `failed.elapsed_seconds`) | +| `outputs` | array of Output | All file-like outputs across all nodes (empty if none) | +| `cached_node_ids` | array of str | Node IDs the server reported as cached (via `execution_cached`) | +| `executed_node_ids` | array of str | Node IDs the executor *ran* — the union of every `node_id` that appeared in a `node_executing` or `node_executed` event. Named for what the executor did (run a node), broader than the leaf-only `node_executed` event: includes intermediate compute nodes (CheckpointLoaderSimple, KSampler, etc.) that don't surface output to the client. | + +`cached_node_ids` and `executed_node_ids` are independent signals about +what the server reported. **They may overlap**: a cached output-bearing +node emits both `execution_cached` and `executed`, so it appears in +both lists. Agents wanting "ran fresh, not from cache" should compute +`set(executed_node_ids) - set(cached_node_ids)`. + +### `failed` + +Terminal event on any failure. The `error.kind` discriminator is the +documented stable enum (see [Error object](#error-object) and +[Error kinds](#error-kinds)). + +```json +{ + "event": "failed", + "schema_version": 1, + "prompt_id": "9b1c…", + "client_id": "fe2a…", + "elapsed_seconds": 1.23, + "error": { + "kind": "execution_error", + "message": "API key invalid", + "node_id": "1", + "class_type": "GeminiNanoBanana2", + "title": "Nano Banana 2", + "exception_type": "RuntimeError", + "traceback": " File \"/path/to/node.py\", line 42, in execute\n raise RuntimeError(\"API key invalid\")\n" + } +} +``` + +| Field | Type | Description | +| ----------------- | ----------- | -------------------------------------------------------------------------- | +| `event` | str | `"failed"` | +| `schema_version` | int | `1` | +| `prompt_id` | str \| null | `null` when failure occurred before `/prompt` was accepted | +| `client_id` | str \| null | `null` when failure occurred before `WorkflowExecution` was constructed | +| `elapsed_seconds` | float | Wall-clock duration from start of `comfy run` | +| `error` | Error | See [Error object](#error-object) | + +If `prompt_id` is non-null, `client_id` is also non-null (a `prompt_id` +cannot be assigned without a `client_id`). + +## Output object + +```json +{ + "category": "images", + "node_id": "2", + "class_type": "SaveImage", + "title": "Save Image", + "filename": "banana_test_00001_.png", + "subfolder": "", + "type": "output", + "url": "http://127.0.0.1:8188/view?filename=..." +} +``` + +| Field | Type | Description | +| ------------ | ----------- | -------------------------------------------------------------------------------------------------------- | +| `category` | str | Output category as keyed by ComfyUI's `executed.output` dict. **Open set.** Current ComfyUI versions emit values like `images`, `audio`, `3d`, `latents`; agents must accept and pass through unknown values. | +| `node_id` | str | Node that produced the output | +| `class_type` | str | Node class name | +| `title` | str | display label (see canonical `title` rule above) | +| `filename` | str | Raw filename as reported by the server | +| `subfolder` | str | Subfolder within the output folder's root. Defaults to `""` when the server omits or empties the field. | +| `type` | str | ComfyUI output folder discriminator. **Open set.** Current ComfyUI versions emit `output`, `temp`, `input`; agents must accept and pass through unknown values. Defaults to `"output"` when the server omits or empties the field. | +| `url` | str | `http(s)://:/view?...` URL — always present, fetch this to get the bytes | + +### Fetching output bytes + +The `url` field is the only contractual way to retrieve an output's +bytes. It points at ComfyUI's `/view` endpoint and works whether the +agent is on the same machine as ComfyUI, on a different host, or +talking to Cloud. For the local case, a loopback HTTP fetch from a +ComfyUI on the same box is cheap — the agent's HTTP client reads +through the kernel loopback in the same way it'd read a local file. + +An earlier draft of v1 also emitted a `local_path` field for the +same-machine case; it was removed because resolving ComfyUI's actual +output directory reliably (across manual launches, alternate install +paths, multi-install machines, bind-mounted volumes) wasn't feasible. +Agents should rely on `url` exclusively. + +## Error object + +Every `failed` event carries an `error` object with these universal +fields, plus per-kind extras documented in [Error kinds](#error-kinds). + +| Field | Type | Description | +| --------- | ---- | -------------------------------------------------------------------------------------------- | +| `kind` | str | Discriminator; one of the values in [Error kinds](#error-kinds) | +| `message` | str | Human-readable summary. For display only — agents should dispatch on `kind`, not on `message`| + +The set of per-kind extra fields is the documented minimum. New +optional extra fields may be added in non-breaking releases; existing +fields will not be removed or renamed without a `schema_version` bump. + +## Error kinds + +Per-kind extra fields. Universal fields (`kind`, `message`) are +documented in [Error object](#error-object). + +Each `error.kind` is a stable string. New kinds may be added in +backward-compatible releases; existing kinds will not be renamed or +removed without a schema version bump. + +| `kind` | Triggered when | Extra fields | +| ------------------------- | --------------------------------------------------------------------------------------------- | -------------------------------------------------- | +| `workflow_not_found` | `--workflow` path does not exist | — | +| `workflow_invalid_json` | Workflow file is not valid JSON | — | +| `workflow_read_error` | Workflow file exists but isn't readable as text (`OSError`, `UnicodeDecodeError`) | — | +| `workflow_format_invalid` | File parses but is neither UI nor API format | — | +| `workflow_empty` | Workflow has no executable nodes (UI conversion produced `{}`, or API workflow is `{}`) | — | +| `conversion_error` | UI→API converter raised `WorkflowConversionError` | — | +| `conversion_crash` | UI→API converter raised an unexpected exception | `exception_type` (str) | +| `object_info_unavailable` | `/object_info` returned an HTTP error, or an HTTP 200 with an unparseable body | `status_code` (int), `body` (str) | +| `connection_error` | ComfyUI server unreachable: `URLError`, `TimeoutError`, or other `OSError` while contacting it (including on `/object_info`) | — | +| `validation_error` | Server returned HTTP 400 with `node_errors` | `node_errors` (array of dict; see [shape](#validation_errornode_errors-shape)) | +| `client_error` | Server returned an HTTP 4xx response (not validation) | `status_code` (int, 4xx), `body` (str) | +| `server_error` | Server returned an HTTP 5xx response | `status_code` (int, 5xx), `body` (str) | +| `invalid_response` | Server returned HTTP 2xx but body was unparseable or lacked `prompt_id` | `status_code` (int, 2xx), `body` (str) | +| `timeout` | WebSocket `recv` timed out | `timeout_seconds` (float) | +| `connection_lost` | WebSocket connection dropped mid-execution | — | +| `execution_interrupted` | Workflow was interrupted — either by the server (`execution_interrupted` WS, e.g., via `/interrupt`) or by the client process receiving `SIGINT` (Ctrl-C) | — | +| `execution_error` | A node raised during execution (server emitted `execution_error`) | `node_id` (str), `class_type` (str), `title` (str — display label, see canonical `title` rule), `exception_type` (str), `traceback` (str) | + +### `exception_type` field + +`exception_type` is provided for diagnostic and observability purposes +(e.g., metrics bucketing). **Open set** — the format is whatever +ComfyUI sends, typically the bare class name for builtins +(`RuntimeError`, `ValueError`) and a dotted module path for non- +builtins (`comfy.model_management.InterruptProcessingException`). May +be `""` when the server omits it. Agents should not key retry or +routing logic on `exception_type`; use `error.kind` for coarse +dispatch and `error.message` for human display. + +### `traceback` field + +`traceback` is a single multi-line string carrying the formatted stack +frames as reported by ComfyUI (joined from the server's +`traceback.format_tb()` output). It does NOT include the +`"Traceback (most recent call last):"` header or the final +`"ExceptionType: message"` summary line — agents reconstructing a +Python-style display can do so themselves from `exception_type`, +`error.message`, and `traceback`. May be empty (`""`) when the server's +formatted stack is empty. + +After `json.loads()`, the `traceback` string contains real newline +characters (the JSON wire-format `\n` escapes are decoded). + +### `validation_error.node_errors` shape + +The same shape is used for `queued.validation_warnings`. The value is +an array of self-contained records, one per affected node. Each record +carries `node_id` (str — same identifier as appears in `node_*` events) +plus the per-node fields ComfyUI emits. Example shape: + +```json +"node_errors": [ + { + "node_id": "1", + "errors": [ + { + "type": "value_not_in_list", + "message": "Value not in list", + "details": "resolution: '5K' not in ['1K','2K','4K']", + "extra_info": { + "input_name": "resolution", + "received_value": "5K" + } + } + ], + "dependent_outputs": ["2"], + "class_type": "GeminiNanoBanana2" + } +] +``` + +The inner per-node fields are defined by ComfyUI's `validate_prompt()` +in [`server.py`](https://github.com/comfyanonymous/ComfyUI/blob/master/server.py) +and may evolve with ComfyUI versions. **Agents should ignore unknown +fields.** The CLI guarantees only: +- the outer value is an array of dicts, each carrying a `node_id` (str), and +- under current ComfyUI versions, each record additionally carries + `errors`, `dependent_outputs`, and `class_type`. + +The record order matches ComfyUI's response order and is not guaranteed +to be sorted; consumers that need a specific order should sort +themselves. + +## Process-level termination + +The CLI may be terminated by the operating system or a parent process +(SIGKILL, SIGTERM, SIGINT, OOM-kill, segmentation fault). In these +cases, no terminal event is emitted and the stream may be truncated. + +Agents should treat the run as failed when **both**: +- the process exit code is non-zero, and +- the last line on stdout is not one of the documented terminal events + (`completed`, `failed`, `queued` under `--no-wait`, or `prompt_preview` + under `--print-prompt`), or stdout is empty. + +Stderr may contain a Python traceback in these cases. + +## Examples + +Class type names in these examples (`SaveImage`, `GeminiNanoBanana2`, +etc.) are illustrative — they reflect specific ComfyUI/partner nodes. +Agents should not hardcode behavior on specific `class_type` strings; +the contract guarantees the *shape* of these fields, not their content. + +Every line, including the terminal event (`completed` / `failed` / +`queued` under `--no-wait` / `prompt_preview` under `--print-prompt`), +ends with `\n`. Agents using line iteration (`for line in stdout`) are fine; +agents using `splitlines()` or `split("\n")` should filter empty +trailing entries. + +### Successful run (UI-format input) + +```json +{"event":"converted","schema_version":1,"node_count":2} +{"event":"prompt_preview","schema_version":1,"prompt":{"1":{"class_type":"GeminiNanoBanana2","inputs":{"prompt":"a banana","width":2048,"height":2048},"_meta":{"title":"Nano Banana 2"}},"2":{"class_type":"SaveImage","inputs":{"filename_prefix":"banana_test","images":["1",0]},"_meta":{"title":"Save Image"}}}} +{"event":"queued","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","validation_warnings":[],"nodes":[{"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"},{"node_id":"2","class_type":"SaveImage","title":"Save Image"}]} +{"event":"node_executing","schema_version":1,"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"} +{"event":"node_progress","schema_version":1,"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2","value":1,"max":4} +{"event":"node_progress","schema_version":1,"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2","value":4,"max":4} +{"event":"node_executing","schema_version":1,"node_id":"2","class_type":"SaveImage","title":"Save Image"} +{"event":"node_executed","schema_version":1,"node_id":"2","class_type":"SaveImage","title":"Save Image","outputs":[{"category":"images","node_id":"2","class_type":"SaveImage","title":"Save Image","filename":"banana_test_00001_.png","subfolder":"","type":"output","url":"http://127.0.0.1:8188/view?filename=banana_test_00001_.png&subfolder=&type=output"}]} +{"event":"completed","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","elapsed_seconds":8.342,"outputs":[{"category":"images","node_id":"2","class_type":"SaveImage","title":"Save Image","filename":"banana_test_00001_.png","subfolder":"","type":"output","url":"http://127.0.0.1:8188/view?filename=banana_test_00001_.png&subfolder=&type=output"}],"cached_node_ids":[],"executed_node_ids":["1","2"]} +``` + +Exit code: `0`. + +Note: node 1 (`GeminiNanoBanana2`) does not emit a `node_executed` +event in this example — it's an intermediate compute node whose result +is forwarded via tensors rather than surfaced as a file output, so the +server doesn't send an `executed` ws message for it. + +### `--no-wait` (API-format input) + +```json +{"event":"prompt_preview","schema_version":1,"prompt":{"1":{"class_type":"GeminiNanoBanana2","inputs":{"prompt":"a banana","width":2048,"height":2048},"_meta":{"title":"Nano Banana 2"}},"2":{"class_type":"SaveImage","inputs":{"filename_prefix":"banana_test","images":["1",0]},"_meta":{"title":"Save Image"}}}} +{"event":"queued","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","validation_warnings":[],"nodes":[{"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"},{"node_id":"2","class_type":"SaveImage","title":"Save Image"}]} +``` + +Exit code: `0`. The agent is responsible for polling +`/history/{prompt_id}` to observe completion. + +### Failure: workflow file missing + +```json +{"event":"failed","schema_version":1,"prompt_id":null,"client_id":null,"elapsed_seconds":0.001,"error":{"kind":"workflow_not_found","message":"Workflow file not found: /tmp/missing.json"}} +``` + +Exit code: `1`. + +### Failure: server returned validation errors + +```json +{"event":"converted","schema_version":1,"node_count":2} +{"event":"prompt_preview","schema_version":1,"prompt":{"1":{"class_type":"GeminiNanoBanana2","inputs":{"prompt":"a banana","resolution":"5K"},"_meta":{"title":"Nano Banana 2"}},"2":{"class_type":"SaveImage","inputs":{"filename_prefix":"banana_test","images":["1",0]},"_meta":{"title":"Save Image"}}}} +{"event":"failed","schema_version":1,"prompt_id":null,"client_id":"fe2a…","elapsed_seconds":0.45,"error":{"kind":"validation_error","message":"Value not in list","node_errors":[{"node_id":"1","errors":[{"type":"value_not_in_list","message":"Value not in list","details":"resolution: '5K' not in ['1K','2K','4K']","extra_info":{"input_name":"resolution","received_value":"5K"}}],"dependent_outputs":["2"],"class_type":"GeminiNanoBanana2"}]}} +``` + +Exit code: `1`. + +### Failure: node raised during execution + +```json +{"event":"prompt_preview","schema_version":1,"prompt":{"1":{"class_type":"GeminiNanoBanana2","inputs":{"prompt":"a banana","width":2048,"height":2048},"_meta":{"title":"Nano Banana 2"}},"2":{"class_type":"SaveImage","inputs":{"filename_prefix":"banana_test","images":["1",0]},"_meta":{"title":"Save Image"}}}} +{"event":"queued","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","validation_warnings":[],"nodes":[{"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"},{"node_id":"2","class_type":"SaveImage","title":"Save Image"}]} +{"event":"node_executing","schema_version":1,"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"} +{"event":"failed","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","elapsed_seconds":2.1,"error":{"kind":"execution_error","message":"API key invalid","node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2","exception_type":"RuntimeError","traceback":" File \"/path/to/node.py\", line 42, in execute\n raise RuntimeError(\"API key invalid\")\n"}} +``` + +Exit code: `1`. + +### Failure: websocket timeout + +```json +{"event":"prompt_preview","schema_version":1,"prompt":{"1":{"class_type":"GeminiNanoBanana2","inputs":{"prompt":"a banana","width":2048,"height":2048},"_meta":{"title":"Nano Banana 2"}},"2":{"class_type":"SaveImage","inputs":{"filename_prefix":"banana_test","images":["1",0]},"_meta":{"title":"Save Image"}}}} +{"event":"queued","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","validation_warnings":[],"nodes":[{"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"},{"node_id":"2","class_type":"SaveImage","title":"Save Image"}]} +{"event":"node_executing","schema_version":1,"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"} +{"event":"failed","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","elapsed_seconds":30.0,"error":{"kind":"timeout","message":"WebSocket timed out after 30s waiting for server response","timeout_seconds":30.0}} +``` + +Exit code: `1`. + +### Failure: workflow interrupted + +```json +{"event":"prompt_preview","schema_version":1,"prompt":{"1":{"class_type":"GeminiNanoBanana2","inputs":{"prompt":"a banana","width":2048,"height":2048},"_meta":{"title":"Nano Banana 2"}},"2":{"class_type":"SaveImage","inputs":{"filename_prefix":"banana_test","images":["1",0]},"_meta":{"title":"Save Image"}}}} +{"event":"queued","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","validation_warnings":[],"nodes":[{"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"},{"node_id":"2","class_type":"SaveImage","title":"Save Image"}]} +{"event":"node_executing","schema_version":1,"node_id":"1","class_type":"GeminiNanoBanana2","title":"Nano Banana 2"} +{"event":"failed","schema_version":1,"prompt_id":"9b1c…","client_id":"fe2a…","elapsed_seconds":3.2,"error":{"kind":"execution_interrupted","message":"Workflow execution was interrupted"}} +``` + +Exit code: `1`. + +## Stability and exit codes + +### What is stable + +For the v1 contract documented here: +- The set of event names listed above and the field names within them. +- The set of `error.kind` values listed above and the per-kind extra + fields documented for each. +- The exit code mapping: `0` when the terminal event is `completed`, + `queued` (under `--no-wait`), or `prompt_preview` (under + `--print-prompt`); `1` on `failed`. +- The stdout/stderr separation: stdout carries only NDJSON (no ANSI, + no human-readable progress bar, no headings); stderr is reserved + for framework-level Python errors, uncaught exceptions, and library + warnings — agents should not parse it. +- The 7-bit ASCII encoding of stdout (non-ASCII characters in string + fields are emitted as `\uXXXX` JSON escapes, equivalent to + `json.dumps(..., ensure_ascii=True)`). +- The `schema_version: 1` field on every event of v1 streams. + +### What may change in a non-breaking way + +- New event types being added (agents must ignore unknown `event` values). +- New `error.kind` values being added (agents must default-handle unknown + kinds). +- New optional fields being added to existing events (agents must ignore + unknown fields). + +New events that would alter the meaning of existing events when +ignored (for example, a per-node skip event whose absence would make +`executed_node_ids` incomplete) require a `schema_version` bump rather +than being treated as an additive change. + +### Why exit codes are not granular + +The 0/1 mapping (defined in "What is stable" above) intentionally +trades resolution for stability. `error.kind` is the expressive, +extensible discriminator — agents dispatch on it; the exit code is +just a coarse "did we succeed?" signal. Granular exit codes can be +introduced later for non-`--json` callers in a separate, +evidence-driven change without breaking the JSON contract. diff --git a/tests/comfy_cli/command/test_run.py b/tests/comfy_cli/command/test_run.py index 3b07db97..a09327ad 100644 --- a/tests/comfy_cli/command/test_run.py +++ b/tests/comfy_cli/command/test_run.py @@ -52,7 +52,6 @@ def mock_execution(workflow): port=8188, verbose=False, progress=progress, - local_paths=False, timeout=30, ) @@ -165,7 +164,6 @@ def _make_exec(self, workflow, api_key=None): port=8188, verbose=False, progress=progress, - local_paths=False, timeout=30, api_key=api_key, ) @@ -262,7 +260,6 @@ def test_unknown_node_ids_verbose(self, workflow): port=8188, verbose=True, progress=progress, - local_paths=False, timeout=30, ) execution.prompt_id = prompt_id @@ -310,7 +307,7 @@ def test_collects_image_outputs(self, mock_execution): class TestExecuteErrorHandling: def _run_execute_expect_exit(self, workflow_file, **overrides): - kwargs = dict(host="127.0.0.1", port=8188, wait=True, verbose=False, local_paths=False, timeout=30) + kwargs = dict(host="127.0.0.1", port=8188, wait=True, verbose=False, timeout=30) kwargs.update(overrides) with pytest.raises(typer.Exit) as exc_info: execute(workflow_file, **kwargs) @@ -507,7 +504,8 @@ def test_ui_workflow_is_converted_then_executed(self, ui_workflow_file): execute(ui_workflow_file, host="127.0.0.1", port=8188, wait=True, timeout=30) - mock_fetch.assert_called_once_with("127.0.0.1", 8188, 30) + mock_fetch.assert_called_once() + assert mock_fetch.call_args.args == ("127.0.0.1", 8188, 30) api_workflow = MockExec.call_args.args[0] assert set(api_workflow) == {"1", "2"} assert api_workflow["1"]["class_type"] == "EmptyLatentImage" @@ -555,7 +553,8 @@ def test_ui_workflow_plumbs_api_key_through_to_execution(self, ui_workflow_file) execute(ui_workflow_file, host="127.0.0.1", port=8188, wait=True, timeout=30, api_key="sk-test") - mock_fetch.assert_called_once_with("127.0.0.1", 8188, 30) + mock_fetch.assert_called_once() + assert mock_fetch.call_args.args == ("127.0.0.1", 8188, 30) assert MockExec.call_args.kwargs["api_key"] == "sk-test" def test_ui_workflow_exits_when_conversion_yields_nothing(self): @@ -585,3 +584,33 @@ def test_ui_workflow_exits_when_conversion_yields_nothing(self): MockExec.assert_not_called() finally: os.unlink(path) + + +class TestWildcardHostSubstitution: + """0.0.0.0 is a wildcard bind that macOS/Windows clients can't connect to; + execute() substitutes it with the canonical loopback so downstream uses + (server probe, /prompt POST, emitted URLs) are portable.""" + + def test_zero_zero_zero_zero_substituted_at_entry(self, workflow_file): + captured = {} + + def fake_check(port, host, *args, **kwargs): + captured["check_host"] = host + return False # short-circuits execute() with a clean exit + + with patch("comfy_cli.command.run.check_comfy_server_running", side_effect=fake_check): + with pytest.raises(typer.Exit): + execute(workflow_file, host="0.0.0.0", port=8188, json_mode=True) + assert captured["check_host"] == "127.0.0.1" + + def test_other_local_hosts_not_substituted(self, workflow_file): + captured = {} + + def fake_check(port, host, *args, **kwargs): + captured["check_host"] = host + return False + + with patch("comfy_cli.command.run.check_comfy_server_running", side_effect=fake_check): + with pytest.raises(typer.Exit): + execute(workflow_file, host="localhost", port=8188, json_mode=True) + assert captured["check_host"] == "localhost" diff --git a/tests/comfy_cli/command/test_run_json.py b/tests/comfy_cli/command/test_run_json.py new file mode 100644 index 00000000..773d9363 --- /dev/null +++ b/tests/comfy_cli/command/test_run_json.py @@ -0,0 +1,1683 @@ +"""Unit tests for `comfy run --json` (NDJSON output mode). + +See `docs/json-output.md` for the contract these tests pin in place. +The tests cover: + - every event type emitted at the right time and shape + - every error.kind for each documented failure path + - schema_version: 1 on every line + - stream archetypes from the spec table + - the duck-typed output filter rule + - the cached/executed overlap semantics +""" + +from __future__ import annotations + +import io +import json +import os +import tempfile +import urllib.error +from unittest.mock import MagicMock, patch + +import pytest +import typer +from websocket import WebSocketException, WebSocketTimeoutException + +from comfy_cli.command.run import ( + JsonEmitter, + WorkflowExecution, + _classify_api_workflow, + execute, +) + + +@pytest.fixture +def simple_workflow(): + return { + "1": { + "class_type": "EmptyLatentImage", + "inputs": {"width": 64, "height": 64, "batch_size": 1}, + "_meta": {"title": "Latent"}, + }, + "2": { + "class_type": "SaveImage", + "inputs": {"filename_prefix": "x", "images": ["1", 0]}, + "_meta": {"title": "Save"}, + }, + } + + +@pytest.fixture +def workflow_file(simple_workflow): + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(simple_workflow, f) + f.flush() + path = f.name + yield path + os.unlink(path) + + +def _run_execute_capture(workflow_path, capsys, **overrides): + """Run execute() and return the parsed JSON events from stdout.""" + kwargs = dict( + host="127.0.0.1", + port=8188, + wait=True, + verbose=False, + timeout=30, + json_mode=True, + ) + kwargs.update(overrides) + try: + execute(workflow_path, **kwargs) + except typer.Exit: + pass + out, _err = capsys.readouterr() + events = [] + for line in out.splitlines(): + line = line.strip() + if not line: + continue + events.append(json.loads(line)) + return events + + +def _make_http_error(code: int, body: bytes = b"") -> urllib.error.HTTPError: + return urllib.error.HTTPError( + url="http://127.0.0.1:8188/prompt", + code=code, + msg=f"HTTP {code}", + hdrs=None, + fp=io.BytesIO(body), + ) + + +def _make_workflow_execution(workflow, *, with_progress: bool = False, json_mode: bool = True): + """Build a `WorkflowExecution` with a `JsonEmitter` pre-wired to the + workflow. `with_progress=True` attaches a MagicMock progress object — + needed by tests that exercise `update_overall_progress`.""" + e = JsonEmitter(json_mode=json_mode) + e.set_workflow(workflow) + progress = None + if with_progress: + progress = MagicMock() + progress.add_task.return_value = 0 + return WorkflowExecution( + workflow=workflow, + host="127.0.0.1", + port=8188, + verbose=False, + progress=progress, + timeout=30, + emitter=e, + ) + + +class TestJsonEmitter: + """Direct emitter tests — verify event shape, schema_version, no-op in non-JSON mode.""" + + def test_noop_in_human_mode(self, capsys): + e = JsonEmitter(json_mode=False) + e.set_client_id("cid") + e.emit_queued("pid", None) + e.emit_completed() + e.emit_failed("workflow_not_found", "x") + out, _ = capsys.readouterr() + assert out == "" + + def test_every_event_has_schema_version_1(self, capsys, simple_workflow): + e = JsonEmitter(json_mode=True) + e.set_workflow(simple_workflow) + e.set_client_id("cid") + e.emit_converted(2) + e.emit_queued("pid", None) + e.emit_node_cached("1") + e.emit_node_executing("2") + e.emit_node_progress("2", 5, 10) + e.emit_node_executed("2", []) + e.emit_completed() + e.emit_failed("execution_error", "x", node_id="2") + out, _ = capsys.readouterr() + lines = [line for line in out.splitlines() if line.strip()] + assert len(lines) == 8 + for line in lines: + event = json.loads(line) + assert event.get("schema_version") == 1, f"Missing schema_version on: {line}" + + def test_queued_includes_validation_warnings_empty(self, capsys): + e = JsonEmitter(json_mode=True) + e.set_client_id("c") + e.emit_queued("p", []) + out, _ = capsys.readouterr() + event = json.loads(out.strip()) + assert event["event"] == "queued" + assert event["validation_warnings"] == [] + assert event["prompt_id"] == "p" + assert event["client_id"] == "c" + # nodes manifest is always present (empty when no workflow set) + assert event["nodes"] == [] + + def test_queued_includes_validation_warnings_list(self, capsys): + e = JsonEmitter(json_mode=True) + e.set_client_id("c") + warnings = [{"node_id": "5", "errors": [{"type": "x", "message": "y"}]}] + e.emit_queued("p", warnings) + out, _ = capsys.readouterr() + event = json.loads(out.strip()) + assert event["validation_warnings"] == warnings + + def test_queued_nodes_manifest_from_workflow(self, capsys, simple_workflow): + """`nodes` should list one entry per workflow node with node_id, class_type, title.""" + e = JsonEmitter(json_mode=True) + e.set_workflow(simple_workflow) + e.set_client_id("c") + e.emit_queued("p", None) + event = json.loads(capsys.readouterr().out.strip()) + nodes = event["nodes"] + assert len(nodes) == 2 + by_id = {n["node_id"]: n for n in nodes} + assert by_id["1"]["class_type"] == "EmptyLatentImage" + assert by_id["1"]["title"] == "Latent" # _meta.title wins + assert by_id["2"]["class_type"] == "SaveImage" + assert by_id["2"]["title"] == "Save" + + def test_node_progress_includes_class_type_and_title(self, capsys, simple_workflow): + """node_progress carries class_type+title so stateless consumers can + render the running node without buffering a prior node_executing event.""" + e = JsonEmitter(json_mode=True) + e.set_workflow(simple_workflow) + e.set_client_id("c") + e.emit_node_progress("1", 5, 10) + event = json.loads(capsys.readouterr().out.strip()) + assert event["event"] == "node_progress" + assert event["class_type"] == "EmptyLatentImage" + assert event["title"] == "Latent" + assert event["value"] == 5 + assert event["max"] == 10 + + def test_emit_node_handlers_coerce_node_id_to_str(self, capsys, simple_workflow): + """If the server ever sends an int node_id, emit_* must coerce to str.""" + e = JsonEmitter(json_mode=True) + e.set_workflow(simple_workflow) + e.set_client_id("c") + e.emit_node_executing(2) + e.emit_node_progress(2, 1, 10) + e.emit_node_cached(2) + e.emit_node_executed(2, []) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + for ev in events: + assert isinstance(ev["node_id"], str), f"{ev['event']} node_id is {type(ev['node_id']).__name__}" + assert ev["node_id"] == "2" + e.emit_completed() + completed = json.loads(capsys.readouterr().out.strip()) + assert all(isinstance(nid, str) for nid in completed["cached_node_ids"]) + assert all(isinstance(nid, str) for nid in completed["executed_node_ids"]) + + def test_completed_aggregates_outputs_and_node_ids(self, capsys, simple_workflow): + e = JsonEmitter(json_mode=True) + e.set_workflow(simple_workflow) + e.set_client_id("c") + e.emit_node_cached("1") + out1 = { + "category": "images", + "node_id": "2", + "class_type": "SaveImage", + "title": "Save", + "filename": "x.png", + "subfolder": "", + "type": "output", + "url": "http://x", + } + e.emit_node_executed("2", [out1]) + e.emit_completed() + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + completed = events[-1] + assert completed["event"] == "completed" + assert completed["cached_node_ids"] == ["1"] + assert completed["executed_node_ids"] == ["2"] + assert completed["outputs"] == [out1] + assert isinstance(completed["elapsed_seconds"], float) + assert completed["elapsed_seconds"] >= 0 + + def test_cached_and_executed_can_overlap(self, capsys, simple_workflow): + """Cached output-bearing nodes emit both execution_cached and executed.""" + e = JsonEmitter(json_mode=True) + e.set_workflow(simple_workflow) + e.set_client_id("c") + e.emit_node_cached("2") + e.emit_node_executed("2", []) + e.emit_completed() + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + completed = events[-1] + assert "2" in completed["cached_node_ids"] + assert "2" in completed["executed_node_ids"] + + def test_failed_event_carries_universal_and_extras(self, capsys): + e = JsonEmitter(json_mode=True) + e.set_client_id("c") + e.emit_failed("client_error", "Bad request", status_code=401, body="unauthorized") + event = json.loads(capsys.readouterr().out.strip()) + assert event["event"] == "failed" + assert event["error"]["kind"] == "client_error" + assert event["error"]["message"] == "Bad request" + assert event["error"]["status_code"] == 401 + assert event["error"]["body"] == "unauthorized" + assert event["client_id"] == "c" + assert event["prompt_id"] is None # never set + assert isinstance(event["elapsed_seconds"], float) + + def test_fail_helper_emits_event_and_returns_exit(self, capsys): + # JSON mode: the helper emits a `failed` event, returns a typer.Exit + # (not raised — caller raises so `from e` chaining is clean), and + # does NOT print prose (stdout stays NDJSON-only). + e = JsonEmitter(json_mode=True) + e.set_client_id("c") + result = e.fail("client_error", "Bad request", status_code=403, body="forbidden") + assert isinstance(result, typer.Exit) + assert result.exit_code == 1 + out = capsys.readouterr().out + event = json.loads(out.strip()) + assert event["event"] == "failed" + assert event["error"]["kind"] == "client_error" + assert event["error"]["message"] == "Bad request" + assert event["error"]["status_code"] == 403 + + def test_fail_helper_wraps_text_mode_message_in_bold_red(self, capsys): + # Non-JSON mode: the helper auto-wraps `message` in + # [bold red]...[/bold red] and returns typer.Exit (no event on stdout). + e = JsonEmitter(json_mode=False) + result = e.fail("client_error", "Bad request") + assert isinstance(result, typer.Exit) + out = capsys.readouterr().out + assert "Bad request" in out + # Rich strips markup tags but still applies the formatting; the + # message content must reach stdout. No NDJSON in text mode. + assert "failed" not in out # no event was emitted + + def test_fail_helper_rich_message_overrides_text_only(self, capsys): + # `rich_message` replaces the auto-wrapped text; JSON event still + # carries the original `message`. + e = JsonEmitter(json_mode=True) + e.set_client_id("c") + e.fail("client_error", "machine-readable", rich_message="human-friendly") + event = json.loads(capsys.readouterr().out.strip()) + assert event["error"]["message"] == "machine-readable" + # In text mode it'd flip — verify separately. + e2 = JsonEmitter(json_mode=False) + e2.fail("client_error", "machine-readable", rich_message="human-friendly") + out = capsys.readouterr().out + assert "human-friendly" in out + assert "machine-readable" not in out + + def test_title_falls_back_to_class_type(self, simple_workflow): + e = JsonEmitter(json_mode=True) + # Drop _meta.title from node 1 + wf = {"1": {"class_type": "EmptyLatentImage", "inputs": {}}} + e.set_workflow(wf) + assert e.get_title("1") == "EmptyLatentImage" + + def test_title_falls_back_to_node_id_for_unknown(self): + e = JsonEmitter(json_mode=True) + e.set_workflow({}) + assert e.get_title("unknown") == "unknown" + + def test_ascii_safe_emission(self, capsys): + """ensure_ascii=True: non-ASCII becomes \\u escapes.""" + e = JsonEmitter(json_mode=True) + e.set_client_id("c") + e.emit_failed("workflow_not_found", "found: 猫_00001_.png") + out, _ = capsys.readouterr() + # The wire must contain \u escapes, not raw UTF-8 bytes. + assert "\\u732b" in out + assert "猫" not in out + + +class TestClassifyApiWorkflow: + def test_well_formed(self): + assert _classify_api_workflow({"1": {"class_type": "X", "inputs": {}}})[0] == "ok" + + def test_empty_dict(self): + assert _classify_api_workflow({})[0] == "empty" + + def test_invalid_first_node(self): + assert _classify_api_workflow({"foo": "bar"})[0] == "invalid" + + def test_invalid_not_a_dict(self): + assert _classify_api_workflow([])[0] == "invalid" + + +class TestPreFlightFailures: + """Single `failed` event, prompt_id=null, client_id=null.""" + + def test_workflow_not_found(self, capsys): + events = _run_execute_capture("/nonexistent.json", capsys) + assert len(events) == 1 + assert events[0]["event"] == "failed" + assert events[0]["error"]["kind"] == "workflow_not_found" + assert events[0]["prompt_id"] is None + assert events[0]["client_id"] is None + assert events[0]["schema_version"] == 1 + + def test_workflow_invalid_json(self, capsys): + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + f.write("{ this is not json") + path = f.name + try: + with patch("comfy_cli.command.run.check_comfy_server_running", return_value=True): + events = _run_execute_capture(path, capsys) + assert len(events) == 1 + assert events[0]["error"]["kind"] == "workflow_invalid_json" + finally: + os.unlink(path) + + def test_workflow_read_error_unicode(self, capsys): + with tempfile.NamedTemporaryFile(mode="wb", suffix=".json", delete=False) as f: + f.write(b"\xff\xfe\xfa\x00") # invalid UTF-8 + path = f.name + try: + with patch("comfy_cli.command.run.check_comfy_server_running", return_value=True): + events = _run_execute_capture(path, capsys) + assert len(events) == 1 + assert events[0]["error"]["kind"] == "workflow_read_error" + finally: + os.unlink(path) + + def test_workflow_empty_api(self, capsys): + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump({}, f) + path = f.name + try: + with patch("comfy_cli.command.run.check_comfy_server_running", return_value=True): + events = _run_execute_capture(path, capsys) + assert len(events) == 1 + assert events[0]["error"]["kind"] == "workflow_empty" + finally: + os.unlink(path) + + def test_workflow_format_invalid(self, capsys): + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump({"foo": "bar"}, f) + path = f.name + try: + with patch("comfy_cli.command.run.check_comfy_server_running", return_value=True): + events = _run_execute_capture(path, capsys) + assert len(events) == 1 + assert events[0]["error"]["kind"] == "workflow_format_invalid" + finally: + os.unlink(path) + + def test_connection_error_server_down(self, workflow_file, capsys): + with patch("comfy_cli.command.run.check_comfy_server_running", return_value=False): + events = _run_execute_capture(workflow_file, capsys) + assert len(events) == 1 + assert events[0]["error"]["kind"] == "connection_error" + + +class TestSuccessfulRun: + def test_no_wait_emits_prompt_preview_then_queued(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p123"}).encode() + events = _run_execute_capture(workflow_file, capsys, wait=False) + # prompt_preview is always emitted in --json before queued so agents + # have a full audit trail of the submitted workflow graph. + assert [e["event"] for e in events] == ["prompt_preview", "queued"] + assert events[0]["prompt"] + assert events[1]["prompt_id"] == "p123" + assert events[1]["validation_warnings"] == [] + + def test_completed_event_after_success(self, workflow_file, capsys): + """Mocked WS flow → expect queued + node_* + completed.""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + + def msg(t, **d): + return json.dumps({"type": t, "data": {"prompt_id": "p", **d}}) + + ws_instance.recv.side_effect = [ + msg("executing", node="1"), + msg( + "executed", node="1", output={"images": [{"filename": "x.png", "subfolder": "", "type": "output"}]} + ), + msg("executing", node=None), + ] + events = _run_execute_capture(workflow_file, capsys, wait=True) + + terminal = events[-1] + assert terminal["event"] == "completed" + assert terminal["prompt_id"] == "p" + assert len(terminal["outputs"]) == 1 + assert terminal["outputs"][0]["filename"] == "x.png" + assert terminal["outputs"][0]["category"] == "images" + assert terminal["executed_node_ids"] == ["1"] + + +class TestQueueHttpErrors: + """Verify the 5-way HTTP error mapping for /prompt failures.""" + + def _setup_and_run(self, workflow_file, http_response, capsys, status=None, body=b""): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket"), + ): + if status is None: + # Success path mock + mock_open.return_value.read.return_value = http_response + else: + mock_open.side_effect = _make_http_error(status, body) + return _run_execute_capture(workflow_file, capsys) + + def test_400_with_node_errors_routes_to_validation_error(self, workflow_file, capsys): + body = json.dumps( + { + "error": {"type": "x", "message": "y"}, + "node_errors": {"1": {"errors": [{"type": "z", "message": "bad"}], "class_type": "X"}}, + } + ).encode() + events = self._setup_and_run(workflow_file, None, capsys, status=400, body=body) + terminal = events[-1] + assert terminal["error"]["kind"] == "validation_error" + node_errors = terminal["error"]["node_errors"] + assert isinstance(node_errors, list) + assert any(rec["node_id"] == "1" for rec in node_errors) + + @pytest.mark.parametrize( + "status,body,kind", + [ + (401, b"unauthorized", "client_error"), + (403, b"forbidden", "client_error"), + (429, b"too many", "client_error"), + (500, b"oops", "server_error"), + (503, b"down", "server_error"), + ], + ) + def test_http_status_routes_to_kind(self, workflow_file, capsys, status, body, kind): + events = self._setup_and_run(workflow_file, None, capsys, status=status, body=body) + terminal = events[-1] + assert terminal["error"]["kind"] == kind + assert terminal["error"]["status_code"] == status + assert terminal["error"]["body"] == body.decode() + + def test_200_with_non_json_body_routes_to_invalid_response(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket"), + ): + mock_open.return_value.read.return_value = b"garbage" + events = _run_execute_capture(workflow_file, capsys) + terminal = events[-1] + assert terminal["error"]["kind"] == "invalid_response" + assert terminal["error"]["status_code"] == 200 + + def test_200_without_prompt_id_routes_to_invalid_response(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket"), + ): + mock_open.return_value.read.return_value = json.dumps({"other": "x"}).encode() + events = _run_execute_capture(workflow_file, capsys) + terminal = events[-1] + assert terminal["error"]["kind"] == "invalid_response" + + def test_200_with_utf16_bom_body_routes_to_invalid_response(self, workflow_file, capsys): + # `json.loads(bytes)` sniffs encoding before parsing — a UTF-16 BOM + # makes it raise `UnicodeDecodeError`, not `JSONDecodeError`. + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket"), + ): + mock_open.return_value.read.return_value = b"\x00\x01\xff\xfeNOT JSON \x80\x81" + events = _run_execute_capture(workflow_file, capsys) + terminal = events[-1] + assert terminal["event"] == "failed" + assert terminal["error"]["kind"] == "invalid_response" + assert terminal["error"]["status_code"] == 200 + + def test_url_error_routes_to_connection_error(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket"), + ): + mock_open.side_effect = urllib.error.URLError("refused") + events = _run_execute_capture(workflow_file, capsys) + terminal = events[-1] + assert terminal["error"]["kind"] == "connection_error" + + def test_validation_warnings_on_200_with_partial_node_errors(self, workflow_file, capsys): + """200 + non-empty node_errors → emit `queued` with validation_warnings populated.""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + body = json.dumps( + { + "prompt_id": "p", + "node_errors": {"3": {"errors": [{"type": "x", "message": "skipped"}], "class_type": "X"}}, + } + ).encode() + mock_open.return_value.read.return_value = body + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + events = _run_execute_capture(workflow_file, capsys) + queued = next(e for e in events if e["event"] == "queued") + warnings = queued["validation_warnings"] + assert isinstance(warnings, list) + assert any(rec["node_id"] == "3" for rec in warnings) + rec = next(rec for rec in warnings if rec["node_id"] == "3") + assert rec["class_type"] == "X" + assert rec["errors"][0]["message"] == "skipped" + + +class TestWebSocketEvents: + def _run_with_ws_messages(self, workflow_file, recv_side_effect, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = recv_side_effect + return _run_execute_capture(workflow_file, capsys) + + def test_websocket_timeout(self, workflow_file, capsys): + events = self._run_with_ws_messages( + workflow_file, + WebSocketTimeoutException("timed out"), + capsys, + ) + terminal = events[-1] + assert terminal["error"]["kind"] == "timeout" + assert isinstance(terminal["error"]["timeout_seconds"], float) + + def test_connection_lost_websocket(self, workflow_file, capsys): + events = self._run_with_ws_messages( + workflow_file, + WebSocketException("dropped"), + capsys, + ) + terminal = events[-1] + assert terminal["error"]["kind"] == "connection_lost" + + def test_keyboard_interrupt_emits_execution_interrupted(self, workflow_file, capsys): + events = self._run_with_ws_messages( + workflow_file, + KeyboardInterrupt(), + capsys, + ) + terminal = events[-1] + assert terminal["event"] == "failed" + assert terminal["error"]["kind"] == "execution_interrupted" + + def test_malformed_frame_is_skipped_run_completes(self, workflow_file, capsys): + """We silently skip malformed JSON frames mid-stream. A valid + executing(node=None) frame following the bad one should still + terminate the run normally with `completed`.""" + events = self._run_with_ws_messages( + workflow_file, + ["{not json", json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}})], + capsys, + ) + # No crash, normal completion path reached. + terminal = events[-1] + assert terminal["event"] == "completed" + + def test_execution_error(self, workflow_file, capsys): + messages = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": "1"}}), + json.dumps( + { + "type": "execution_error", + "data": { + "prompt_id": "p", + "node_id": "1", + "node_type": "EmptyLatentImage", + "exception_type": "RuntimeError", + "exception_message": "boom", + "traceback": [' File "x.py"\n', " raise RuntimeError\n"], + }, + } + ), + ] + events = self._run_with_ws_messages(workflow_file, messages, capsys) + terminal = events[-1] + assert terminal["error"]["kind"] == "execution_error" + assert terminal["error"]["node_id"] == "1" + assert terminal["error"]["class_type"] == "EmptyLatentImage" + assert terminal["error"]["exception_type"] == "RuntimeError" + assert terminal["error"]["title"] == "Latent" # from _meta.title + assert isinstance(terminal["error"]["traceback"], str) + assert "raise RuntimeError" in terminal["error"]["traceback"] + + def test_execution_error_node_id_coerced_to_str(self, workflow_file, capsys): + # If ComfyUI ever sends node_id as an int in execution_error (other + # node_id-bearing events all string-coerce defensively), the + # contract still requires a string. + messages = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": "1"}}), + json.dumps( + { + "type": "execution_error", + "data": { + "prompt_id": "p", + "node_id": 7, + "node_type": "EmptyLatentImage", + "exception_type": "RuntimeError", + "exception_message": "boom", + "traceback": [], + }, + } + ), + ] + events = self._run_with_ws_messages(workflow_file, messages, capsys) + terminal = events[-1] + assert terminal["error"]["kind"] == "execution_error" + assert terminal["error"]["node_id"] == "7" + assert isinstance(terminal["error"]["node_id"], str) + + def test_execution_interrupted(self, workflow_file, capsys): + messages = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": "1"}}), + json.dumps({"type": "execution_interrupted", "data": {"prompt_id": "p"}}), + ] + events = self._run_with_ws_messages(workflow_file, messages, capsys) + terminal = events[-1] + assert terminal["error"]["kind"] == "execution_interrupted" + + +class TestOutputObject: + def _exec(self, simple_workflow): + return _make_workflow_execution(simple_workflow, with_progress=True) + + def test_duck_typed_filter_skips_strings(self, simple_workflow, capsys): + """ComfyUI's `text` output key emits a list of strings; the filter must skip non-file shapes.""" + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed( + { + "node": "2", + "output": { + "text": ["hello"], + "images": [{"filename": "x.png", "subfolder": "", "type": "output"}], + }, + } + ) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + executed = next(e for e in events if e["event"] == "node_executed") + assert len(executed["outputs"]) == 1 + assert executed["outputs"][0]["category"] == "images" + + def test_duck_typed_filter_skips_booleans(self, simple_workflow, capsys): + """`animated` key emits list of bool — must be skipped.""" + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed( + { + "node": "2", + "output": { + "animated": [True], + "images": [{"filename": "x.png", "subfolder": "", "type": "output"}], + }, + } + ) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + executed = next(e for e in events if e["event"] == "node_executed") + assert len(executed["outputs"]) == 1 + + def test_audio_category_recognized(self, simple_workflow, capsys): + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed( + { + "node": "2", + "output": { + "audio": [{"filename": "a.wav", "subfolder": "sf", "type": "output"}], + }, + } + ) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + executed = next(e for e in events if e["event"] == "node_executed") + assert executed["outputs"][0]["category"] == "audio" + assert executed["outputs"][0]["filename"] == "a.wav" + assert executed["outputs"][0]["subfolder"] == "sf" + + def test_output_url_has_correct_format(self, simple_workflow, capsys): + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed( + { + "node": "2", + "output": { + "images": [{"filename": "x.png", "subfolder": "", "type": "output"}], + }, + } + ) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + url = events[-1]["outputs"][0]["url"] + assert url.startswith("http://127.0.0.1:8188/view?") + assert "filename=x.png" in url + assert "type=output" in url + + def test_missing_subfolder_defaults_to_empty_string(self, simple_workflow, capsys): + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed( + { + "node": "2", + "output": { + "images": [{"filename": "x.png", "type": "output"}], + }, + } + ) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + assert events[-1]["outputs"][0]["subfolder"] == "" + + +UI_WORKFLOW = { + "nodes": [ + { + "id": 1, + "type": "EmptyLatentImage", + "inputs": [], + "outputs": [{"name": "LATENT", "type": "LATENT", "links": [10]}], + "widgets_values": [64, 64, 1], + "mode": 0, + }, + { + "id": 2, + "type": "PreviewImage", + "inputs": [{"name": "images", "link": 10}], + "outputs": [], + "mode": 0, + }, + ], + "links": [[10, 1, 0, 2, 0, "IMAGE"]], +} + +OBJECT_INFO = { + "EmptyLatentImage": { + "input": { + "required": { + "width": ["INT", {"default": 512}], + "height": ["INT", {"default": 512}], + "batch_size": ["INT", {"default": 1}], + } + }, + "input_order": {"required": ["width", "height", "batch_size"]}, + "output_node": False, + "display_name": "Empty Latent Image", + }, + "PreviewImage": { + "input": {"required": {"images": ["IMAGE"]}}, + "input_order": {"required": ["images"]}, + "output_node": True, + "display_name": "Preview Image", + }, +} + + +@pytest.fixture +def ui_workflow_file(): + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(UI_WORKFLOW, f) + f.flush() + path = f.name + yield path + os.unlink(path) + + +class TestWorkflowPathExpansion: + """Regression: `~/wf.json` must be expanded before the existence check. + Otherwise scripted callers passing literal `~/...` see a misleading + workflow_not_found.""" + + def test_tilde_path_is_expanded_before_existence_check(self, capsys, monkeypatch, tmp_path): + workflow_path = tmp_path / "wf.json" + workflow_path.write_text(json.dumps({"1": {"class_type": "X", "inputs": {}}})) + monkeypatch.setenv("HOME", str(tmp_path)) + events = _run_execute_capture("~/wf.json", capsys, print_prompt=True) + assert events[0]["event"] == "prompt_preview", events + + def test_tilde_path_to_missing_file_reports_expanded_path(self, capsys, monkeypatch, tmp_path): + monkeypatch.setenv("HOME", str(tmp_path)) + events = _run_execute_capture("~/missing.json", capsys, print_prompt=True) + assert events[0]["event"] == "failed" + assert events[0]["error"]["kind"] == "workflow_not_found" + # The error message should name the resolved path so the user can + # see exactly where we looked. + assert str(tmp_path) in events[0]["error"]["message"] + + +class TestCliRunnerIntegration: + """End-to-end: the typer entry callback chain (consent prompt, decorators, + config init) must not leak any prose to stdout in JSON mode. Direct + `execute()` tests bypass this seam; agents on a fresh machine with + no recorded consent are exactly where the original prompt-corrupts-stream + bug would have hidden.""" + + def _make_workflow_file(self, tmp_path): + wf_path = tmp_path / "wf.json" + wf_path.write_text(json.dumps({"1": {"class_type": "X", "inputs": {}}})) + return str(wf_path) + + def test_cli_json_print_prompt_emits_clean_ndjson(self, tmp_path): + # Smoke: default config state, --json --print-prompt → every stdout + # line is valid JSON with `event` and `schema_version`. + from typer.testing import CliRunner + + from comfy_cli.cmdline import app + + runner = CliRunner() # non-TTY by default + result = runner.invoke( + app, ["run", "--workflow", self._make_workflow_file(tmp_path), "--json", "--print-prompt"] + ) + assert result.exit_code == 0, f"stdout={result.stdout!r}\nexc={result.exception!r}" + lines = [line for line in result.stdout.splitlines() if line.strip()] + assert lines, "expected at least one NDJSON line" + for line in lines: + event = json.loads(line) + assert "event" in event + assert "schema_version" in event + # Consent prompt text must not appear. + assert "Do you agree" not in result.stdout + assert "improve the application" not in result.stdout + + def test_cli_json_with_fresh_consent_state_stays_clean(self, tmp_path): + # The exact regression scenario: a fresh machine where consent has + # never been recorded. The entry callback enables session-only + # tracking via the non-TTY branch (mocked Mixpanel client so no + # network), and the resulting stdout must still be clean NDJSON. + from typer.testing import CliRunner + + from comfy_cli.cmdline import app + from comfy_cli.config_manager import ConfigManager + + _Cls = ConfigManager.__closure__[0].cell_contents + cfg_dir = tmp_path / "config" + cfg_dir.mkdir() + with ( + patch.object(_Cls, "get_config_path", return_value=str(cfg_dir)), + patch("comfy_cli.tracking.mp") as mock_mp, + ): + mock_mp.track.return_value = None + runner = CliRunner() + result = runner.invoke( + app, ["run", "--workflow", self._make_workflow_file(tmp_path), "--json", "--print-prompt"] + ) + assert result.exit_code == 0, f"stdout={result.stdout!r}\nexc={result.exception!r}" + for line in result.stdout.splitlines(): + if not line.strip(): + continue + event = json.loads(line) + assert "event" in event + assert "Do you agree" not in result.stdout + assert "tracking" not in result.stdout.lower() + + +class TestPromptPreviewAlwaysEmitted: + """In JSON mode the converted workflow graph is always emitted as a + `prompt_preview` event before `queued`. Agents debugging conversions + or building an audit trail get full visibility without re-running + with a flag.""" + + def test_api_input_emits_prompt_preview_before_queued(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + events = _run_execute_capture(workflow_file, capsys) + kinds = [e["event"] for e in events] + assert kinds[0] == "prompt_preview" + assert "queued" in kinds + assert kinds.index("prompt_preview") < kinds.index("queued") + assert events[0]["prompt"]["1"]["class_type"] == "EmptyLatentImage" + + def test_ui_input_emits_converted_then_prompt_preview_then_queued(self, ui_workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.fetch_object_info", return_value=OBJECT_INFO), + patch("comfy_cli.command.run.request.urlopen") as mock_post, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_post.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + events = _run_execute_capture(ui_workflow_file, capsys) + kinds = [e["event"] for e in events] + # Ordering: converted, prompt_preview, queued (then node_* / completed). + c = kinds.index("converted") + p = kinds.index("prompt_preview") + q = kinds.index("queued") + assert c < p < q + + def test_prompt_preview_excludes_client_id_and_extra_data(self, workflow_file, capsys): + # The audit trail must carry only the workflow graph, never the + # POST envelope's runtime fields (client_id, extra_data with api_key). + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + events = _run_execute_capture(workflow_file, capsys, api_key="sk-secret") + preview = next(e for e in events if e["event"] == "prompt_preview") + prompt = preview["prompt"] + assert "client_id" not in prompt + assert "extra_data" not in prompt + assert "sk-secret" not in json.dumps(prompt) + + +class TestPrintPrompt: + """`--print-prompt` returns the would-be `/prompt` body and exits 0 + without POSTing. UI input still needs `/object_info`; API input + doesn't touch the server at all.""" + + def test_api_input_emits_prompt_preview_and_no_other_events(self, workflow_file, capsys): + # No server probe, no /object_info fetch — API input is printed as-is. + with ( + patch("comfy_cli.command.run.check_comfy_server_running") as mock_check, + patch("comfy_cli.command.run.fetch_object_info") as mock_fetch, + patch("comfy_cli.command.run.request.urlopen") as mock_post, + ): + events = _run_execute_capture(workflow_file, capsys, print_prompt=True) + assert mock_check.call_count == 0 + assert mock_fetch.call_count == 0 + assert mock_post.call_count == 0 + assert len(events) == 1 + assert events[0]["event"] == "prompt_preview" + assert events[0]["schema_version"] == 1 + assert isinstance(events[0]["prompt"], dict) + assert "1" in events[0]["prompt"] + assert events[0]["prompt"]["1"]["class_type"] == "EmptyLatentImage" + + def test_ui_input_emits_converted_then_prompt_preview(self, ui_workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.fetch_object_info", return_value=OBJECT_INFO), + patch("comfy_cli.command.run.request.urlopen") as mock_post, + ): + events = _run_execute_capture(ui_workflow_file, capsys, print_prompt=True) + assert mock_post.call_count == 0 + assert [e["event"] for e in events] == ["converted", "prompt_preview"] + prompt = events[1]["prompt"] + assert isinstance(prompt, dict) + # The converted prompt should have entries for the UI nodes. + assert len(prompt) >= 1 + for entry in prompt.values(): + assert "class_type" in entry + + def test_ui_input_with_unreachable_object_info_routes_to_connection_error(self, ui_workflow_file, capsys): + # --print-prompt skips the pre-flight server probe, but UI conversion + # still needs /object_info, so an unreachable host surfaces here. + with ( + patch("comfy_cli.command.run.request.urlopen", side_effect=urllib.error.URLError("Connection refused")), + ): + events = _run_execute_capture(ui_workflow_file, capsys, print_prompt=True) + assert events[-1]["event"] == "failed" + assert events[-1]["error"]["kind"] == "connection_error" + + def test_api_input_works_with_offline_server(self, workflow_file, capsys): + # Hard-fail the server probe — the API path must not call it under --print-prompt. + with patch( + "comfy_cli.command.run.check_comfy_server_running", side_effect=AssertionError("must not be called") + ): + events = _run_execute_capture(workflow_file, capsys, print_prompt=True) + assert len(events) == 1 + assert events[0]["event"] == "prompt_preview" + + def test_print_prompt_does_not_include_api_key_or_client_id(self, workflow_file, capsys): + # The prompt_preview body should only carry the workflow graph, + # not the runtime POST envelope (which would otherwise leak the api_key). + events = _run_execute_capture(workflow_file, capsys, print_prompt=True, api_key="sk-secret") + prompt = events[0]["prompt"] + assert "extra_data" not in prompt + assert "client_id" not in prompt + assert "sk-secret" not in json.dumps(prompt) + + def test_print_prompt_text_mode_pretty_prints_json(self, workflow_file, capsys): + try: + execute(workflow_file, host="127.0.0.1", port=8188, print_prompt=True, json_mode=False) + except typer.Exit: + pass + out, _err = capsys.readouterr() + parsed = json.loads(out) + assert "1" in parsed + assert parsed["1"]["class_type"] == "EmptyLatentImage" + + def test_print_prompt_does_not_post_when_workflow_invalid(self, capsys): + # Pre-flight failures (workflow_not_found, workflow_format_invalid) + # still trigger `failed` and exit 1 under --print-prompt. + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump({"not": "a workflow"}, f) + path = f.name + try: + events = _run_execute_capture(path, capsys, print_prompt=True) + assert events[-1]["event"] == "failed" + assert events[-1]["error"]["kind"] == "workflow_format_invalid" + finally: + os.unlink(path) + + +class TestConvertedAndConversionErrors: + """UI-input event path and the conversion_error / conversion_crash kinds.""" + + def test_converted_event_for_ui_input(self, ui_workflow_file, capsys): + """Spec lines 84-98: `converted` is the first event when input is UI format.""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.fetch_object_info", return_value=OBJECT_INFO), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + events = _run_execute_capture(ui_workflow_file, capsys) + + assert events[0]["event"] == "converted" + assert events[0]["schema_version"] == 1 + assert events[0]["node_count"] == 2 # the UI workflow has 2 nodes + + def test_conversion_error_kind(self, ui_workflow_file, capsys): + """WorkflowConversionError → kind=conversion_error, no extras.""" + from comfy_cli.workflow_to_api import WorkflowConversionError + + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.fetch_object_info", return_value=OBJECT_INFO), + patch( + "comfy_cli.command.run.convert_ui_to_api", + side_effect=WorkflowConversionError("broken graph"), + ), + ): + events = _run_execute_capture(ui_workflow_file, capsys) + + terminal = events[-1] + assert terminal["error"]["kind"] == "conversion_error" + assert terminal["client_id"] is None # before WorkflowExecution + assert terminal["prompt_id"] is None + + def test_conversion_crash_kind_with_exception_type(self, ui_workflow_file, capsys): + """Unexpected converter crash → kind=conversion_crash with exception_type extra.""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.fetch_object_info", return_value=OBJECT_INFO), + patch( + "comfy_cli.command.run.convert_ui_to_api", + side_effect=KeyError("missing field"), + ), + ): + events = _run_execute_capture(ui_workflow_file, capsys) + + terminal = events[-1] + assert terminal["error"]["kind"] == "conversion_crash" + assert terminal["error"]["exception_type"] == "KeyError" + assert terminal["client_id"] is None + assert terminal["prompt_id"] is None + + def test_workflow_empty_after_conversion(self, capsys): + """UI conversion producing {} → workflow_empty.""" + empty_ui = {"nodes": [], "links": []} + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(empty_ui, f) + f.flush() + path = f.name + try: + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.fetch_object_info", return_value=OBJECT_INFO), + patch("comfy_cli.command.run.convert_ui_to_api", return_value={}), + ): + events = _run_execute_capture(path, capsys) + assert events[-1]["error"]["kind"] == "workflow_empty" + finally: + os.unlink(path) + + +class TestObjectInfoFailures: + """HTTP and network errors on /object_info.""" + + def test_object_info_unavailable_on_http_error(self, ui_workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + ): + # _make_http_error builds a /prompt URL by default — build the + # /object_info HTTPError inline so the test exercises that path. + mock_open.side_effect = urllib.error.HTTPError( + url="http://127.0.0.1:8188/object_info", + code=503, + msg="HTTP 503", + hdrs=None, + fp=io.BytesIO(b"service unavailable"), + ) + events = _run_execute_capture(ui_workflow_file, capsys) + + terminal = events[-1] + assert terminal["error"]["kind"] == "object_info_unavailable" + assert terminal["error"]["status_code"] == 503 + assert "service unavailable" in terminal["error"]["body"] + assert terminal["client_id"] is None # pre-WorkflowExecution + + def test_object_info_connection_error_on_urlerror(self, ui_workflow_file, capsys): + """URLError on /object_info → connection_error (NOT object_info_unavailable).""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + ): + mock_open.side_effect = urllib.error.URLError("connection refused") + events = _run_execute_capture(ui_workflow_file, capsys) + + terminal = events[-1] + assert terminal["error"]["kind"] == "connection_error" + + +class TestNodeCachedIntegration: + """`execution_cached` WS message → node_cached events with class_type / title.""" + + def test_node_cached_event_shape(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "execution_cached", "data": {"prompt_id": "p", "nodes": ["1", "2"]}}), + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + events = _run_execute_capture(workflow_file, capsys) + + cached_events = [e for e in events if e["event"] == "node_cached"] + assert len(cached_events) == 2 + # Node 1 has _meta.title="Latent"; class_type=EmptyLatentImage + n1 = next(e for e in cached_events if e["node_id"] == "1") + assert n1["class_type"] == "EmptyLatentImage" + assert n1["title"] == "Latent" + # Node 2 has _meta.title="Save"; class_type=SaveImage + n2 = next(e for e in cached_events if e["node_id"] == "2") + assert n2["class_type"] == "SaveImage" + assert n2["title"] == "Save" + + # All cached nodes also appear in completed.cached_node_ids + completed = events[-1] + assert completed["event"] == "completed" + assert set(completed["cached_node_ids"]) == {"1", "2"} + + +class TestNodeExecutedFiresEvenWithoutOutputs: + """`node_executed` must fire whenever the server emits `executed` for our + prompt, even when there's no `output` dict or it's empty (outputs=[]).""" + + def _exec(self, simple_workflow): + return _make_workflow_execution(simple_workflow, with_progress=True) + + def test_output_node_id_coerced_to_str(self, simple_workflow, capsys): + # If the server ever sends `node` as an int, every other emit site + # coerces — outputs[i].node_id must too, since the contract says + # node_id is always str. + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed( + { + "node": 2, + "output": {"images": [{"filename": "x.png", "subfolder": "", "type": "output"}]}, + } + ) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + executed = next(e for e in events if e["event"] == "node_executed") + assert isinstance(executed["node_id"], str) + assert executed["outputs"] + for out in executed["outputs"]: + assert isinstance(out["node_id"], str), ( + f"outputs[i].node_id leaked non-str: {type(out['node_id']).__name__}" + ) + assert out["node_id"] == "2" + + def test_executed_with_missing_output(self, simple_workflow, capsys): + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed({"node": "2"}) # no `output` key at all + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + executed = [e for e in events if e["event"] == "node_executed"] + assert len(executed) == 1 + assert executed[0]["outputs"] == [] + assert executed[0]["node_id"] == "2" + + def test_executed_with_non_dict_output(self, simple_workflow, capsys): + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed({"node": "2", "output": []}) # list instead of dict + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + executed = [e for e in events if e["event"] == "node_executed"] + assert len(executed) == 1 + assert executed[0]["outputs"] == [] + + def test_executed_with_empty_dict_output(self, simple_workflow, capsys): + ex = self._exec(simple_workflow) + ex.prompt_id = "p" + ex.on_executed({"node": "2", "output": {}}) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + executed = [e for e in events if e["event"] == "node_executed"] + assert len(executed) == 1 + assert executed[0]["outputs"] == [] + + +class TestFormatImagePathDefensive: + """`format_image_path` must be defensive against missing `type` / `subfolder` + keys — the duck-type filter only requires `filename`.""" + + def _exec(self, simple_workflow): + return _make_workflow_execution(simple_workflow, with_progress=True) + + def test_no_keyerror_on_missing_type(self, simple_workflow): + ex = self._exec(simple_workflow) + # Should not raise — `type` missing, should default to "output" + url = ex.format_image_path({"filename": "x.png", "subfolder": ""}) + assert "filename=x.png" in url + assert "type=output" in url + + def test_no_keyerror_on_missing_subfolder(self, simple_workflow): + ex = self._exec(simple_workflow) + url = ex.format_image_path({"filename": "x.png", "type": "output"}) + assert "filename=x.png" in url + + +class TestVerboseNoOpInJsonMode: + """Spec lines 24-25: `--verbose` has no effect in JSON mode. Regression + against a bug where `log_node()` printed Rich-formatted lines to stdout + when verbose=True, corrupting the NDJSON stream.""" + + def test_verbose_does_not_corrupt_json_stream(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "execution_cached", "data": {"prompt_id": "p", "nodes": ["1"]}}), + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": "2"}}), + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + try: + execute( + workflow_file, + host="127.0.0.1", + port=8188, + wait=True, + verbose=True, + timeout=30, + json_mode=True, + ) + except typer.Exit: + pass + out, _err = capsys.readouterr() + for line in out.splitlines(): + line = line.strip() + if not line: + continue + # Any Rich-formatted leak would make json.loads raise on a bare + # "Cached : ..." line. + json.loads(line) + + +class TestErrorPathCoverage: + """Less-trodden paths: /object_info timeout/non-JSON, queue() + TimeoutError/OSError, on_executed/on_progress None guards, on_cached + None entries, two consecutive node_executing pattern.""" + + def _make_workflow(self): + return { + "1": { + "class_type": "EmptyLatentImage", + "inputs": {"width": 64, "height": 64, "batch_size": 1}, + "_meta": {"title": "Latent"}, + }, + "2": { + "class_type": "SaveImage", + "inputs": {"filename_prefix": "x", "images": ["1", 0]}, + }, + } + + def _make_exec(self, workflow): + return _make_workflow_execution(workflow) + + def test_object_info_timeout_routes_to_connection_error(self, capsys): + """fetch_object_info(timeout → connection_error). Previously untested.""" + ui_wf = { + "nodes": [ + { + "id": 1, + "type": "EmptyLatentImage", + "inputs": [], + "outputs": [], + "widgets_values": [64, 64, 1], + "mode": 0, + } + ], + "links": [], + } + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(ui_wf, f) + path = f.name + try: + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen", side_effect=TimeoutError("timed out")), + ): + events = _run_execute_capture(path, capsys) + assert events[-1]["error"]["kind"] == "connection_error" + finally: + os.unlink(path) + + def test_object_info_non_json_body_routes_to_object_info_unavailable(self, capsys): + """fetch_object_info(200 + non-JSON body → object_info_unavailable status_code=200).""" + ui_wf = { + "nodes": [ + { + "id": 1, + "type": "EmptyLatentImage", + "inputs": [], + "outputs": [], + "widgets_values": [64, 64, 1], + "mode": 0, + } + ], + "links": [], + } + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(ui_wf, f) + path = f.name + try: + mock_resp = MagicMock() + mock_resp.read.return_value = b"not json" + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen", return_value=mock_resp), + ): + events = _run_execute_capture(path, capsys) + terminal = events[-1] + assert terminal["error"]["kind"] == "object_info_unavailable" + assert terminal["error"]["status_code"] == 200 + finally: + os.unlink(path) + + def test_queue_timeout_error_routes_to_connection_error(self, workflow_file, capsys): + """queue()'s urlopen TimeoutError → connection_error.""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen", side_effect=TimeoutError("post timed out")), + patch("comfy_cli.command.run.WebSocket"), + ): + events = _run_execute_capture(workflow_file, capsys) + assert events[-1]["error"]["kind"] == "connection_error" + + def test_queue_oserror_routes_to_connection_error(self, workflow_file, capsys): + """queue()'s urlopen OSError → connection_error.""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen", side_effect=OSError("network unreachable")), + patch("comfy_cli.command.run.WebSocket"), + ): + events = _run_execute_capture(workflow_file, capsys) + assert events[-1]["error"]["kind"] == "connection_error" + + def test_on_executed_none_node_id_does_not_emit(self, capsys): + """If server emits `executed` without `node`, skip rather than emit + a malformed event with node_id=null.""" + wf = self._make_workflow() + ex = self._make_exec(wf) + ex.prompt_id = "p" + # Missing "node" key entirely + ex.on_executed({"output": {"images": [{"filename": "x.png", "subfolder": "", "type": "output"}]}}) + # Explicit None + ex.on_executed({"node": None}) + out, _ = capsys.readouterr() + # No events emitted because we skipped pathological frames + assert out.strip() == "", f"unexpected output for None node: {out!r}" + + def test_on_progress_none_node_id_does_not_emit(self, capsys): + wf = self._make_workflow() + ex = self._make_exec(wf) + ex.prompt_id = "p" + ex.on_progress({"value": 1, "max": 10}) # missing node + ex.on_progress({"node": None, "value": 2, "max": 10}) + out, _ = capsys.readouterr() + assert out.strip() == "" + + @pytest.mark.parametrize("malformed", [None, 42, "string", [1, 2, 3], True]) + def test_on_message_skips_non_dict_payloads(self, capsys, malformed): + # A bad JSON frame (scalar, array, etc.) must not raise out of the + # recv loop — that would tear down the run without a terminal + # `failed` event and break the stream contract. + wf = self._make_workflow() + ex = self._make_exec(wf) + ex.prompt_id = "p" + assert ex.on_message(malformed) is True + out, _err = capsys.readouterr() + assert out == "" + + def test_on_message_skips_when_data_is_not_dict(self, capsys): + wf = self._make_workflow() + ex = self._make_exec(wf) + ex.prompt_id = "p" + # message is a dict but `data` is the wrong shape. + assert ex.on_message({"type": "executing", "data": "not a dict"}) is True + assert ex.on_message({"type": "executing", "data": [1, 2, 3]}) is True + assert ex.on_message({"type": "executing", "data": None}) is True + out, _err = capsys.readouterr() + assert out == "" + + def test_on_executing_skips_when_node_key_missing(self, capsys): + # Missing `node` key is a protocol violation; we skip rather than + # treating it as None (which means "execution done"). + wf = self._make_workflow() + ex = self._make_exec(wf) + ex.prompt_id = "p" + assert ex.on_executing({"prompt_id": "p"}) is True + + def test_on_cached_skips_none_entries(self, capsys): + wf = self._make_workflow() + ex = self._make_exec(wf) + ex.prompt_id = "p" + ex.on_cached({"nodes": ["1", None, "2"]}) + events = [json.loads(line) for line in capsys.readouterr().out.splitlines() if line.strip()] + assert len(events) == 2 + assert {ev["node_id"] for ev in events} == {"1", "2"} + + def test_two_consecutive_node_executing_includes_intermediate(self, workflow_file, capsys): + """`executed_node_ids` is the union of nodes that emitted `node_executing` + OR `node_executed` — intermediate compute nodes that only fire `executing` + are still included so consumers see the complete 'what ran' picture.""" + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": "1"}}), + # node 2 starts without a node_executed for 1 — intermediate compute node + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": "2"}}), + json.dumps( + { + "type": "executed", + "data": { + "prompt_id": "p", + "node": "2", + "output": {"images": [{"filename": "x.png", "subfolder": "", "type": "output"}]}, + }, + } + ), + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + events = _run_execute_capture(workflow_file, capsys) + completed = events[-1] + assert completed["event"] == "completed" + # Both nodes ran; both should appear in executed_node_ids + # (1 via node_executing only, 2 via both events with dedup) + assert set(completed["executed_node_ids"]) == {"1", "2"} + # And node 2 should only appear once (dedup verified) + assert completed["executed_node_ids"].count("2") == 1 + + +class TestTimeoutAppliesToConnectAndPost: + """`--timeout` must bound every blocking network call (ws.connect, /prompt + POST, ws.recv) so the terminal-event guarantee holds under server hangs.""" + + def test_queue_passes_timeout_to_urlopen(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + # Single executing(node=None) → on_executing returns False → loop exits + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + try: + execute( + workflow_file, + host="127.0.0.1", + port=8188, + wait=True, + verbose=False, + timeout=42, + json_mode=True, + ) + except typer.Exit: + pass + _ = capsys.readouterr() + # Verify urlopen was called with timeout=42 + assert mock_open.called + call = mock_open.call_args + timeout_arg = call.kwargs.get("timeout") + if timeout_arg is None and len(call.args) >= 2: + timeout_arg = call.args[1] + assert timeout_arg == 42, f"urlopen not called with timeout=42, got {timeout_arg!r}" + + def test_preflight_probe_passes_timeout(self, workflow_file, capsys): + # Pre-flight probe gets the same --timeout as everything else, + # otherwise a slow-to-respond ComfyUI would be falsely reported + # "not running" by the probe's default 5s. + with patch("comfy_cli.command.run.check_comfy_server_running", return_value=False) as mock_probe: + try: + execute( + workflow_file, + host="127.0.0.1", + port=8188, + timeout=55, + json_mode=True, + ) + except typer.Exit: + pass + _ = capsys.readouterr() + assert mock_probe.called + call = mock_probe.call_args + timeout_arg = call.kwargs.get("timeout") + if timeout_arg is None and len(call.args) >= 3: + timeout_arg = call.args[2] + assert timeout_arg == 55, f"check_comfy_server_running not called with timeout=55, got {timeout_arg!r}" + + def test_connect_passes_timeout_to_ws_connect(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + patch("comfy_cli.command.run.WebSocket") as MockWs, + ): + mock_open.return_value.read.return_value = json.dumps({"prompt_id": "p"}).encode() + ws_instance = MagicMock() + MockWs.return_value = ws_instance + ws_instance.recv.side_effect = [ + json.dumps({"type": "executing", "data": {"prompt_id": "p", "node": None}}), + ] + try: + execute( + workflow_file, + host="127.0.0.1", + port=8188, + wait=True, + verbose=False, + timeout=37, + json_mode=True, + ) + except typer.Exit: + pass + _ = capsys.readouterr() + # Verify ws.connect was called with timeout=37 + assert ws_instance.connect.called + connect_call = ws_instance.connect.call_args + timeout_arg = connect_call.kwargs.get("timeout") + if timeout_arg is None and len(connect_call.args) >= 2: + timeout_arg = connect_call.args[1] + assert timeout_arg == 37, f"ws.connect not called with timeout=37, got {timeout_arg!r}" + + +class TestNoWaitQueueErrorRegression: + """--no-wait + queue HTTPError must not crash on the progress-stop path + (progress is None in --no-wait mode).""" + + def test_no_wait_with_400_emits_validation_error(self, workflow_file, capsys): + with ( + patch("comfy_cli.command.run.check_comfy_server_running", return_value=True), + patch("comfy_cli.command.run.request.urlopen") as mock_open, + ): + body = json.dumps( + { + "error": {"type": "x", "message": "y"}, + "node_errors": {"1": {"errors": [{"type": "z", "message": "bad"}], "class_type": "X"}}, + } + ).encode() + mock_open.side_effect = _make_http_error(400, body) + events = _run_execute_capture(workflow_file, capsys, wait=False) + terminal = events[-1] + assert terminal["error"]["kind"] == "validation_error" + # The big invariant: it didn't crash with AttributeError on `progress.stop()` diff --git a/tests/comfy_cli/test_env_checker.py b/tests/comfy_cli/test_env_checker.py index 00677c6d..bed31756 100644 --- a/tests/comfy_cli/test_env_checker.py +++ b/tests/comfy_cli/test_env_checker.py @@ -52,7 +52,17 @@ def test_non_200_status(self, mock_get): def test_custom_port_and_host(self, mock_get): mock_get.return_value.status_code = 200 check_comfy_server_running(port=9999, host="0.0.0.0") - mock_get.assert_called_with("http://0.0.0.0:9999/history") + mock_get.assert_called_once() + assert mock_get.call_args.args == ("http://0.0.0.0:9999/history",) + # Pin the default timeout — a silent change to this value would + # alter user-visible "is the server up?" behaviour on slow hosts. + assert mock_get.call_args.kwargs["timeout"] == 5.0 + + @patch("comfy_cli.env_checker.requests.get") + def test_caller_can_override_timeout(self, mock_get): + mock_get.return_value.status_code = 200 + check_comfy_server_running(port=8188, host="127.0.0.1", timeout=42) + assert mock_get.call_args.kwargs["timeout"] == 42 class TestEnvChecker: diff --git a/tests/comfy_cli/test_tracking.py b/tests/comfy_cli/test_tracking.py index 5cb7b020..a2e619eb 100644 --- a/tests/comfy_cli/test_tracking.py +++ b/tests/comfy_cli/test_tracking.py @@ -25,6 +25,7 @@ def tracking_module(tmp_path): patch.object(tracking_mod, "cli_version", "test-cli-version"), patch.object(tracking_mod, "tracing_id", "test-tracing-id"), patch.object(tracking_mod, "mp", MagicMock()), + patch.object(tracking_mod, "_session_only_tracking", False), ): yield tracking_mod @@ -138,3 +139,120 @@ def test_install_event_fires_once_across_calls(self, tracking_module): assert tracking_module.mp.track.call_count == 1 tracking_module.init_tracking(True) assert tracking_module.mp.track.call_count == 1 + + +class TestPromptTrackingConsent: + def test_enables_session_only_when_stdin_not_tty(self, tracking_module): + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=True), + patch.object(tracking_module.ui, "prompt_confirm_action") as mock_prompt, + ): + tracking_module.prompt_tracking_consent() + mock_prompt.assert_not_called() + assert tracking_module.config_manager.get_bool(constants.CONFIG_KEY_ENABLE_TRACKING) is None + assert tracking_module._session_only_tracking is True + assert tracking_module.user_id is not None + + def test_enables_session_only_when_stdout_not_tty(self, tracking_module): + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=True), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + patch.object(tracking_module.ui, "prompt_confirm_action") as mock_prompt, + ): + tracking_module.prompt_tracking_consent() + mock_prompt.assert_not_called() + assert tracking_module.config_manager.get_bool(constants.CONFIG_KEY_ENABLE_TRACKING) is None + assert tracking_module._session_only_tracking is True + + def test_session_only_tracking_fires_track_event(self, tracking_module): + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + ): + tracking_module.prompt_tracking_consent() + tracking_module.track_event("some_event", {"k": "v"}) + tracking_module.mp.track.assert_called_once() + _, kwargs = tracking_module.mp.track.call_args + assert kwargs["event_name"] == "some_event" + assert kwargs["distinct_id"] is not None + + def test_session_only_persists_user_id(self, tracking_module): + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + ): + tracking_module.prompt_tracking_consent() + persisted = tracking_module.config_manager.get(constants.CONFIG_KEY_USER_ID) + assert persisted is not None + assert persisted == tracking_module.user_id + + def test_session_only_survives_unwritable_config(self, tracking_module): + # Read-only / missing config dir (fresh CI, restricted sandbox) must + # not crash the caller mid-typer-callback — otherwise an agent gets + # a Python traceback instead of a structured `failed` event. + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + patch.object(tracking_module.config_manager, "set", side_effect=PermissionError("read-only fs")), + ): + tracking_module.prompt_tracking_consent() + # In-memory state is still correct so this process tracks normally. + assert tracking_module._session_only_tracking is True + assert tracking_module.user_id is not None + + def test_session_only_reuses_existing_user_id(self, tracking_module): + existing_id = "existing-uuid-from-prior-run" + tracking_module.config_manager.set(constants.CONFIG_KEY_USER_ID, existing_id) + with ( + patch.object(tracking_module, "user_id", existing_id), + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + ): + tracking_module.prompt_tracking_consent() + assert tracking_module.user_id == existing_id + assert tracking_module.config_manager.get(constants.CONFIG_KEY_USER_ID) == existing_id + + def test_prompts_when_both_are_tty(self, tracking_module): + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=True), + patch.object(tracking_module.sys.stdout, "isatty", return_value=True), + patch.object(tracking_module.ui, "prompt_confirm_action", return_value=False) as mock_prompt, + ): + tracking_module.prompt_tracking_consent() + mock_prompt.assert_called_once() + assert tracking_module.config_manager.get_bool(constants.CONFIG_KEY_ENABLE_TRACKING) is False + assert tracking_module._session_only_tracking is False + + def test_skip_prompt_bypasses_tty_check(self, tracking_module): + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + patch.object(tracking_module.ui, "prompt_confirm_action") as mock_prompt, + ): + tracking_module.prompt_tracking_consent(skip_prompt=True, default_value=False) + mock_prompt.assert_not_called() + assert tracking_module.config_manager.get_bool(constants.CONFIG_KEY_ENABLE_TRACKING) is False + assert tracking_module._session_only_tracking is False + + def test_no_op_when_already_configured(self, tracking_module): + tracking_module.config_manager.set(constants.CONFIG_KEY_ENABLE_TRACKING, "True") + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + patch.object(tracking_module.ui, "prompt_confirm_action") as mock_prompt, + ): + tracking_module.prompt_tracking_consent() + mock_prompt.assert_not_called() + assert tracking_module.config_manager.get_bool(constants.CONFIG_KEY_ENABLE_TRACKING) is True + assert tracking_module._session_only_tracking is False + + def test_session_only_is_idempotent(self, tracking_module): + with ( + patch.object(tracking_module.sys.stdin, "isatty", return_value=False), + patch.object(tracking_module.sys.stdout, "isatty", return_value=False), + ): + tracking_module.prompt_tracking_consent() + first_user_id = tracking_module.user_id + tracking_module.prompt_tracking_consent() + assert tracking_module.user_id == first_user_id