Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ xtest/sdk/java/cmdline.jar
/xtest/otdfctl/

/tmp/

# Multi-instance test harness state (DSPX-3302). Per-instance config, logs, and
# keys live under tests/instances/; otdf-sdk-mgr install scenario writes
# .installed.json next to each scenarios.yaml.
/instances/
xtest/scenarios/*.installed.json
.claude/tmp/
4 changes: 4 additions & 0 deletions otdf-local/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"httpx>=0.27.0",
"otdf-sdk-mgr",
"pydantic-settings>=2.2.0",
"rich>=13.7.0",
"ruamel.yaml>=0.18.0",
"typer>=0.12.0",
]

[tool.uv.sources]
otdf-sdk-mgr = { path = "../otdf-sdk-mgr", editable = true }

[dependency-groups]
dev = [
"pyright>=1.1.408",
Expand Down
28 changes: 26 additions & 2 deletions otdf-local/src/otdf_local/cli.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Typer CLI for otdf_local - OpenTDF test environment management."""

import json
import os
import shutil
import sys
import time
from typing import Annotated
from pathlib import Path
from typing import Annotated, Optional

import httpx
import typer
Expand Down Expand Up @@ -44,6 +46,18 @@
)


def _register_subapps() -> None:
"""Defer imports so the schema dependency only loads when needed."""
from otdf_local.cli_instance import instance_app
from otdf_local.cli_scenario import scenario_app

app.add_typer(instance_app, name="instance")
app.add_typer(scenario_app, name="scenario")


_register_subapps()


def _show_provision_error(result: ProvisionResult, target: str) -> None:
"""Display provisioning error with stderr details."""
print_error(f"{target} provisioning failed (exit code {result.return_code})")
Expand Down Expand Up @@ -75,9 +89,19 @@ def main(
is_eager=True,
),
] = False,
instance: Annotated[
Optional[str],
typer.Option(
"--instance",
help='Named instance under tests/instances/. Defaults to "default" (or $OTDF_LOCAL_INSTANCE_NAME).',
),
] = None,
) -> None:
"""OpenTDF test environment management CLI."""
pass
if instance is not None:
os.environ["OTDF_LOCAL_INSTANCE_NAME"] = instance
# Invalidate the cached Settings so subsequent commands see the new value
get_settings.cache_clear()
Comment on lines +101 to +104
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The --instance flag correctly updates the environment and clears the settings cache. However, the up command in this file (specifically at lines 192 and 225) still relies on static Ports constants for health checks and port resolution. This will cause health checks to fail when a non-default instance with a different ports_base is active. The up command should be updated to use the instance-aware settings.get_kas_port(name) or the port property of the service instances, and it should iterate over the instances managed by kas_manager instead of Ports.all_kas_names().



@app.command()
Expand Down
183 changes: 183 additions & 0 deletions otdf-local/src/otdf_local/cli_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
"""`otdf-local instance` subcommands: init / ls / rm."""

from __future__ import annotations

import shutil
from pathlib import Path
from typing import Annotated, Optional

import typer
from otdf_sdk_mgr.schema import Instance, Metadata, PlatformPin, PortsConfig, dump_instance

from otdf_local.config.settings import get_settings

instance_app = typer.Typer(help="Manage named test environment instances.")


@instance_app.command("init")
def init(
name: Annotated[str, typer.Argument(help="Instance name (used as directory name)")],
from_scenario: Annotated[
Optional[Path],
typer.Option("--from-scenario", help="Initialize from a scenarios.yaml or instance.yaml"),
] = None,
ports_base: Annotated[
int,
typer.Option("--ports-base", help="Base port (KAS ports computed as base+N*101)"),
] = 8080,
platform_dist: Annotated[
Optional[str],
typer.Option("--platform", help="Platform dist version (e.g., v0.9.0)"),
] = None,
) -> None:
"""Scaffold a new instance directory at tests/instances/<name>/."""
settings = get_settings()
instance_dir = settings.instances_root / name

if from_scenario is not None:
_init_from_scenario(name, from_scenario, instance_dir)
else:
if platform_dist is None:
typer.echo("Error: --platform <dist> is required when not using --from-scenario", err=True)
raise typer.Exit(2)
_init_minimal(name, instance_dir, ports_base, platform_dist)

_validate_port_uniqueness(settings.instances_root, name)
typer.echo(f" Initialized instance '{name}' at {instance_dir}")


def _init_from_scenario(name: str, scenario_path: Path, instance_dir: Path) -> None:
"""Copy the embedded Instance from a Scenario or load a standalone Instance."""
from otdf_sdk_mgr.schema import load_instance, load_scenario
from ruamel.yaml import YAML

y = YAML(typ="safe")
raw = y.load(scenario_path.read_text())
if not isinstance(raw, dict):
raise typer.BadParameter(f"{scenario_path} top-level YAML must be a mapping")
kind = raw.get("kind")
if kind == "Scenario":
scenario = load_scenario(scenario_path)
instance = scenario.instance
elif kind == "Instance":
instance = load_instance(scenario_path)
else:
raise typer.BadParameter(f"{scenario_path} has unknown kind {kind!r}")
# Ensure the metadata name matches the chosen directory name.
instance.metadata = Metadata(**{**instance.metadata.model_dump(exclude_none=True), "name": name})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Updating the metadata name by dumping and re-creating the entire Metadata object is unnecessarily complex and inefficient. Since Pydantic models are mutable by default, you can update the field directly on the existing object.

Suggested change
instance.metadata = Metadata(**{**instance.metadata.model_dump(exclude_none=True), "name": name})
instance.metadata.name = name

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This line is quite verbose. Since Metadata is a Pydantic model, you can use model_copy with the update parameter to achieve the same result more cleanly.

Suggested change
instance.metadata = Metadata(**{**instance.metadata.model_dump(exclude_none=True), "name": name})
instance.metadata = instance.metadata.model_copy(update={"name": name})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since Metadata is a Pydantic model, you can use model_copy with the update parameter to modify the name. This is more idiomatic and concise than dumping to a dict and reconstructing the model.

Suggested change
instance.metadata = Metadata(**{**instance.metadata.model_dump(exclude_none=True), "name": name})
instance.metadata = instance.metadata.model_copy(update={"name": name})

instance_dir.mkdir(parents=True, exist_ok=True)
(instance_dir / "kas").mkdir(parents=True, exist_ok=True)
(instance_dir / "keys").mkdir(mode=0o700, parents=True, exist_ok=True)
(instance_dir / "logs").mkdir(parents=True, exist_ok=True)
dump_instance(instance, instance_dir / "instance.yaml")


def _init_minimal(name: str, instance_dir: Path, ports_base: int, platform_dist: str) -> None:
"""Create a barebones instance.yaml with default KAS layout."""
instance = Instance(
metadata=Metadata(name=name),
platform=PlatformPin(dist=platform_dist),
ports=PortsConfig(base=ports_base),
kas={},
)
instance_dir.mkdir(parents=True, exist_ok=True)
(instance_dir / "kas").mkdir(parents=True, exist_ok=True)
(instance_dir / "keys").mkdir(mode=0o700, parents=True, exist_ok=True)
(instance_dir / "logs").mkdir(parents=True, exist_ok=True)
dump_instance(instance, instance_dir / "instance.yaml")


def _validate_port_uniqueness(instances_root: Path, new_name: str) -> None:
"""Warn if another instance shares the same `ports.base`."""
from otdf_sdk_mgr.schema import load_instance

new_yaml = instances_root / new_name / "instance.yaml"
if not new_yaml.exists():
return
new_inst = load_instance(new_yaml)
new_base = new_inst.ports.base
if not instances_root.exists():
return
for child in instances_root.iterdir():
if not child.is_dir() or child.name == new_name:
continue
other_yaml = child / "instance.yaml"
if not other_yaml.is_file():
continue
try:
other = load_instance(other_yaml)
except Exception:
continue
if other.ports.base == new_base:
typer.echo(
f" Warning: instance '{child.name}' already uses ports.base={new_base}; "
f"running both simultaneously will collide. Change one with `otdf-local instance init`.",
err=True,
)


@instance_app.command("ls")
def ls(
as_json: Annotated[bool, typer.Option("--json", "-j", help="Emit JSON")] = False,
) -> None:
"""List known instances."""
import json as _json

from otdf_sdk_mgr.schema import load_instance

settings = get_settings()
root = settings.instances_root
if not root.exists():
if as_json:
typer.echo(_json.dumps([]))
else:
typer.echo(" (no instances yet)")
return
rows: list[dict[str, object]] = []
for child in sorted(root.iterdir()):
if not child.is_dir():
continue
ymp = child / "instance.yaml"
if not ymp.is_file():
continue
try:
inst = load_instance(ymp)
except Exception as e:
rows.append({"name": child.name, "error": str(e)})
continue
rows.append(
{
"name": child.name,
"platform": (
inst.platform.dist
or (inst.platform.source.ref if inst.platform.source else inst.platform.image)
),
"ports_base": inst.ports.base,
"kas": list(inst.kas.keys()),
}
)
if as_json:
typer.echo(_json.dumps(rows, indent=2))
else:
for row in rows:
typer.echo(f" {row}")


@instance_app.command("rm")
def rm(
name: Annotated[str, typer.Argument(help="Instance to remove")],
yes: Annotated[bool, typer.Option("--yes", "-y", help="Skip confirmation")] = False,
) -> None:
"""Remove an instance directory."""
settings = get_settings()
instance_dir = settings.instances_root / name
if not instance_dir.exists():
typer.echo(f"Error: instance '{name}' not found at {instance_dir}", err=True)
raise typer.Exit(1)
if not yes:
confirm = typer.confirm(f"Delete {instance_dir}?", default=False)
if not confirm:
typer.echo("aborted")
raise typer.Exit(1)
shutil.rmtree(instance_dir)
typer.echo(f" Removed {instance_dir}")
101 changes: 101 additions & 0 deletions otdf-local/src/otdf_local/cli_scenario.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""`otdf-local scenario` subcommands.

Today's surface area is intentionally narrow — `run` is the only command
that's part of the bug-repro MVP. Bisect and other higher-level loops are
deferred (see plan §9).
"""

from __future__ import annotations

import os
import subprocess
from pathlib import Path
from typing import Annotated

import typer
from otdf_sdk_mgr.schema import (
Scenario,
installed_json_for,
load_scenario,
scenario_to_pytest_sdks,
)

from otdf_local.config.settings import get_settings

scenario_app = typer.Typer(help="Run scenarios.yaml against a healthy instance.")


def _build_pytest_args(scenario: Scenario, scenario_path: Path) -> list[str]:
"""Translate the scenario's `suite` block into pytest CLI args.

SDK pins go through `scenario_to_pytest_sdks` so they're forwarded as
the `sdk@<resolved-dist>` tokens xtest's #446 specifier format expects.
Requires that `otdf-sdk-mgr install scenario` has been run first; the
helper raises FileNotFoundError with a clean hint otherwise.
"""
suite = scenario.suite
args: list[str] = [suite.select]

tokens = scenario_to_pytest_sdks(scenario, installed_json_for(scenario_path))
if tokens["encrypt"]:
args.extend(["--sdks-encrypt", " ".join(tokens["encrypt"])])
if tokens["decrypt"]:
args.extend(["--sdks-decrypt", " ".join(tokens["decrypt"])])
if suite.containers:
args.extend(["--containers", suite.containers])
if suite.markers:
args.extend(["-m", suite.markers])
args.extend(suite.extra_args)
return args


@scenario_app.command("run")
def run(
path: Annotated[Path, typer.Argument(help="Path to scenarios.yaml")],
instance: Annotated[
str | None,
typer.Option(
"--instance",
help="Override which instance to use (defaults to scenario.instance.metadata.name)",
),
] = None,
extra: Annotated[
list[str] | None,
typer.Argument(help="Extra args passed through to pytest (after --)"),
] = None,
) -> None:
"""Run the pytest suite declared by the scenario against its instance."""
if not path.exists():
typer.echo(f"Error: {path} not found", err=True)
raise typer.Exit(1)

scenario = load_scenario(path)
instance_name = instance or scenario.instance.metadata.name
if not instance_name:
typer.echo("Error: scenario.instance.metadata.name not set; pass --instance", err=True)
raise typer.Exit(2)

settings = get_settings()
# Force the chosen instance via env so child pytest invocations agree.
os.environ["OTDF_LOCAL_INSTANCE_NAME"] = instance_name

xtest_root = settings.xtest_root
if not xtest_root.exists():
typer.echo(f"Error: xtest root not found at {xtest_root}", err=True)
raise typer.Exit(1)

try:
pytest_args = _build_pytest_args(scenario, path)
except FileNotFoundError as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
except ValueError as e:
typer.echo(f"Error: {e}", err=True)
raise typer.Exit(1)
if extra:
pytest_args.extend(extra)

cmd = ["uv", "run", "pytest", *pytest_args]
typer.echo(f" Running: {' '.join(cmd)} (cwd={xtest_root})")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using ' '.join(cmd) for display can be misleading if any of the arguments contain spaces. It is safer to use shlex.join to format the command string for the console.

Suggested change
typer.echo(f" Running: {' '.join(cmd)} (cwd={xtest_root})")
import shlex
typer.echo(f" Running: {shlex.join(cmd)} (cwd={xtest_root})")

completed = subprocess.run(cmd, cwd=xtest_root)
raise typer.Exit(completed.returncode)
Loading
Loading