diff --git a/.github/workflows/docker/docker-compose.yaml b/.github/workflows/docker/docker-compose.yaml index 0ea1026f3f0..544916bdd93 100644 --- a/.github/workflows/docker/docker-compose.yaml +++ b/.github/workflows/docker/docker-compose.yaml @@ -1,6 +1,6 @@ services: trinity-node-1: - image: trinity-rft-unittest:20260420 + image: trinity-rft-unittest:20260506 cap_add: - SYS_PTRACE pull_policy: never @@ -34,7 +34,7 @@ services: capabilities: [gpu] trinity-node-2: - image: trinity-rft-unittest:20260420 + image: trinity-rft-unittest:20260506 cap_add: - SYS_PTRACE pull_policy: never diff --git a/docs/sphinx_doc/source/tutorial/example_megatron.md b/docs/sphinx_doc/source/tutorial/example_megatron.md index b8d0831d2af..e852cfd8f33 100644 --- a/docs/sphinx_doc/source/tutorial/example_megatron.md +++ b/docs/sphinx_doc/source/tutorial/example_megatron.md @@ -39,10 +39,10 @@ We provide a Docker setup to simplify environment management. #### Build the Docker Image -Trinity-RFT provides a dedicated Dockerfile for Megatron-LM located at `scripts/docker/Dockerfile.megatron`. You can build the image using the following command: +The Docker image provided by Trinity-RFT already has the Megatron-LM related dependencies pre-installed. You can either use the provided image directly or customize the Dockerfile to build your own image as needed. ```bash -docker build -f scripts/docker/Dockerfile.megatron -t trinity-rft-megatron:latest . +docker build -f scripts/docker/Dockerfile.uv -t trinity-rft-megatron:latest . ``` > 💡 You can customize the Dockerfile before building — for example, to add pip mirrors or set API keys. @@ -60,6 +60,7 @@ docker run -it \ ``` Replace `` with the actual path on your machine where datasets and model checkpoints are stored. +The image uses `uv` to manage Python dependencies; the virtual environment is activated automatically when you enter the container (you can also activate it manually with `source /opt/venv/bin/activate`). The image already includes dependencies such as vllm, flash-attn, and Megatron-LM. If you need additional packages, remember to activate the virtual environment and install them with `uv pip install`. --- diff --git a/docs/sphinx_doc/source/tutorial/metrics_reference.md b/docs/sphinx_doc/source/tutorial/metrics_reference.md index 205e9eb7af8..66cf67397ea 100644 --- a/docs/sphinx_doc/source/tutorial/metrics_reference.md +++ b/docs/sphinx_doc/source/tutorial/metrics_reference.md @@ -164,7 +164,7 @@ This category includes metrics that track the training dynamics of the policy (a This category includes metrics that track the processing of experiences through various pipeline operators (`experience_pipeline/`) and data sampling statistics (`sample/`). These metrics are aggregated at the step level, as the experience pipeline and data sampling are performed in each step. -#### Experience Pipeline Metrics (`experience_pipeline/` and `time/experience_pipeline/`) +#### Experience Pipeline Metrics (`experience_pipeline/` and `experience_pipeline/time/`) Experience pipeline metrics track the processing of experiences through various pipeline operators. Each metric represents the count of the specific operator in one step.
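For orientation, here is a minimal sketch of how the renamed timing keys are assembled. Only the key format is taken from `trinity/buffer/pipelines/experience_pipeline.py`; the operator class `ExampleOperator` and the surrounding loop are hypothetical stand-ins:

```python
class ExampleOperator:
    """Hypothetical stand-in for an experience pipeline operator."""


operators = [ExampleOperator()]
metrics: dict[str, float] = {}
for idx, operator in enumerate(operators):
    # Key format used by the pipeline's Timer context manager.
    key = f"experience_pipeline/time/operator/{idx}_{operator.__class__.__name__}"
    metrics[key] = 0.0  # the Timer fills in the measured seconds
print(metrics)  # {'experience_pipeline/time/operator/0_ExampleOperator': 0.0}
```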
diff --git a/docs/sphinx_doc/source/tutorial/trinity_installation.md b/docs/sphinx_doc/source/tutorial/trinity_installation.md index a1abd379f9d..1a8f647ff0e 100644 --- a/docs/sphinx_doc/source/tutorial/trinity_installation.md +++ b/docs/sphinx_doc/source/tutorial/trinity_installation.md @@ -126,7 +126,7 @@ cd Trinity-RFT # Build the Docker image ## Tip: You can modify the Dockerfile to add mirrors or set API keys -docker build -f scripts/docker/Dockerfile -t trinity-rft:latest . +docker build -f scripts/docker/Dockerfile.uv -t trinity-rft:latest . # Run the container, replacing with your actual path docker run -it \ diff --git a/docs/sphinx_doc/source_zh/tutorial/example_megatron.md b/docs/sphinx_doc/source_zh/tutorial/example_megatron.md index f0eaa632791..fc671ba5bfb 100644 --- a/docs/sphinx_doc/source_zh/tutorial/example_megatron.md +++ b/docs/sphinx_doc/source_zh/tutorial/example_megatron.md @@ -44,11 +44,10 @@ pip install -v --disable-pip-version-check --no-cache-dir --no-build-isolation \ #### 构建 Docker 镜像 -Trinity-RFT 提供了专门用于 Megatron-LM 的 Dockerfile,位于 `scripts/docker/Dockerfile.megatron`。 -可以使用以下命令构建镜像: +Trinity-RFT 提供的 Docker 已经预装了 Megatron-LM 相关依赖。你可以直接使用我们提供的 Docker 镜像,或者根据需要自定义 Dockerfile 来构建镜像。 ```bash -docker build -f scripts/docker/Dockerfile.megatron -t trinity-rft-megatron:latest . +docker build -f scripts/docker/Dockerfile.uv -t trinity-rft:latest . ``` > 💡 你可以在构建前自定义 Dockerfile —— 例如添加 pip 镜像源或设置 API 密钥。 @@ -62,10 +61,11 @@ docker run -it \ --rm \ -v $PWD:/workspace \ -v :/data \ - trinity-rft-megatron:latest + trinity-rft:latest ``` 请将 `` 替换为你机器上存储数据集和模型检查点的实际路径。 +该镜像使用 `uv` 来管理 Python 依赖,进入容器后虚拟环境会自动激活(也可通过 `source /opt/venv/bin/activate` 手动激活)。该镜像已经包含了 vllm, flash-attn 以及 Megatron-LM,如果需要使用其他依赖,可直接使用 `uv pip install` 来安装它们。 --- diff --git a/docs/sphinx_doc/source_zh/tutorial/metrics_reference.md b/docs/sphinx_doc/source_zh/tutorial/metrics_reference.md index 0fad6c42a23..a3e9792c701 100644 --- a/docs/sphinx_doc/source_zh/tutorial/metrics_reference.md +++ b/docs/sphinx_doc/source_zh/tutorial/metrics_reference.md @@ -165,7 +165,7 @@ graph TD 此类别包括跟踪通过各种数据处理操作(`experience_pipeline/`)和数据采样统计(`sample/`)的指标。这些指标在步骤(step)级别计算,因为 experience 处理和数据采样会在每个步骤中执行一次。 -#### Experience Pipeline 相关指标(`experience_pipeline/` 和 `time/experience_pipeline/`) +#### Experience Pipeline 相关指标(`experience_pipeline/` 和 `experience_pipeline/time/`) Experience Pipeline 相关的指标统计了和数据处理相关的值,每个指标表示一个步骤中特定操作的统计值。 diff --git a/docs/sphinx_doc/source_zh/tutorial/trinity_installation.md b/docs/sphinx_doc/source_zh/tutorial/trinity_installation.md index f3eed4cadd8..a50d20b70f3 100644 --- a/docs/sphinx_doc/source_zh/tutorial/trinity_installation.md +++ b/docs/sphinx_doc/source_zh/tutorial/trinity_installation.md @@ -127,7 +127,7 @@ cd Trinity-RFT # 构建 Docker 镜像 ## 提示:可根据需要修改 Dockerfile 添加镜像源或设置 API 密钥 -docker build -f scripts/docker/Dockerfile -t trinity-rft:latest . +docker build -f scripts/docker/Dockerfile.uv -t trinity-rft:latest . # 运行容器,请将 替换为实际需要挂载的路径 docker run -it \ diff --git a/perf/scripts/explorer/README.md b/perf/scripts/explorer/README.md new file mode 100644 index 00000000000..c7beddbd30d --- /dev/null +++ b/perf/scripts/explorer/README.md @@ -0,0 +1,376 @@
+# Explorer Performance Benchmark Tool
+
+## Goals
+
+This tool benchmarks the runtime performance of the Explorer module in Trinity-RFT. It does not evaluate model training quality, and it does not reuse the benchmark or eval semantics of the Trinity main workflow.
+
+The design goals of the first version are:
+
+1. Do not modify the Trinity main-workflow code; implement the tool independently under the perf directory.
+2. Make the resource-collection capability independently reusable, so it can later be used directly by trainer perf.
+3. Reuse the Explorer's existing step metrics for throughput and average task completion time.
+4. Provide both global summary metrics and step-level metrics.
+5. Output summary results directly after a single run, without warmup or repeated experiments.
+
+## Measurement Scope
+
+The tool is planned to collect the following performance metrics:
+
+1. Initialization time: from launch until initialization completes.
+2. Throughput: tasks completed per unit time, in task / min.
+3. Average completion time per task, in sec / task.
+4. Resource usage: CPU and GPU utilization, memory, and GPU memory, sampled at a fixed interval with the time series preserved.
+5. Step-level performance: finished task count, throughput, average task time, and the corresponding raw rollout metrics for each step.
+
+## Boundary Constraints
+
+The tool observes the following constraints:
+
+1. Do not modify [trinity/explorer/explorer.py](/root/Trinity-RFT/trinity/explorer/explorer.py) or other main-workflow modules.
+2. Do not push the resource-collection logic into the existing monitor framework.
+3. Do not mix training-quality benchmark or eval metrics into the performance results.
+4. No warmup, control experiments, or averaging over repeated runs.
+5. The first version assumes GPUs are available in the runtime environment and does not support GPU-less setups.
+6. The first version focuses on single local runs with results written to disk.
+
+## Design Overview
+
+The overall idea is to split Explorer perf into two fully independent paths:
+
+1. Run path: start the Explorer and record startup time and total run time.
+2. Observation path: sample system resources externally and read step metrics from the TensorBoard files.
+
+Specifically:
+
+1. Resource data comes from the independent sampling module under perf.
+2. Throughput and average task completion time come from the step metrics produced by the Explorer monitor.
+3. Aggregation is done inside the perf scripts, without intruding on the existing Trinity implementation.
+
+## Recommended Directory Layout
+
+Suggested code organization:
+
+```text
+perf/
+  scripts/
+    explorer/
+      README.md
+      example.yaml
+trinity/
+  perf/
+    __init__.py
+    stage_perf.py
+    tensorboard_metrics.py
+    report_utils.py
+    resource_backends.py
+    resource_sampler.py
+```
+
+Suggested responsibilities:
+
+1. `trinity/perf/resource_backends.py`
+   Wraps the resource-collection backends, e.g. `psutil` and `pynvml`.
+2. `trinity/perf/resource_sampler.py`
+   Provides an independent resource sampler that supports start, stop, raw-sample export, and aggregated statistics.
+3. `trinity/perf/report_utils.py`
+   Provides time-series aggregation, percentile computation, and unified JSON serialization.
+4. `trinity/perf/stage_perf.py`
+   Orchestrates a single Explorer perf run and writes the results to disk.
+5. `perf/scripts/explorer/example.yaml`
+   Provides a minimal runnable Trinity Explorer configuration sample.
+
+The core purpose of this split is that the resource-collection modules naturally land in the `trinity` namespace, so trainer perf can later reuse `trinity.perf.*` directly.
+
+## Run Flow Draft
+
+The Explorer perf script is suggested to execute the following stages:
+
+1. Read the Trinity config file and validate `mode: explore`.
+2. Validate `monitor.monitor_type == tensorboard`.
+3. Initialize the resource sampler and start background sampling.
+4. Create the Explorer actor.
+5. Time `prepare.remote()` separately to obtain the startup time.
+6. Run `sync_weight.remote()`.
+7. Run `explore.remote()` until the run finishes.
+8. Stop resource sampling.
+9. Parse the local TensorBoard files and extract step-level metrics.
+10. Aggregate the resource metrics and the Explorer step metrics.
+11. Write the JSON result to the given path.
+
+The "startup time" here is defined as:
+
+1. Starting when the perf script begins creating the Explorer actor.
+2. Ending when `prepare.remote()` returns successfully.
+
+This covers initialization costs such as model preparation and rollout coordinator setup while staying fully non-invasive to the main workflow; a timing sketch follows.
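+A minimal timing sketch of this boundary (the Ray calls mirror `explore()` in `trinity/cli/launcher.py`; the real orchestration belongs in `trinity/perf/stage_perf.py`):
+
+```python
+import time
+
+import ray
+
+from trinity.common.config import load_config
+from trinity.explorer.explorer import Explorer
+
+config = load_config("perf/scripts/explorer/example.yaml")  # sample config from this draft
+
+start = time.perf_counter()
+explorer = Explorer.get_actor(config)           # actor creation counts toward startup
+ray.get(explorer.prepare.remote())              # model / rollout coordinator preparation
+startup_time_sec = time.perf_counter() - start  # "startup time" ends here
+
+ray.get(explorer.sync_weight.remote())
+ray.get(explorer.explore.remote())              # run until the configured steps finish
+total_time_sec = time.perf_counter() - start
+execution_time_sec = total_time_sec - startup_time_sec
+```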
+## Metric Sources Draft
+
+### Resource Metrics
+
+Resource metrics are provided by the independent sampling module under the perf directory. Suggested sample fields:
+
+1. `timestamp`
+2. `cpu_percent`
+3. `memory_rss_mb`
+4. `memory_percent`
+5. `gpu_metrics`
+
+`gpu_metrics` should be recorded per device, for example:
+
+1. `gpu_id`
+2. `gpu_util_percent`
+3. `gpu_memory_used_mb`
+4. `gpu_memory_total_mb`
+
+The first version should prioritize machine-level sampling; aggregation by Ray actor or PID tree is not required.
+
+Display goals for the resource series:
+
+1. Keep a single timeline for CPU.
+2. Keep one timeline per GPU device.
+3. The result format primarily serves later line-chart plotting, not offline aggregate statistics.
+
+### Explorer Run Metrics
+
+Explorer run metrics are preferably extracted from the TensorBoard event files, because:
+
+1. No modification of the Explorer main workflow is needed.
+2. The Explorer monitor already writes scalars to local files.
+3. Step-level metrics can be reused naturally, without re-deriving internal state.
+
+Accordingly, the first version explicitly requires the monitor to be `tensorboard`.
+
+## Throughput and Average Task Time Definitions
+
+The suggested unified definitions are:
+
+1. Step throughput: `finished_task_count / step_time_sec * 60`
+2. Step average task time: `step_time_sec / finished_task_count`
+3. Global throughput: `sum(finished_task_count) / sum(step_time_sec) * 60`
+4. Global average task time: `sum(step_time_sec) / sum(finished_task_count)`
+
+The implementation should read the existing step-level timing metrics directly from TensorBoard. If field names differ across configurations, maintain a field-mapping table in the perf code rather than scattering concrete field names through the business logic. A sketch of these aggregation rules is shown below.
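+A minimal sketch of the global definitions above; the field names `finished_task_count` and `step_time_sec` follow this draft's schema rather than a fixed API:
+
+```python
+def aggregate_global_metrics(step_metrics: list[dict]) -> dict:
+    """Aggregate step-level records into the global throughput metrics."""
+    total_tasks = sum(s["finished_task_count"] for s in step_metrics)
+    total_time_sec = sum(s["step_time_sec"] for s in step_metrics)
+    return {
+        "total_finished_task_count": total_tasks,
+        "overall_throughput_task_per_min": total_tasks / total_time_sec * 60,
+        "overall_avg_task_time_sec": total_time_sec / total_tasks,
+    }
+```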
+## Output Structure Draft
+
+The result file is suggested to be a single JSON document with the following structure:
+
+```json
+{
+  "run_meta": {},
+  "timing": {},
+  "resource_timeline": [],
+  "step_metrics": [],
+  "global_metrics": {},
+  "artifacts": {},
+  "status": {}
+}
+```
+
+Suggested field meanings:
+
+### `run_meta`
+
+Basic information about one perf run:
+
+1. config path
+2. explorer name
+3. sampling interval
+4. start time
+5. hostname
+6. pid
+
+### `timing`
+
+Key durations:
+
+1. `startup_time_sec`
+2. `execution_time_sec`
+3. `total_time_sec`
+
+### `resource_timeline`
+
+The raw sample series, kept for later visualization and for reuse by trainer perf.
+
+Each sample should contain at least:
+
+1. `timestamp`
+2. `cpu_percent`
+3. `memory_rss_mb`
+4. `gpu_metrics`
+
+`gpu_metrics` is an array with one element per device, for example:
+
+1. `gpu_id`
+2. `gpu_util_percent`
+3. `gpu_memory_used_mb`
+
+The result organization should primarily satisfy the following visualization needs:
+
+1. One line for CPU.
+2. One utilization line per GPU.
+3. One memory-used line per GPU.
+
+### `step_metrics`
+
+One record per step, suggested to contain:
+
+1. `step`
+2. `finished_task_count`
+3. `throughput_task_per_min`
+4. `avg_task_time_sec`
+5. `raw_metrics`
+
+### `global_metrics`
+
+Global performance metrics, containing at least:
+
+1. `total_finished_task_count`
+2. `overall_throughput_task_per_min`
+3. `overall_avg_task_time_sec`
+
+### `artifacts`
+
+Paths needed for troubleshooting and tracing:
+
+1. `checkpoint_job_dir`
+2. `tensorboard_dir`
+3. `log_dir`
+4. `output_json`
+
+### `status`
+
+Run status:
+
+1. Whether the run completed successfully.
+2. Error information, if any.
+3. Whether GPU metrics were obtained.
+
+## Command-Line Interface Draft
+
+The currently suggested command-line form is:
+
+```
+python -m trinity.cli.launcher perf --module explorer --config --output-path [--monitor-interval ]
+```
+
+The following parameters are suggested:
+
+1. `--config`
+   Path to the Trinity config file; it must follow the Trinity config schema and use mode `explore`.
+2. `--output-path`
+   Path of the result JSON output.
+3. `--monitor-interval`
+   Resource sampling interval, default 2 seconds.
+4. `--timeout`
+   Optional timeout for the whole perf run.
+5. `--total-steps`
+   Overrides the Explorer total steps from the config, default 5.
+6. `--module`
+   Currently fixed to `explorer`; reserved as the unified entry point for a future trainer perf.
+
+## Configuration Requirements
+
+The tool depends on the following configuration constraints:
+
+1. `mode` must be `explore`.
+2. `monitor.monitor_type` must be `tensorboard`.
+3. The Explorer itself must be able to start and run normally in the current environment.
+
+If the monitor is not `tensorboard`, the perf tool should fail with an error immediately instead of silently overriding the user's configuration at runtime; a validation sketch is shown below.
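+A minimal sketch of these checks, assuming the attribute names `cfg.mode` and `cfg.monitor.monitor_type` from Trinity's `Config`:
+
+```python
+def validate_perf_config(cfg) -> None:
+    """Fail fast instead of silently rewriting the user's config."""
+    if cfg.mode != "explore":
+        raise ValueError(f"Explorer perf requires mode=explore, got {cfg.mode!r}")
+    if cfg.monitor.monitor_type != "tensorboard":
+        raise ValueError(
+            "Explorer perf reads step metrics from TensorBoard event files; "
+            "set monitor.monitor_type to tensorboard"
+        )
+```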
+## Example Result Draft
+
+An illustration of the result structure:
+
+```json
+{
+  "run_meta": {
+    "config_path": "perf/scripts/explorer/example.yaml",
+    "monitor_interval_sec": 5
+  },
+  "timing": {
+    "startup_time_sec": 32.5,
+    "execution_time_sec": 640.2,
+    "total_time_sec": 672.7
+  },
+  "resource_timeline": [
+    {
+      "timestamp": 1710000000.0,
+      "cpu_percent": 71.2,
+      "memory_rss_mb": 18342.0,
+      "gpu_metrics": [
+        {
+          "gpu_id": 0,
+          "gpu_util_percent": 84.0,
+          "gpu_memory_used_mb": 22134.0
+        },
+        {
+          "gpu_id": 1,
+          "gpu_util_percent": 79.0,
+          "gpu_memory_used_mb": 21980.0
+        }
+      ]
+    }
+  ],
+  "step_metrics": [
+    {
+      "step": 1,
+      "finished_task_count": 64,
+      "throughput_task_per_min": 384.0,
+      "avg_task_time_sec": 0.156,
+      "raw_metrics": {}
+    }
+  ],
+  "global_metrics": {
+    "total_finished_task_count": 1024,
+    "overall_throughput_task_per_min": 401.7,
+    "overall_avg_task_time_sec": 0.149
+  }
+}
+```
+
+## Implementation Checklist
+
+Suggested implementation order:
+
+1. Define the result JSON schema and field semantics.
+2. Implement `trinity/perf/resource_backends.py`.
+3. Implement `trinity/perf/resource_sampler.py`.
+4. Implement `trinity/perf/report_utils.py`.
+5. Complete the single-run orchestration in `trinity/perf/stage_perf.py`.
+6. Implement TensorBoard metric parsing in `trinity/perf/stage_perf.py`.
+7. Add `perf/scripts/explorer/example.yaml`.
+8. Add tests and documentation examples.
+
+## Testing Suggestions
+
+The first version should prioritize the following tests:
+
+1. The resource sampler stably outputs a single CPU time series and per-device GPU time series.
+2. The TensorBoard parsing logic correctly extracts step-level metrics.
+3. The global throughput and average task time follow the defined formulas.
+4. A failed Explorer perf run still outputs diagnosable status fields.
+
+## Known Trade-offs
+
+The first version makes the following trade-offs:
+
+1. Machine-level resource observation first; Ray actor child processes are not tracked.
+2. Hard dependency on the `tensorboard` monitor; wandb and mlflow are not supported.
+3. GPUs are assumed to be available; GPU-less environments are not supported.
+4. Resource results keep the raw time series; no `mean/max/min/p50/p95` aggregates in the first version.
+5. Single-run measurement and summary only; no warmup or repeated experiments.
+6. Both resource sampling and TensorBoard metric parsing live in the perf layer, without intruding on the main workflow.
+
+## Future Extensions
+
+This design can naturally extend to:
+
+1. Trainer perf reusing the resource sampling module.
+2. Automatic Markdown report generation.
+3. PID-level or process-tree-level resource observation.
+4. Metric parsing for more monitor backends.
diff --git a/pyproject.toml b/pyproject.toml index b4d38474712..bbb92fb6b83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,9 @@ dependencies = [ "sortedcontainers", "word2number", "matplotlib", - "transformers>=5.5.3", + "psutil", + "nvidia-ml-py", + "transformers>=5.6.2", "datasets>=4.0.0", "typer>=0.20.1", ] @@ -52,9 +54,7 @@ trinity = "trinity.cli.launcher:main" [project.optional-dependencies] vllm = [ - "vllm>=0.17.0,<=0.19.1", - # For v0.17 to v0.19, the default dependencies require transformers < 5. - # We have patched vLLM to support transformers >= 5.0.0. + "vllm>=0.19.1,<=0.20.1", ] data = [ "py-data-juicer>=1.4.3" ] @@ -79,9 +79,8 @@ dev = [ ] megatron = [ "megatron-core[mlm]==0.16.1", - # if you found "undefined symbol" error in transformer engine - # reinstall it with --no-build-isolation and `--no-cache-dir` flag - "transformer_engine[pytorch]==2.13.0", + # Please install transformer-engine from source, do not install from pip + # "transformer-engine[pytorch]==2.14.1", # Install mbridge from main branch (unreleased version) # "mbridge @ git+https://github.com/ISEEKYAN/mbridge.git@90c4633a6cdcfe5d29723d7b145d32f6f5e73303", @@ -89,7 +88,7 @@ # "megatron-bridge==0.3.1", ] tinker = [ - "tinker>=0.10.0; python_version >= '3.11'", + "tinker>=0.10.0,<=0.16.1; python_version >= '3.11'", ] doc = [ @@ -104,7 +103,6 @@ mm = [ "qwen-vl-utils", - "transformers>=4.54.0", "blobfile", ] diff --git a/scripts/docker/Dockerfile b/scripts/docker/Dockerfile deleted file mode 100644 index ab218222891..00000000000 --- a/scripts/docker/Dockerfile +++ /dev/null @@ -1,40 +0,0 @@ -# This Dockerfile sets up a Trinity-RFT environment with minimal support. -# Build and run the docker image with the following command: -# -# cd -# docker build -f scripts/docker/Dockerfile -t trinity-rft:latest . -# docker run -it --gpus all --shm-size="64g" --rm -v $PWD:/workspace -v :/data trinity-rft:latest - - -FROM nvcr.io/nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 - -WORKDIR /workspace - -RUN chmod 1777 /tmp && apt update && apt install -y \ - build-essential \ - curl git wget vim tmux net-tools \ - python3 python3-pip python3-dev python3-packaging \ - libomp-dev infiniband-diags libibverbs-dev librdmacm-dev rdma-core perftest \ - && rm -rf /var/lib/apt/lists/* \ - && ln -sf /usr/bin/python3 /usr/bin/python \ - && ln -sf /usr/bin/pip3 /usr/bin/pip - - -# For Aliyun users: update pip mirror to aliyun to speed up pip install # RUN pip config set global.index-url http://mirrors.cloud.aliyuncs.com/pypi/simple/ \ # && pip config set install.trusted-host mirrors.cloud.aliyuncs.com - -# copy the Trinity-RFT dir into the workspace -COPY . . - -RUN pip install --upgrade pip && pip install -e .[vllm,mm,dev] && pip install flash_attn==2.8.1 --no-build-isolation - -# Set Env variables - -# WANDB -# ENV WANDB_API_KEY= -# ENV WANDB_BASE_URL= - -# LLM API -# ENV OPENAI_API_KEY= -# ENV DASH_API_KEY= diff --git a/scripts/docker/Dockerfile.megatron b/scripts/docker/Dockerfile.megatron deleted file mode 100644 index 681dd1c9f5e..00000000000 --- a/scripts/docker/Dockerfile.megatron +++ /dev/null @@ -1,50 +0,0 @@ -# This Dockerfile sets up a Trinity-RFT environment with Megatron-LM support. -# Build and run the docker image with the following command: -# -# cd -# docker build -f scripts/docker/Dockerfile.megatron -t trinity-rft-megatron:latest .
-# docker run -it --gpus all --shm-size="64g" --rm -v $PWD:/workspace -v :/data trinity-rft-megatron:latest - - -FROM nvcr.io/nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 - -WORKDIR /workspace - -RUN chmod 1777 /tmp && apt update && apt install -y \ - build-essential \ - curl git wget vim tmux net-tools \ - python3 python3-pip python3-dev python3-packaging \ - libomp-dev infiniband-diags libibverbs-dev librdmacm-dev rdma-core perftest \ - && rm -rf /var/lib/apt/lists/* \ - && ln -sf /usr/bin/python3 /usr/bin/python \ - && ln -sf /usr/bin/pip3 /usr/bin/pip - -# For Aliyun users: update pip mirror to aliyun to speed up pip install -# RUN pip config set global.index-url http://mirrors.cloud.aliyuncs.com/pypi/simple/ \ -# && pip config set install.trusted-host mirrors.cloud.aliyuncs.com - -# copy the Trinity-RFT dir into the workspace -COPY . . - -# Install Trinity-RFT with Megatron -RUN pip install --upgrade pip \ - && pip install -e .[vllm,mm,dev] \ - && pip install flash_attn==2.8.1 --no-build-isolation \ - && pip install -e .[megatron] \ - && pip install transformer_engine[pytorch]==2.10.0 --no-build-isolation --no-cache-dir \ - && pip install git+https://github.com/ISEEKYAN/mbridge.git@20e9ffbbe72ae7b1df83bfe1bc3c11f7382f2612 \ - && NVCC_APPEND_FLAGS="--threads 4" APEX_PARALLEL_BUILD=8 pip install -v \ - --disable-pip-version-check --no-cache-dir --no-build-isolation \ - --config-settings "--build-option=--cpp_ext" \ - --config-settings "--build-option=--cuda_ext" \ - --resume-retries 20 git+https://github.com/NVIDIA/apex.git - -# Set Env variables - -# WANDB -# ENV WANDB_API_KEY= -# ENV WANDB_BASE_URL= - -# LLM API -# ENV OPENAI_API_KEY= -# ENV DASH_API_KEY= diff --git a/scripts/docker/Dockerfile.uv b/scripts/docker/Dockerfile.uv index 9af1071e065..58e62d7d100 100644 --- a/scripts/docker/Dockerfile.uv +++ b/scripts/docker/Dockerfile.uv @@ -10,20 +10,24 @@ # 2. The uv virtual environment is created at `/opt/venv`, use `source /opt/venv/bin/activate` to activate it. # 3. Make sure to use `uv pip` to install packages within the virtual environment. -FROM nvcr.io/nvidia/cuda:12.8.1-cudnn-devel-ubuntu22.04 +FROM nvcr.io/nvidia/cuda:13.0.1-cudnn-devel-ubuntu22.04 WORKDIR /workspace RUN chmod 1777 /tmp && apt update && apt install -y \ build-essential \ - curl git wget vim tmux net-tools \ + curl git wget vim tmux net-tools cmake \ python3 python3-pip python3-dev python3-packaging python3-venv \ libomp-dev libnuma1 infiniband-diags libibverbs-dev librdmacm-dev rdma-core perftest \ + libnuma-dev \ && rm -rf /var/lib/apt/lists/* \ && ln -sf /usr/bin/python3 /usr/bin/python \ && ln -sf /usr/bin/pip3 /usr/bin/pip ENV VIRTUAL_ENV=/opt/venv +ARG BUILD_JOBS=32 +ARG NVTE_BUILD_THREADS_PER_JOB=2 +ARG NVCC_THREADS=8 # copy the Trinity-RFT dir into the workspace COPY . . @@ -37,18 +41,38 @@ RUN pip install uv && uv venv /opt/venv --python=python3.12 # Install Trinity-RFT RUN . /opt/venv/bin/activate && \ uv pip install -e.[mm,dev,tinker,data,agent] && \ - uv pip install vllm==0.19.1 && \ - uv pip install flash_attn==2.8.3 --no-build-isolation && \ - uv pip install -e .[megatron,qwen3_5] --no-build-isolation && \ + uv pip install vllm==0.20.1 + +# Install flash attention +RUN . /opt/venv/bin/activate && \ + MAX_JOBS=${BUILD_JOBS} \ + uv pip install flash_attn==2.8.3 --no-build-isolation + +# Install Transformer Engine +RUN . 
/opt/venv/bin/activate && \ + git clone --branch stable --depth 1 --recursive https://github.com/NVIDIA/TransformerEngine.git /tmp/TransformerEngine && \ + MAX_JOBS=${BUILD_JOBS} \ + CMAKE_BUILD_PARALLEL_LEVEL=${BUILD_JOBS} \ + NVTE_BUILD_THREADS_PER_JOB=${NVTE_BUILD_THREADS_PER_JOB} \ + NVTE_FRAMEWORK=pytorch \ + uv pip install --no-build-isolation /tmp/TransformerEngine && \ + rm -rf /tmp/TransformerEngine + +# Install Megatron +RUN . /opt/venv/bin/activate && MAX_JOBS=${BUILD_JOBS} \ + CMAKE_BUILD_PARALLEL_LEVEL=${BUILD_JOBS} \ + NVTE_BUILD_THREADS_PER_JOB=${NVTE_BUILD_THREADS_PER_JOB} \ + uv pip install -e .[qwen3_5] --no-build-isolation && \ + uv pip install megatron-core[mlm]==0.16.1 --no-build-isolation && \ uv pip install git+https://github.com/ISEEKYAN/mbridge.git@90c4633a6cdcfe5d29723d7b145d32f6f5e73303 && \ - uv pip install transformers==5.5.4 && \ - NVCC_APPEND_FLAGS="--threads 4" APEX_PARALLEL_BUILD=8 \ + NVCC_APPEND_FLAGS="--threads ${NVCC_THREADS}" APEX_PARALLEL_BUILD=${BUILD_JOBS} \ uv pip install -v --no-build-isolation \ --config-settings="--build-option=--cpp_ext" \ --config-settings="--build-option=--cuda_ext" \ git+https://github.com/NVIDIA/apex.git # Set Env variables +# ENV LD_LIBRARY_PATH=/usr/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/cuda/lib64 # WANDB # ENV WANDB_API_KEY= diff --git a/tests/cli/launcher_test.py b/tests/cli/launcher_test.py index 5e5aa9e0089..337e17fe4dd 100644 --- a/tests/cli/launcher_test.py +++ b/tests/cli/launcher_test.py @@ -401,7 +401,7 @@ def test_debug_mode(self, mock_load): process.terminate() @mock.patch("trinity.manager.log_manager.LogManager") - @mock.patch("trinity.cli.launcher.load_config") + @mock.patch("trinity.cli.log.load_config") def test_log_mode(self, mock_load_config, mock_log_manager): result = runner.invoke(launcher.app, ["log"]) self.assertNotEqual(result.exit_code, 0) diff --git a/tests/perf/resource_backends_test.py b/tests/perf/resource_backends_test.py new file mode 100644 index 00000000000..0b2f6e414be --- /dev/null +++ b/tests/perf/resource_backends_test.py @@ -0,0 +1,70 @@ +"""Tests for NVML-backed perf resource sampling.""" + +import unittest +from types import SimpleNamespace +from unittest.mock import patch + +from trinity.perf.resource_backends import SystemResourceBackend + + +class FakeProcess: + def __init__(self): + self._cpu_values = iter([0.0, 12.5]) + + def cpu_percent(self, interval=None): + return next(self._cpu_values) + + def memory_info(self): + return SimpleNamespace(rss=256 * 1024 * 1024) + + def memory_percent(self): + return 1.25 + + +class SystemResourceBackendTest(unittest.TestCase): + @patch("trinity.perf.resource_backends.time.sleep") + @patch("trinity.perf.resource_backends.nvmlDeviceGetName", return_value="GPU-0") + @patch("trinity.perf.resource_backends.nvmlDeviceGetHandleByIndex", return_value=object()) + @patch("trinity.perf.resource_backends.nvmlDeviceGetCount", return_value=1) + @patch("trinity.perf.resource_backends.nvmlShutdown") + @patch("trinity.perf.resource_backends.nvmlInit") + @patch("trinity.perf.resource_backends.psutil.Process", return_value=FakeProcess()) + def test_sample_keeps_peak_gpu_utilization_within_one_outer_sample( + self, + _mock_process, + _mock_nvml_init, + _mock_nvml_shutdown, + _mock_gpu_count, + _mock_gpu_handle, + _mock_gpu_name, + _mock_sleep, + ): + utilization_side_effect = [ + SimpleNamespace(gpu=0.0), + SimpleNamespace(gpu=35.0), + SimpleNamespace(gpu=80.0), + ] + memory_side_effect = [ + SimpleNamespace(used=100 * 
1024 * 1024, total=500 * 1024 * 1024), + SimpleNamespace(used=120 * 1024 * 1024, total=500 * 1024 * 1024), + SimpleNamespace(used=110 * 1024 * 1024, total=500 * 1024 * 1024), + ] + + with patch( + "trinity.perf.resource_backends.nvmlDeviceGetUtilizationRates", + side_effect=utilization_side_effect, + ), patch( + "trinity.perf.resource_backends.nvmlDeviceGetMemoryInfo", + side_effect=memory_side_effect, + ): + backend = SystemResourceBackend( + gpu_subsample_count=3, + gpu_subsample_interval_seconds=0.0, + ) + backend.open() + sample = backend.sample() + backend.close() + + self.assertEqual(sample.cpu_percent, 12.5) + self.assertEqual(sample.gpu_metrics[0].gpu_util_percent, 80.0) + self.assertEqual(sample.gpu_metrics[0].gpu_memory_used_mb, 120.0) diff --git a/tests/perf/resource_sampler_test.py b/tests/perf/resource_sampler_test.py new file mode 100644 index 00000000000..dbf624e88a0 --- /dev/null +++ b/tests/perf/resource_sampler_test.py @@ -0,0 +1,82 @@ +"""Tests for perf resource timeline helpers.""" + +import itertools +import time +import unittest +from typing import cast + +from trinity.perf.resource_backends import ( + GPUSample, + ResourceSample, + SystemResourceBackend, +) +from trinity.perf.resource_sampler import ResourceSampler + + +class FakeBackend: + def __init__(self): + self.opened = False + self.closed = False + self.sample_index = itertools.count() + + def open(self): + self.opened = True + + def close(self): + self.closed = True + + def sample(self): + index = next(self.sample_index) + return ResourceSample( + timestamp=1000.0 + index, + cpu_percent=50.0 + index, + memory_rss_mb=1024.0 + index, + memory_percent=20.0 + index, + gpu_metrics=[ + GPUSample( + gpu_id=0, + name="GPU-0", + gpu_util_percent=70.0 + index, + gpu_memory_used_mb=16000.0 + index, + gpu_memory_total_mb=24000.0, + ), + GPUSample( + gpu_id=1, + name="GPU-1", + gpu_util_percent=75.0 + index, + gpu_memory_used_mb=15000.0 + index, + gpu_memory_total_mb=24000.0, + ), + ], + ) + + +class ResourceSamplerTest(unittest.TestCase): + def test_resource_sampler_collects_samples(self): + backend = FakeBackend() + sampler = ResourceSampler( + interval_seconds=0.01, + backend=cast(SystemResourceBackend, backend), + ) + + sampler.start() + time.sleep(0.03) + samples = sampler.stop() + + self.assertTrue(backend.opened) + self.assertTrue(backend.closed) + self.assertGreaterEqual(len(samples), 2) + self.assertEqual(samples[0].gpu_metrics[0].gpu_id, 0) + + def test_resource_samples_serialize_cpu_single_line_and_gpu_per_device(self): + samples = [FakeBackend().sample(), FakeBackend().sample()] + + payload = {"resource_timeline": [sample.to_dict() for sample in samples]} + + self.assertEqual(len(payload["resource_timeline"]), 2) + self.assertEqual(payload["resource_timeline"][0]["cpu_percent"], 50.0) + self.assertEqual(len(payload["resource_timeline"][0]["gpu_metrics"]), 2) + self.assertEqual( + payload["resource_timeline"][0]["gpu_metrics"][0]["name"], + "GPU-0", + ) diff --git a/tests/trainer/trainer_test.py b/tests/trainer/trainer_test.py index 2a71a965b08..739b64c8169 100644 --- a/tests/trainer/trainer_test.py +++ b/tests/trainer/trainer_test.py @@ -31,7 +31,8 @@ get_vision_language_model_path, ) from trinity.buffer import get_buffer_reader -from trinity.cli.launcher import bench, both, convert, explore, run, serve, train +from trinity.cli.convert import convert_command +from trinity.cli.launcher import bench, both, explore, run, serve, train from trinity.common.config import ( AlgorithmConfig, BufferConfig, @@ -149,7 
+150,7 @@ def test_trainer(self): self.assertGreater(len(hf_dir_step_4), 0) self.assertGreater(len(hf_dir_step_8), 0) # test checkpoint convert - convert(self.config.checkpoint_job_dir) + convert_command(self.config.checkpoint_job_dir) hf_dir_step_4 = os.listdir(os.path.join(checkpoint_step_4, "actor", "huggingface")) hf_dir_step_8 = os.listdir(os.path.join(checkpoint_step_8, "actor", "huggingface")) self.assertIn("model.safetensors", hf_dir_step_4) diff --git a/trinity/buffer/pipelines/experience_pipeline.py b/trinity/buffer/pipelines/experience_pipeline.py index f136bd7004b..340ec1efd44 100644 --- a/trinity/buffer/pipelines/experience_pipeline.py +++ b/trinity/buffer/pipelines/experience_pipeline.py @@ -164,16 +164,16 @@ async def _process_experiences(self, exps: list[Experience]) -> Dict: # Process experiences through operators for idx, operator in enumerate(self.operators): with Timer( - metrics, f"time/experience_pipeline/operator/{idx}_{operator.__class__.__name__}" + metrics, f"experience_pipeline/time/operator/{idx}_{operator.__class__.__name__}" ): exps, metric = await operator.process(exps) metrics.update(metric) metrics["experience_count"] = len(exps) # Write processed experiences to output buffer - with Timer(metrics, "time/experience_pipeline/write"): + with Timer(metrics, "experience_pipeline/time/write"): await self.output.write_async(exps) - metrics["time/experience_pipeline/total"] = time.time() - st + metrics["experience_pipeline/time/total"] = time.time() - st # prefix metrics keys with 'pipeline/' result_metrics = {} diff --git a/trinity/cli/convert.py b/trinity/cli/convert.py new file mode 100644 index 00000000000..085df5f17ae --- /dev/null +++ b/trinity/cli/convert.py @@ -0,0 +1,26 @@ +import os +from typing import Optional + +import typer +from typing_extensions import Annotated + + +def convert_command( + checkpoint_dir: Annotated[ + str, + typer.Option("--checkpoint-dir", "-c", help="The path to the checkpoint directory."), + ], + base_model_dir: Annotated[ + Optional[str], + typer.Option("--base-model-dir", "-b", help="The path to the base model."), + ] = None, +) -> None: + """Convert model checkpoints to huggingface format.""" + from trinity.manager.checkpoint_converter import Converter + + dir_path = checkpoint_dir + if "global_step_" in dir_path: + while not os.path.basename(dir_path).startswith("global_step_"): + dir_path = os.path.dirname(dir_path) + converter = Converter(base_model_dir) + converter.convert(dir_path) diff --git a/trinity/cli/launcher.py b/trinity/cli/launcher.py index 98aec921a12..3bf6ca7e0e9 100644 --- a/trinity/cli/launcher.py +++ b/trinity/cli/launcher.py @@ -2,7 +2,9 @@ import asyncio import os import sys +import time import traceback +from dataclasses import dataclass from pprint import pprint from typing import Optional @@ -10,10 +12,13 @@ import typer from typing_extensions import Annotated +from trinity.cli.convert import convert_command +from trinity.cli.log import log_command +from trinity.cli.perf import perf_app +from trinity.cli.studio import studio_command +from trinity.cli.view import view_command from trinity.common.config import Config, load_config from trinity.common.constants import DEBUG_NAMESPACE, PLUGIN_DIRS_ENV_VAR -from trinity.manager.checkpoint_converter import Converter -from trinity.manager.state_manager import StateManager from trinity.utils.dlc_utils import is_running, setup_ray_cluster, stop_ray_cluster from trinity.utils.log import get_logger from trinity.utils.plugin_loader import load_plugins @@ -27,71 
+32,196 @@ ) -def bench(config: Config) -> None: +@dataclass(slots=True) +class StageError: + type_name: str + message: str + traceback_text: str + + +@dataclass(slots=True) +class StageStatus: + stage: str + success: bool + startup_time_sec: Optional[float] = None + execution_time_sec: Optional[float] = None + total_time_sec: Optional[float] = None + error: Optional[StageError] = None + + +def _build_stage_error(error: BaseException) -> StageError: + return StageError( + type_name=type(error).__name__, + message=str(error), + traceback_text=traceback.format_exc(), + ) + + +def bench(config: Config, *, timeout: Optional[float] = None) -> StageStatus: """Evaluate model.""" from trinity.explorer.explorer import Explorer config.explorer.name = "benchmark" explorer = Explorer.get_actor(config) + startup_started_at = time.perf_counter() + startup_time_sec: Optional[float] = None try: - ray.get(explorer.prepare.remote()) - ray.get(explorer.benchmark.remote()) + ray.get(explorer.prepare.remote(), timeout=timeout) + startup_time_sec = time.perf_counter() - startup_started_at + + run_started_at = time.perf_counter() + ray.get(explorer.benchmark.remote(), timeout=timeout) + execution_time_sec = time.perf_counter() - run_started_at logger.info("Benchmark finished.") - except Exception: - logger.error(f"Benchmark failed:\n{traceback.format_exc()}") + return StageStatus( + stage="bench", + success=True, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=time.perf_counter() - startup_started_at, + ) + except Exception as exc: + error = _build_stage_error(exc) + logger.error(f"Benchmark failed:\n{error.traceback_text}") + return StageStatus( + stage="bench", + success=False, + startup_time_sec=startup_time_sec, + execution_time_sec=None, + total_time_sec=time.perf_counter() - startup_started_at, + error=error, + ) finally: - ray.get(explorer.shutdown.remote()) + ray.get(explorer.shutdown.remote(), timeout=timeout) -def explore(config: Config) -> None: +def explore(config: Config, *, timeout: Optional[float] = None) -> StageStatus: """Run explorer.""" from trinity.explorer.explorer import Explorer explorer = Explorer.get_actor(config) + startup_started_at = time.perf_counter() + startup_time_sec: Optional[float] = None + run_started_at: Optional[float] = None try: - ray.get(explorer.prepare.remote()) - ray.get(explorer.sync_weight.remote()) - ray.get(explorer.explore.remote()) - except Exception: - logger.error(f"Explorer failed:\n{traceback.format_exc()}") + ray.get(explorer.prepare.remote(), timeout=timeout) + startup_time_sec = time.perf_counter() - startup_started_at + + run_started_at = time.perf_counter() + ray.get(explorer.sync_weight.remote(), timeout=timeout) + ray.get(explorer.explore.remote(), timeout=timeout) + execution_time_sec = time.perf_counter() - run_started_at + return StageStatus( + stage="explore", + success=True, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=time.perf_counter() - startup_started_at, + ) + except Exception as exc: + error = _build_stage_error(exc) + logger.error(f"Explorer failed:\n{error.traceback_text}") + execution_time_sec = ( + time.perf_counter() - run_started_at if run_started_at is not None else None + ) + return StageStatus( + stage="explore", + success=False, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=time.perf_counter() - startup_started_at, + error=error, + ) finally: - ray.get(explorer.shutdown.remote()) + 
ray.get(explorer.shutdown.remote(), timeout=timeout) -def train(config: Config) -> None: +def train(config: Config, *, timeout: Optional[float] = None) -> StageStatus: """Run trainer.""" from trinity.trainer.trainer import Trainer trainer = Trainer.get_actor(config) + startup_started_at = time.perf_counter() + startup_time_sec: Optional[float] = None + run_started_at: Optional[float] = None try: - ray.get(trainer.prepare.remote()) - ray.get(trainer.sync_weight.remote()) - ray.get(trainer.train.remote()) - except Exception: - logger.error(f"Trainer failed:\n{traceback.format_exc()}") + ray.get(trainer.prepare.remote(), timeout=timeout) + startup_time_sec = time.perf_counter() - startup_started_at + + run_started_at = time.perf_counter() + ray.get(trainer.sync_weight.remote(), timeout=timeout) + ray.get(trainer.train.remote(), timeout=timeout) + execution_time_sec = time.perf_counter() - run_started_at + return StageStatus( + stage="train", + success=True, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=time.perf_counter() - startup_started_at, + ) + except Exception as exc: + error = _build_stage_error(exc) + logger.error(f"Trainer failed:\n{error.traceback_text}") + execution_time_sec = ( + time.perf_counter() - run_started_at if run_started_at is not None else None + ) + return StageStatus( + stage="train", + success=False, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=time.perf_counter() - startup_started_at, + error=error, + ) finally: - ray.get(trainer.shutdown.remote()) + ray.get(trainer.shutdown.remote(), timeout=timeout) -def serve(config: Config) -> None: +def serve(config: Config, *, timeout: Optional[float] = None) -> StageStatus: """Run explorer in server mode.""" from trinity.explorer.explorer import Explorer explorer = Explorer.get_actor(config) + startup_started_at = time.perf_counter() + startup_time_sec: Optional[float] = None + run_started_at: Optional[float] = None try: - ray.get(explorer.prepare.remote()) - ray.get(explorer.sync_weight.remote()) - ray.get(explorer.serve.remote()) - except Exception: - logger.error(f"Explorer failed:\n{traceback.format_exc()}") + ray.get(explorer.prepare.remote(), timeout=timeout) + startup_time_sec = time.perf_counter() - startup_started_at + + run_started_at = time.perf_counter() + ray.get(explorer.sync_weight.remote(), timeout=timeout) + ray.get(explorer.serve.remote(), timeout=timeout) + execution_time_sec = time.perf_counter() - run_started_at + return StageStatus( + stage="serve", + success=True, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=time.perf_counter() - startup_started_at, + ) + except Exception as exc: + error = _build_stage_error(exc) + logger.error(f"Explorer failed:\n{error.traceback_text}") + execution_time_sec = ( + time.perf_counter() - run_started_at if run_started_at is not None else None + ) + return StageStatus( + stage="serve", + success=False, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=time.perf_counter() - startup_started_at, + error=error, + ) finally: - ray.get(explorer.shutdown.remote()) + ray.get(explorer.shutdown.remote(), timeout=timeout) -def both(config: Config) -> None: +def both(config: Config) -> StageStatus: """Setup both explorer and trainer. 
For the explorer, a step contains `batch_size * sync_interval` number @@ -106,6 +236,7 @@ def both(config: Config) -> None: explorer = Explorer.get_actor(config) trainer = Trainer.get_actor(config) + started_at = time.perf_counter() try: ray.get([explorer.__ray_ready__.remote(), trainer.__ray_ready__.remote()]) ray.get( @@ -147,8 +278,20 @@ def both(config: Config) -> None: "===============================================================" ) ray.wait(wait_ref, timeout=config.synchronizer.sync_timeout) - except Exception: - logger.error(f"Explorer or Trainer failed:\n{traceback.format_exc()}") + return StageStatus( + stage="both", + success=True, + total_time_sec=time.perf_counter() - started_at, + ) + except Exception as exc: + error = _build_stage_error(exc) + logger.error(f"Explorer or Trainer failed:\n{error.traceback_text}") + return StageStatus( + stage="both", + success=False, + total_time_sec=time.perf_counter() - started_at, + error=error, + ) finally: ray.wait( [explorer.shutdown.remote(), trainer.shutdown.remote()], @@ -167,7 +310,7 @@ def both(config: Config) -> None: } -def run_stage(config: Config) -> None: +def run_stage(config: Config) -> StageStatus: ray.init( address=config.cluster.ray_address, ignore_reinit_error=True, @@ -179,7 +322,7 @@ def run_stage(config: Config) -> None: from trinity.buffer.pipelines.task_pipeline import check_and_run_task_pipeline check_and_run_task_pipeline(config) - MODE_MAP[config.mode](config) + return MODE_MAP[config.mode](config) # type: ignore[operator] finally: if config.monitor.enable_ray_timeline: timeline_file = os.path.join(config.monitor.cache_dir, "timeline.json") @@ -224,6 +367,7 @@ def run( try: if cfg.stages: + from trinity.manager.state_manager import StateManager from trinity.trainer.verl.utils import get_latest_hf_checkpoint_path state_manager = StateManager( @@ -265,19 +409,6 @@ def run( stop_ray_cluster(namespace=cluster_namespace) -@app.command() -def studio( - port: Annotated[ - int, - typer.Option("--port", "-p", help="The port for Trinity-Studio."), - ] = 8501, -) -> None: - """Run studio to manage configurations.""" - from trinity.manager.config_manager import ConfigManager - - ConfigManager.run(port) - - @app.command() def debug( config: Annotated[ @@ -357,145 +488,11 @@ def debug( ) -@app.command() -def view( - url: Annotated[ - str, - typer.Option( - "--url", - help="Database URL for the experience table, for example sqlite:////path/to/debug_buffer.db.", - ), - ], - table: Annotated[ - str, - typer.Option("--table", help="Name of the experience table to monitor."), - ], - tokenizer: Annotated[ - str, - typer.Option( - "--tokenizer", - help="Tokenizer/model path used to decode token ids in the viewer.", - ), - ], - schema: Annotated[ - str, - typer.Option( - "--schema", - help="Schema type of the table. 
Supported values: experience, sft.", - ), - ] = "experience", - port: Annotated[ - int, - typer.Option("--port", "-p", help="The port for Experience Viewer."), - ] = 8502, -) -> None: - """Run the Streamlit viewer to inspect an experience table.""" - from trinity.buffer.viewer import SQLExperienceViewer - - schema = schema.lower() - if schema not in {"experience", "sft"}: - raise typer.BadParameter("--schema only supports 'experience' or 'sft'.") - - SQLExperienceViewer.run_viewer( - model_path=tokenizer, - db_url=url, - table_name=table, - schema_type=schema, - port=port, - ) - - -@app.command() -def convert( - checkpoint_dir: Annotated[ - str, - typer.Option("--checkpoint-dir", "-c", help="The path to the checkpoint directory."), - ], - base_model_dir: Annotated[ - Optional[str], - typer.Option("--base-model-dir", "-b", help="The path to the base model."), - ] = None, -) -> None: - """Convert checkpoints to huggingface format.""" - dir_path = checkpoint_dir - if "global_step_" in dir_path: - while not os.path.basename(dir_path).startswith("global_step_"): - dir_path = os.path.dirname(dir_path) - converter = Converter(base_model_dir) - converter.convert(dir_path) - - -@app.command() -def log( - log_dir: Annotated[ - str, - typer.Option( - "--log-dir", - "-d", - help="Path to the log directory. If provided, it will be used directly and ignore --config.", - ), - ] = "", - config: Annotated[ - str, - typer.Option( - "--config", - "-c", - help="Path to the config file. If provided, it will automatically locate the log directory based on the config.", - ), - ] = "", - keyword: Annotated[ - Optional[str], - typer.Option( - "--keyword", - "-k", - help="Only track log files containing the keyword in their filenames.", - ), - ] = None, - level: Annotated[ - str, - typer.Option("--level", "-l", help="The minimum log level to display in real-time."), - ] = "INFO", - last_n_lines: Annotated[ - int, - typer.Option("--last-n-lines", "-n", help="Number of last lines to display when starting."), - ] = 0, - search_pattern: Annotated[ - Optional[str], - typer.Option( - "--search-pattern", - "-p", - help="The pattern to search in log files. 
Only search for history logs and display all lines containing the pattern.", - ), - ] = None, - no_color: Annotated[ - bool, - typer.Option("--no-color", help="Disable colored output."), - ] = False, -) -> None: - """Monitor log files in real-time.""" - from trinity.manager.log_manager import LogManager - - if not config and not log_dir: - raise typer.BadParameter("Either --config or --log-dir must be provided.") - if not log_dir: - cfg = load_config(config) - checkpoint_job_dir = cfg.get_checkpoint_job_dir() - # we do not use check_and_update here because user may use this command - # in another environment - log_dir = os.path.join(checkpoint_job_dir, "log") - - if not os.path.exists(log_dir): - raise FileNotFoundError(f"Log directory not found: {log_dir}") - - log_manager = LogManager( - log_dir=log_dir, - keyword=keyword, - min_level=level, - color_output=not no_color, - last_n_lines=last_n_lines, - search_pattern=search_pattern, - ) - log_manager.monitor() +app.command("studio")(studio_command) +app.add_typer(perf_app, name="perf") +app.command("view")(view_command) +app.command("convert")(convert_command) +app.command("log")(log_command) def main() -> None: diff --git a/trinity/cli/log.py b/trinity/cli/log.py new file mode 100644 index 00000000000..40dc5d9301f --- /dev/null +++ b/trinity/cli/log.py @@ -0,0 +1,79 @@ +import os +from typing import Optional + +import typer +from typing_extensions import Annotated + +from trinity.common.config import load_config + + +def log_command( + log_dir: Annotated[ + str, + typer.Option( + "--log-dir", + "-d", + help="Path to the log directory. If provided, it will be used directly and ignore --config.", + ), + ] = "", + config: Annotated[ + str, + typer.Option( + "--config", + "-c", + help="Path to the config file. If provided, it will automatically locate the log directory based on the config.", + ), + ] = "", + keyword: Annotated[ + Optional[str], + typer.Option( + "--keyword", + "-k", + help="Only track log files containing the keyword in their filenames.", + ), + ] = None, + level: Annotated[ + str, + typer.Option("--level", "-l", help="The minimum log level to display in real-time."), + ] = "INFO", + last_n_lines: Annotated[ + int, + typer.Option("--last-n-lines", "-n", help="Number of last lines to display when starting."), + ] = 0, + search_pattern: Annotated[ + Optional[str], + typer.Option( + "--search-pattern", + "-p", + help="The pattern to search in log files. 
Only search for history logs and display all lines containing the pattern.", + ), + ] = None, + no_color: Annotated[ + bool, + typer.Option("--no-color", help="Disable colored output."), + ] = False, +) -> None: + """Monitor log files in real-time.""" + from trinity.manager.log_manager import LogManager + + if not config and not log_dir: + raise typer.BadParameter("Either --config or --log-dir must be provided.") + if not log_dir: + cfg = load_config(config) + checkpoint_job_dir = cfg.get_checkpoint_job_dir() + # we do not use check_and_update here because user may use this command + # in another environment + log_dir = os.path.join(checkpoint_job_dir, "log") + + if not os.path.exists(log_dir): + raise FileNotFoundError(f"Log directory not found: {log_dir}") + + log_manager = LogManager( + log_dir=log_dir, + keyword=keyword, + min_level=level, + color_output=not no_color, + last_n_lines=last_n_lines, + search_pattern=search_pattern, + ) + log_manager.monitor() diff --git a/trinity/cli/perf.py b/trinity/cli/perf.py new file mode 100644 index 00000000000..4c3383ace16 --- /dev/null +++ b/trinity/cli/perf.py @@ -0,0 +1,99 @@ +import os +import traceback +from typing import Optional + +import typer +from typing_extensions import Annotated + +from trinity.common.constants import PLUGIN_DIRS_ENV_VAR + +perf_app = typer.Typer(help="Performance testing tools.") + + +@perf_app.command("run") +def perf_run( + config: Annotated[ + str, + typer.Option("--config", "-c", help="Path to the config file."), + ], + module: Annotated[ + str, + typer.Option( + "--module", "-m", help="Perf module to run. Currently only supports 'explorer'." + ), + ] = "explorer", + output_path: Annotated[ + str, + typer.Option("--output-path", "-o", help="Path to the output JSON file."), + ] = "./perf/output.json", + monitor_interval: Annotated[ + float, + typer.Option("--monitor-interval", help="Resource sampling interval in seconds."), + ] = 2.0, + total_steps: Annotated[ + int, + typer.Option("--total-steps", help="Total steps to run the explorer for."), + ] = 5, + timeout: Annotated[ + Optional[float], + typer.Option( + "--timeout", help="Optional timeout in seconds for prepare, sync and explore calls." 
+ ), + ] = None, + plugin_dir: Annotated[ + Optional[str], + typer.Option("--plugin-dir", help="Path to the directory containing plugin modules."), + ] = None, +) -> None: + """Run performance benchmark.""" + if module != "explorer": + raise typer.BadParameter("Only --module explorer is supported for now.") + + from trinity.perf import ( + ExplorerPerfOptions, + run_explorer_perf, + write_explorer_perf_output, + ) + + try: + if plugin_dir: + os.environ[PLUGIN_DIRS_ENV_VAR] = plugin_dir + + options = ExplorerPerfOptions( + config_path=config, + output_path=output_path, + monitor_interval=monitor_interval, + total_steps=total_steps, + timeout=timeout, + ) + payload = run_explorer_perf(options) + write_explorer_perf_output(output_path, payload) + except Exception: # noqa: BLE001 + payload = { + "status": { + "success": False, + "error": traceback.format_exc(), + }, + "data": None, + } + write_explorer_perf_output(output_path, payload) + + if not payload["status"]["success"]: + typer.echo(f"Failed to run perf: {payload['status']['error']}") + + +@perf_app.command("view") +def perf_view( + report: Annotated[ + str, + typer.Option("--report", "-r", help="Path to the perf report JSON file."), + ], + port: Annotated[ + int, + typer.Option("--port", "-p", help="Port used by the Streamlit report viewer."), + ] = 8503, +) -> None: + """Open the Streamlit perf report viewer.""" + from trinity.perf.report_viewer import launch_report_viewer + + launch_report_viewer(report, port) diff --git a/trinity/cli/studio.py b/trinity/cli/studio.py new file mode 100644 index 00000000000..e9ca39a2aef --- /dev/null +++ b/trinity/cli/studio.py @@ -0,0 +1,14 @@ +import typer +from typing_extensions import Annotated + + +def studio_command( + port: Annotated[ + int, + typer.Option("--port", "-p", help="The port for Trinity-Studio."), + ] = 8501, +) -> None: + """Run studio to manage configurations.""" + from trinity.manager.config_manager import ConfigManager + + ConfigManager.run(port) diff --git a/trinity/cli/view.py b/trinity/cli/view.py new file mode 100644 index 00000000000..05dc537d317 --- /dev/null +++ b/trinity/cli/view.py @@ -0,0 +1,49 @@ +import typer +from typing_extensions import Annotated + + +def view_command( + url: Annotated[ + str, + typer.Option( + "--url", + help="Database URL for the experience table, for example sqlite:////path/to/debug_buffer.db.", + ), + ], + table: Annotated[ + str, + typer.Option("--table", help="Name of the experience table to monitor."), + ], + tokenizer: Annotated[ + str, + typer.Option( + "--tokenizer", + help="Tokenizer/model path used to decode token ids in the viewer.", + ), + ], + schema: Annotated[ + str, + typer.Option( + "--schema", + help="Schema type of the table. 
Supported values: experience, sft.", + ), + ] = "experience", + port: Annotated[ + int, + typer.Option("--port", "-p", help="The port for Experience Viewer."), + ] = 8502, +) -> None: + """Run the Streamlit viewer to inspect an experience table.""" + from trinity.buffer.viewer import SQLExperienceViewer + + schema = schema.lower() + if schema not in {"experience", "sft"}: + raise typer.BadParameter("--schema only supports 'experience' or 'sft'.") + + SQLExperienceViewer.run_viewer( + model_path=tokenizer, + db_url=url, + table_name=table, + schema_type=schema, + port=port, + ) diff --git a/trinity/common/models/vllm_patch/__init__.py b/trinity/common/models/vllm_patch/__init__.py index b71996b7395..f458ec65000 100644 --- a/trinity/common/models/vllm_patch/__init__.py +++ b/trinity/common/models/vllm_patch/__init__.py @@ -9,7 +9,7 @@ VLLM_VERSION_0120 = parse_version("0.12.0") VLLM_VERSION_0170 = parse_version("0.17.0") -VLLM_VERSION_0191 = parse_version("0.19.1") +VLLM_VERSION_0201 = parse_version("0.20.1") def vllm_patch(): @@ -64,7 +64,7 @@ def _get_api_server_runner(vllm_version): return run_api_server_in_ray_actor_v13 - if VLLM_VERSION_0170 <= vllm_version <= VLLM_VERSION_0191: + if VLLM_VERSION_0170 <= vllm_version <= VLLM_VERSION_0201: from trinity.common.models.vllm_patch.api_patch_v17 import ( run_api_server_in_ray_actor_v17, ) @@ -73,7 +73,7 @@ def _get_api_server_runner(vllm_version): raise ValueError( f"Unsupported vLLM version: {vllm.__version__}. " - "This patch supports vLLM versions 0.12.0, (0.12.0, 0.17.0), and [0.17.0, 0.19.1]." + "This patch supports vLLM versions 0.12.0, (0.12.0, 0.17.0), and [0.17.0, 0.20.1]." ) diff --git a/trinity/common/models/vllm_patch/worker_patch.py b/trinity/common/models/vllm_patch/worker_patch.py index 6975d5f954f..1f57cc5a7e8 100644 --- a/trinity/common/models/vllm_patch/worker_patch.py +++ b/trinity/common/models/vllm_patch/worker_patch.py @@ -13,10 +13,10 @@ def patch_vllm_prompt_logprobs(model_runner: GPUModelRunner): # noqa: C901 """Patch vLLM model runner to support prompt logprobs extraction.""" version = get_vllm_version() - if version < parse_version("0.10.2") or version > parse_version("0.19.1"): + if version < parse_version("0.10.2") or version > parse_version("0.20.1"): raise ValueError( f"Unsupported vllm version: {vllm.__version__}. " - "This patch requires vllm version >= 0.10.2, <= 0.19.1." + "This patch requires vllm version >= 0.10.2, <= 0.20.1." ) is_v0102 = version == parse_version("0.10.2") diff --git a/trinity/explorer/explorer.py b/trinity/explorer/explorer.py index 5264252d331..30466b67387 100644 --- a/trinity/explorer/explorer.py +++ b/trinity/explorer/explorer.py @@ -392,7 +392,7 @@ async def _finish_steps(self, start_step: int, end_step: int, model_version: int # Record the time: read_task + explore_step (>=1) + eval (if any) if self.explore_start_time is not None: - metric = {"time/explorer_sync_interval": time.time() - self.explore_start_time} + metric = {"explore/time/sync_interval": time.time() - self.explore_start_time} self.explore_start_time = None if self.monitor is not None: self.monitor.log(metric, step=end_step) @@ -400,7 +400,7 @@ async def _finish_steps(self, start_step: int, end_step: int, model_version: int async def _finish_explore_step(self, step: int, model_version: int) -> None: assert self.rollout_coordinator is not None, "Rollout coordinator must be prepared first." 
metric = {"rollout/model_version": model_version} - with Timer(metric, "time/wait_explore_step"): + with Timer(metric, "explorer/time/wait_explore_step"): result = await self.rollout_coordinator.finalize_train_batch.remote(step) if self.taskset is not None: self.taskset.feedback(result["metrics"]) diff --git a/trinity/perf/__init__.py b/trinity/perf/__init__.py new file mode 100644 index 00000000000..f2d9587344a --- /dev/null +++ b/trinity/perf/__init__.py @@ -0,0 +1,18 @@ +"""Performance tooling package for Trinity.""" + +from .resource_sampler import ResourceSampler +from .stage_perf import ( + ExplorerPerfOptions, + run_explorer_perf, + write_explorer_perf_output, +) +from .tensorboard_metrics import TensorBoardScalarReader, collect_step_metrics + +__all__ = [ + "ExplorerPerfOptions", + "ResourceSampler", + "TensorBoardScalarReader", + "collect_step_metrics", + "run_explorer_perf", + "write_explorer_perf_output", +] diff --git a/trinity/perf/report_viewer.py b/trinity/perf/report_viewer.py new file mode 100644 index 00000000000..415bfe74ca5 --- /dev/null +++ b/trinity/perf/report_viewer.py @@ -0,0 +1,389 @@ +import argparse +import json +import sys +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +import matplotlib.pyplot as plt +import streamlit as st + +try: + from streamlit.runtime.scriptrunner import get_script_run_ctx +except ImportError: # pragma: no cover - fallback for streamlit runtime layout changes + from streamlit.runtime.scriptrunner_utils.script_run_context import ( + get_script_run_ctx, + ) + +STEP_METRIC_PREFIXES_BY_MODULE: dict[str, list[str]] = { + "explorer": [ + "rollout/time/run_execution/mean", + "rollout/time/task_execution/mean", + "rollout/prompt_length/mean", + "rollout/response_length/mean", + "experience_pipeline/experience_count", + ], + "trainer": [], +} + +MEMORY_SERIES_KEY = "memory_rss_mb" + + +class PerfReportViewer: + @staticmethod + def run_viewer(report_path: str, port: int) -> None: + """Start the Streamlit perf report viewer.""" + from streamlit.web import cli + + viewer_path = Path(__file__) + sys.argv = [ + "streamlit", + "run", + str(viewer_path.resolve()), + "--server.port", + str(port), + "--server.fileWatcherType", + "none", + "--", + "--report", + report_path, + ] + sys.exit(cli.main()) + + +def launch_report_viewer(report_path: str, port: int) -> None: + """Launch the Streamlit perf report viewer from another CLI entrypoint.""" + PerfReportViewer.run_viewer(report_path, port) + + +def has_streamlit_context() -> bool: + return get_script_run_ctx() is not None + + +def configure_streamlit_page() -> None: + if has_streamlit_context(): + st.set_page_config(page_title="Trinity Performance Report", layout="wide") + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Trinity Performance Report Viewer") + parser.add_argument("--report", type=str, required=True, help="Path to the perf report JSON.") + parser.add_argument( + "--port", + type=int, + default=8503, + help="Port used when auto-launching the Streamlit report viewer.", + ) + return parser.parse_args() + + +def load_report(report_path: str) -> dict[str, Any]: + report_file = Path(report_path) + if not report_file.exists(): + raise FileNotFoundError(f"Report file not found: {report_path}") + with report_file.open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def infer_module_name(report: dict[str, Any]) -> str: + run_meta = report.get("run_meta", {}) + return str(run_meta.get("module")) + + 
+def get_step_metric_prefixes(report: dict[str, Any]) -> list[str]: + module_name = infer_module_name(report) + return STEP_METRIC_PREFIXES_BY_MODULE.get(module_name, []) + + +def format_timestamp(timestamp: Optional[float]) -> str: + if timestamp is None: + return "N/A" + return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") + + +def format_metric_value(value: Any) -> str: + if value is None: + return "N/A" + if isinstance(value, float): + return f"{value:.4f}" + return str(value) + + +def metric_label(metric_name: str) -> str: + return metric_name.replace("_", " ").title() + + +def gpu_series_label(gpu_payload: dict[str, Any]) -> str: + gpu_id = gpu_payload.get("gpu_id", "?") + gpu_name = gpu_payload.get("name") + if gpu_name: + return f"GPU {gpu_id} ({gpu_name})" + return f"GPU {gpu_id}" + + +def render_metric_card(metric_name: str, value: Any) -> None: + display_value = format_metric_value(value) + label = metric_label(metric_name) + st.markdown( + f""" +
+        <div style="border: 1px solid #e0e0e0; border-radius: 8px; padding: 12px 16px; text-align: center;">
+            <div style="font-size: 0.85rem; color: #808080;">
+                {label}
+            </div>
+            <div style="font-size: 1.5rem; font-weight: 600;">
+                {display_value}
+            </div>
+        </div>
+ """, + unsafe_allow_html=True, + ) + + +def build_elapsed_series(series: list[dict[str, Any]]) -> tuple[list[float], list[float]]: + if not series: + return [], [] + start_timestamp = float(series[0]["timestamp"]) + x_values = [float(point["timestamp"]) - start_timestamp for point in series] + y_values = [float(point["value"]) for point in series] + return x_values, y_values + + +def build_scalar_timeline_series( + timeline: list[dict[str, Any]], metric_key: str +) -> list[dict[str, float]]: + return [ + {"timestamp": sample["timestamp"], "value": sample[metric_key]} + for sample in timeline + if sample.get(metric_key) is not None + ] + + +def build_gpu_timeline_series( + timeline: list[dict[str, Any]], metric_key: str +) -> dict[str, dict[str, Any]]: + series_by_gpu: dict[str, dict[str, Any]] = {} + for sample in timeline: + timestamp = sample.get("timestamp") + for gpu_sample in sample.get("gpu_metrics", []): + if gpu_sample.get(metric_key) is None: + continue + gpu_key = str(gpu_sample.get("gpu_id")) + gpu_payload = series_by_gpu.setdefault( + gpu_key, + { + "gpu_id": gpu_sample.get("gpu_id"), + "name": gpu_sample.get("name"), + "values": [], + }, + ) + gpu_payload["values"].append({"timestamp": timestamp, "value": gpu_sample[metric_key]}) + return series_by_gpu + + +def render_line_chart( + title: str, + x_values: list[float], + y_series: dict[str, list[float]], + y_label: str, + legend_below: bool = False, + legend_columns: int = 1, +) -> None: + st.markdown(f"#### {title}") + if not x_values or not y_series: + st.info(f"No data for {title}.") + return + + figure, axis = plt.subplots(figsize=(6, 2.6)) + for series_name, y_values in y_series.items(): + axis.plot(x_values[: len(y_values)], y_values, label=series_name) + axis.set_xlabel("Elapsed Time (s)") + axis.set_ylabel(y_label) + axis.grid(True, alpha=0.3) + if len(y_series) > 1: + if legend_below: + axis.legend( + loc="upper center", + bbox_to_anchor=(0.5, -0.28), + ncol=min(legend_columns, len(y_series)), + frameon=False, + fontsize=8, + ) + figure.subplots_adjust(bottom=0.32) + else: + axis.legend() + st.pyplot(figure, clear_figure=True) + + +def render_step_metric_chart(step_metrics: list[dict[str, Any]], metric_key: str) -> None: + x_values = [ + int(step_metric["step"]) for step_metric in step_metrics if metric_key in step_metric + ] + y_values = [ + float(step_metric[metric_key]) + for step_metric in step_metrics + if step_metric.get(metric_key) is not None + ] + + st.markdown(f"#### {metric_label(metric_key)}") + if not x_values or not y_values: + st.info(f"No data for {metric_key}.") + return + + figure, axis = plt.subplots(figsize=(6, 2.6)) + axis.plot(x_values[: len(y_values)], y_values, marker="o") + axis.set_xlabel("Step") + axis.set_ylabel(metric_label(metric_key)) + axis.grid(True, alpha=0.3) + st.pyplot(figure, clear_figure=True) + + +def render_header(report: dict[str, Any], report_path: str) -> None: + run_meta = report.get("run_meta", {}) + status = report.get("status", {}) + + st.title("Trinity Performance Report") + st.caption(f"Report: {report_path}") + st.caption(f"Generated At: {format_timestamp(run_meta.get('generated_at'))}") + + if not status.get("success"): + st.error("Run failed.") + if status.get("error"): + with st.expander("Error Traceback"): + st.code(str(status["error"])) + + +def render_global_metrics(report: dict[str, Any]) -> None: + st.header("Global Metrics") + timing = report.get("timing", {}) + + metric_items: list[tuple[str, Any]] = [] + metric_items.extend( + ( + metric_key, + 
timing.get(metric_key), + ) + for metric_key in ("startup_time_sec", "execution_time_sec", "total_time_sec") + ) + + shown_items = [(key, value) for key, value in metric_items if value is not None] + if not shown_items: + st.info("No global metrics found in this report.") + return + + columns = st.columns(min(4, len(shown_items))) + for index, (metric_key, value) in enumerate(shown_items): + with columns[index % len(columns)]: + render_metric_card(metric_key, value) + + +def render_step_metrics(report: dict[str, Any]) -> None: + st.header("Step Metrics") + step_metrics = report.get("step_metrics", []) + if not step_metrics: + st.info("No step metrics found in this report.") + return + + metric_prefixes = get_step_metric_prefixes(report) + metric_keys: list[str] = [] + for step_metric in step_metrics: + for metric_key, metric_value in step_metric.items(): + if metric_key in {"step", "raw_metrics"} or metric_value is None: + continue + if any(metric_key.startswith(prefix) for prefix in metric_prefixes): + if metric_key not in metric_keys: + metric_keys.append(metric_key) + + if not metric_keys: + st.info("No configured step metrics matched the current report.") + return + + for metric_index in range(0, len(metric_keys), 2): + columns = st.columns(2) + for column_index, metric_key in enumerate(metric_keys[metric_index : metric_index + 2]): + with columns[column_index]: + render_step_metric_chart(step_metrics, metric_key) + + with st.expander("Step Metrics Table"): + compact_rows = [] + for step_metric in step_metrics: + compact_row = {key: value for key, value in step_metric.items() if key != "raw_metrics"} + compact_rows.append(compact_row) + st.dataframe(compact_rows, use_container_width=True) + + +def render_resource_utilization(report: dict[str, Any]) -> None: + st.header("Resource Utilization") + resource_timeline = report.get("resource_timeline", []) + + cpu_series = build_scalar_timeline_series(resource_timeline, "cpu_percent") + cpu_x, cpu_y = build_elapsed_series(cpu_series) + + memory_series = build_scalar_timeline_series(resource_timeline, MEMORY_SERIES_KEY) + memory_x, memory_y = build_elapsed_series(memory_series) + + gpu_util_series = build_gpu_timeline_series(resource_timeline, "gpu_util_percent") + gpu_util_x: list[float] = [] + gpu_util_y: dict[str, list[float]] = {} + for gpu_payload in gpu_util_series.values(): + gpu_util_x, values = build_elapsed_series(gpu_payload.get("values", [])) + gpu_util_y[gpu_series_label(gpu_payload)] = values + gpu_memory_series = build_gpu_timeline_series(resource_timeline, "gpu_memory_used_mb") + gpu_memory_x: list[float] = [] + gpu_memory_y: dict[str, list[float]] = {} + for gpu_payload in gpu_memory_series.values(): + gpu_memory_x, values = build_elapsed_series(gpu_payload.get("values", [])) + gpu_memory_y[gpu_series_label(gpu_payload)] = values + first_row = st.columns(2) + with first_row[0]: + render_line_chart("CPU Utilization", cpu_x, {"CPU": cpu_y}, "CPU %") + with first_row[1]: + render_line_chart("Memory Usage", memory_x, {"Memory": memory_y}, "MB") + + second_row = st.columns(2) + with second_row[0]: + render_line_chart( + "GPU Utilization", + gpu_util_x, + gpu_util_y, + "GPU %", + legend_below=True, + legend_columns=2, + ) + with second_row[1]: + render_line_chart( + "GPU Memory Usage", + gpu_memory_x, + gpu_memory_y, + "MB", + legend_below=True, + legend_columns=2, + ) + + +def main(args: Optional[argparse.Namespace] = None) -> None: + configure_streamlit_page() + if args is None: + args = parse_args() + + try: + report = 
load_report(args.report) + except (FileNotFoundError, json.JSONDecodeError, OSError) as error: + st.title("Trinity Perf Report Viewer") + st.error(str(error)) + return + + render_header(report, args.report) + render_global_metrics(report) + render_step_metrics(report) + render_resource_utilization(report) + + +if __name__ == "__main__": + parsed_args = parse_args() + if has_streamlit_context(): + main(parsed_args) + else: + launch_report_viewer(parsed_args.report, parsed_args.port) diff --git a/trinity/perf/resource_backends.py b/trinity/perf/resource_backends.py new file mode 100644 index 00000000000..817a2fd6f86 --- /dev/null +++ b/trinity/perf/resource_backends.py @@ -0,0 +1,162 @@ +"""System resource collection backends for performance tooling.""" + +from __future__ import annotations + +import time +from dataclasses import asdict, dataclass +from typing import Iterable + +import psutil +from pynvml import ( + NVMLError, + nvmlDeviceGetCount, + nvmlDeviceGetHandleByIndex, + nvmlDeviceGetMemoryInfo, + nvmlDeviceGetName, + nvmlDeviceGetUtilizationRates, + nvmlInit, + nvmlShutdown, +) + + +@dataclass +class GPUSample: + """One GPU sample at one point in time.""" + + gpu_id: int + name: str + gpu_util_percent: float + gpu_memory_used_mb: float + gpu_memory_total_mb: float + + def to_dict(self) -> dict: + """Serialize the GPU sample to a dictionary.""" + return asdict(self) + + +@dataclass +class ResourceSample: + """One system resource sample at one point in time.""" + + timestamp: float + cpu_percent: float + memory_rss_mb: float + memory_percent: float + gpu_metrics: list[GPUSample] + + def to_dict(self) -> dict: + """Serialize the resource sample to a dictionary.""" + payload = asdict(self) + payload["gpu_metrics"] = [gpu_sample.to_dict() for gpu_sample in self.gpu_metrics] + return payload + + +class SystemResourceBackend: + """Collect system-level CPU, memory and per-GPU metrics.""" + + def __init__( + self, + gpu_subsample_count: int = 5, + gpu_subsample_interval_seconds: float = 0.2, + ) -> None: + if gpu_subsample_count <= 0: + raise ValueError("gpu_subsample_count must be greater than 0.") + if gpu_subsample_interval_seconds < 0: + raise ValueError("gpu_subsample_interval_seconds must be non-negative.") + self._process = psutil.Process() + self._initialized = False + self._gpu_count = 0 + self._gpu_subsample_count = gpu_subsample_count + self._gpu_subsample_interval_seconds = gpu_subsample_interval_seconds + + def open(self) -> None: + """Initialize the GPU management library and validate the environment.""" + if self._initialized: + return + try: + nvmlInit() + self._gpu_count = nvmlDeviceGetCount() + except NVMLError as error: + raise RuntimeError(f"Failed to initialize NVML: {error}") from error + if self._gpu_count <= 0: + self.close() + raise RuntimeError("No GPU devices detected by NVML.") + + self._process.cpu_percent(interval=None) + self._initialized = True + + def close(self) -> None: + """Release NVML resources.""" + if not self._initialized: + return + try: + nvmlShutdown() + except NVMLError: + pass + self._initialized = False + self._gpu_count = 0 + + def sample(self) -> ResourceSample: + """Collect one resource sample.""" + if not self._initialized: + raise RuntimeError("SystemResourceBackend must be opened before sampling.") + + timestamp = time.time() + memory_info = self._process.memory_info() + gpu_metrics = self._collect_gpu_metrics() + for _ in range(1, self._gpu_subsample_count): + if self._gpu_subsample_interval_seconds > 0: + 
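+                # Space the extra NVML snapshots apart; _merge_gpu_metrics keeps the
+                # per-GPU peaks, since single instantaneous readings can miss bursts.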
time.sleep(self._gpu_subsample_interval_seconds) + gpu_metrics = self._merge_gpu_metrics(gpu_metrics, self._collect_gpu_metrics()) + + return ResourceSample( + timestamp=timestamp, + cpu_percent=float(self._process.cpu_percent(interval=None)), + memory_rss_mb=float(memory_info.rss) / (1024 * 1024), + memory_percent=float(self._process.memory_percent()), + gpu_metrics=gpu_metrics, + ) + + def _collect_gpu_metrics(self) -> list[GPUSample]: + """Collect one instantaneous GPU snapshot from NVML.""" + gpu_metrics: list[GPUSample] = [] + for gpu_index in range(self._gpu_count): + gpu_handle = nvmlDeviceGetHandleByIndex(gpu_index) + utilization = nvmlDeviceGetUtilizationRates(gpu_handle) + gpu_memory = nvmlDeviceGetMemoryInfo(gpu_handle) + gpu_name = nvmlDeviceGetName(gpu_handle) + if isinstance(gpu_name, bytes): + gpu_name = gpu_name.decode("utf-8") + gpu_metrics.append( + GPUSample( + gpu_id=gpu_index, + name=str(gpu_name), + gpu_util_percent=float(utilization.gpu), + gpu_memory_used_mb=float(gpu_memory.used) / (1024 * 1024), + gpu_memory_total_mb=float(gpu_memory.total) / (1024 * 1024), + ) + ) + return gpu_metrics + + def _merge_gpu_metrics( + self, + base_metrics: Iterable[GPUSample], + next_metrics: Iterable[GPUSample], + ) -> list[GPUSample]: + """Merge GPU snapshots by keeping peak utilization and memory usage.""" + merged_metrics = {gpu_metric.gpu_id: gpu_metric for gpu_metric in base_metrics} + for gpu_metric in next_metrics: + prior_metric = merged_metrics.get(gpu_metric.gpu_id) + if prior_metric is None: + merged_metrics[gpu_metric.gpu_id] = gpu_metric + continue + merged_metrics[gpu_metric.gpu_id] = GPUSample( + gpu_id=gpu_metric.gpu_id, + name=gpu_metric.name, + gpu_util_percent=max(prior_metric.gpu_util_percent, gpu_metric.gpu_util_percent), + gpu_memory_used_mb=max( + prior_metric.gpu_memory_used_mb, gpu_metric.gpu_memory_used_mb + ), + gpu_memory_total_mb=gpu_metric.gpu_memory_total_mb, + ) + return [merged_metrics[gpu_index] for gpu_index in sorted(merged_metrics)] diff --git a/trinity/perf/resource_sampler.py b/trinity/perf/resource_sampler.py new file mode 100644 index 00000000000..c6ccd285c67 --- /dev/null +++ b/trinity/perf/resource_sampler.py @@ -0,0 +1,69 @@ +"""Threaded resource sampler for performance tooling.""" + +from __future__ import annotations + +import threading +import time +from typing import Optional + +from trinity.perf.resource_backends import ResourceSample, SystemResourceBackend + + +class ResourceSampler: + """Periodically collect system resource samples in a background thread.""" + + def __init__( + self, + interval_seconds: float, + backend: Optional[SystemResourceBackend] = None, + ) -> None: + if interval_seconds <= 0: + raise ValueError("interval_seconds must be greater than 0.") + self.interval_seconds = interval_seconds + self.backend = backend or SystemResourceBackend() + self._samples: list[ResourceSample] = [] + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + self._started = False + + def start(self) -> None: + """Start sampling in the background.""" + if self._started: + raise RuntimeError("ResourceSampler has already been started.") + self.backend.open() + self._started = True + self._stop_event.clear() + self._thread = threading.Thread(target=self._run, name="resource-sampler", daemon=True) + self._thread.start() + + def stop(self) -> list[ResourceSample]: + """Stop sampling and return the collected samples.""" + if not self._started: + return self.samples() + 
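+        # Signal the sampling thread, wait for it to exit, then release NVML via the backend.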
self._stop_event.set() + if self._thread is not None: + self._thread.join() + self._thread = None + self.backend.close() + self._started = False + return self.samples() + + def samples(self) -> list[ResourceSample]: + """Return a snapshot of all collected samples.""" + with self._lock: + return list(self._samples) + + def _run(self) -> None: + next_sample_time = time.monotonic() + while not self._stop_event.is_set(): + self._collect_once() + next_sample_time += self.interval_seconds + remaining_time = max(0.0, next_sample_time - time.monotonic()) + if self._stop_event.wait(remaining_time): + break + + def _collect_once(self) -> None: + sample = self.backend.sample() + with self._lock: + self._samples.append(sample) diff --git a/trinity/perf/stage_perf.py b/trinity/perf/stage_perf.py new file mode 100644 index 00000000000..7185c26ebaa --- /dev/null +++ b/trinity/perf/stage_perf.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import json +import os +import socket +import time +import traceback +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Optional + +import ray + +from trinity.buffer.pipelines.task_pipeline import check_and_run_task_pipeline +from trinity.common.config import Config, load_config +from trinity.perf.resource_sampler import ResourceSampler +from trinity.perf.tensorboard_metrics import ( + TensorBoardScalarReader, + collect_step_metrics, +) +from trinity.utils.plugin_loader import load_plugins + + +@dataclass(slots=True) +class ExplorerPerfOptions: + config_path: str + output_path: str + monitor_interval: float = 2.0 + total_steps: int = 5 + timeout: Optional[float] = None + + +def validate_explorer_perf_config(config: Config) -> None: + """Validate perf-specific config constraints.""" + if config.mode != "explore": + raise ValueError(f"Explorer perf requires mode 'explore', got '{config.mode}'.") + + +def build_explorer_perf_payload( + *, + config: Optional[Config], + options: ExplorerPerfOptions, + startup_time_sec: Optional[float], + execution_time_sec: Optional[float], + total_time_sec: Optional[float], + resource_payload: dict[str, Any], + step_metrics: list[dict[str, Any]], + success: bool, + error: Optional[str], +) -> dict[str, Any]: + """Assemble the final JSON payload.""" + artifacts = {} + explorer_name = None + if config is not None: + explorer_name = config.explorer.name + artifacts = { + "checkpoint_job_dir": config.checkpoint_job_dir, + "tensorboard_dir": os.path.join( + config.monitor.cache_dir, "tensorboard", config.explorer.name + ), + "log_dir": config.log.save_dir, + "output_json": options.output_path, + } + + return { + "run_meta": { + "module": "explorer", + "config_path": options.config_path, + "explorer_name": explorer_name, + "monitor_interval_sec": options.monitor_interval, + "hostname": socket.gethostname(), + "pid": os.getpid(), + "generated_at": time.time(), + }, + "timing": { + "startup_time_sec": startup_time_sec, + "execution_time_sec": execution_time_sec, + "total_time_sec": total_time_sec, + }, + **resource_payload, + "step_metrics": step_metrics, + "artifacts": artifacts, + "status": { + "success": success, + "error": error, + "gpu_metrics_available": bool(resource_payload.get("resource_timeline")), + "tensorboard_metrics_available": bool(step_metrics), + }, + } + + +def write_explorer_perf_output(output_path: str, payload: dict[str, Any]) -> None: + """Write the final payload to disk.""" + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + 
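+    # Pretty-printed UTF-8 JSON keeps the report human-readable and easy to diff.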
output_file.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + +def run_explorer_perf(options: ExplorerPerfOptions) -> dict[str, Any]: + """Run Explorer perf collection and return the result payload.""" + from trinity.cli.launcher import explore + + load_plugins() + config: Optional[Config] = None + sampler: Optional[ResourceSampler] = None + error: Optional[str] = None + startup_time_sec: Optional[float] = None + execution_time_sec: Optional[float] = None + total_time_sec: Optional[float] = None + resource_payload: dict[str, Any] = {"resource_timeline": []} + step_metrics: list[dict[str, Any]] = [] + + try: + config = load_config(options.config_path) + config.buffer.total_steps = options.total_steps + config.monitor.monitor_type = "tensorboard" + config.continue_from_checkpoint = False # ensure we start fresh for perf testing + validate_explorer_perf_config(config) + config.check_and_update() + + ray.init( + address=config.cluster.ray_address, + ignore_reinit_error=True, + namespace=config.ray_namespace, + runtime_env={"env_vars": config.get_envs()}, + ) + check_and_run_task_pipeline(config) + + sampler = ResourceSampler(interval_seconds=options.monitor_interval) + sampler.start() + + stage_status = explore(config, timeout=options.timeout) + startup_time_sec = stage_status.startup_time_sec + execution_time_sec = stage_status.execution_time_sec + total_time_sec = stage_status.total_time_sec + if stage_status.error is not None: + error = stage_status.error.traceback_text + except (RuntimeError, ValueError) as e: + error = traceback.format_exc() + print(f"Explorer perf failed with error: {e}\n{error}") + raise e + finally: + collected_samples = sampler.stop() if sampler is not None else [] + resource_payload = {"resource_timeline": [sample.to_dict() for sample in collected_samples]} + + if config is not None: + tensorboard_dir = os.path.join( + config.monitor.cache_dir, "tensorboard", config.explorer.name + ) + if os.path.isdir(tensorboard_dir): + scalar_reader = TensorBoardScalarReader(tensorboard_dir) + step_metrics = collect_step_metrics(scalar_reader.metrics) + + if ray.is_initialized(): + ray.shutdown() + + return build_explorer_perf_payload( + config=config, + options=options, + startup_time_sec=startup_time_sec, + execution_time_sec=execution_time_sec, + total_time_sec=total_time_sec, + resource_payload=resource_payload, + step_metrics=step_metrics, + success=error is None, + error=error, + ) diff --git a/trinity/perf/tensorboard_metrics.py b/trinity/perf/tensorboard_metrics.py new file mode 100644 index 00000000000..2fb69d3bd99 --- /dev/null +++ b/trinity/perf/tensorboard_metrics.py @@ -0,0 +1,63 @@ +"""Helpers for TensorBoard metric parsing and aggregation.""" + +from __future__ import annotations + +import os +from collections import defaultdict +from typing import Any + +from tensorboard.backend.event_processing.event_accumulator import EventAccumulator + +TASK_EXECUTION_METRIC_NAME = "rollout/time/task_execution/mean" +RUN_EXECUTION_METRIC_NAME = "rollout/time/run_execution/mean" +FINISHED_TASK_METRIC_NAME = "rollout/finished_task_count" + + +class TensorBoardScalarReader: + """Read scalar metrics from TensorBoard event files.""" + + def __init__(self, log_dir: str): + self.log_dir = log_dir + self.metrics = self._load_metrics(log_dir) + + def _load_metrics(self, log_dir: str) -> dict[str, dict[int, float]]: + metric_map: dict[str, dict[int, float]] = defaultdict(dict) + for event_file in self._find_event_files(log_dir): + accumulator = 
EventAccumulator(event_file) + accumulator.Reload() + for tag in accumulator.Tags().get("scalars", []): + for scalar in accumulator.Scalars(tag): + prior_value = metric_map[tag].get(scalar.step) + if prior_value is None or scalar.value > prior_value: + metric_map[tag][scalar.step] = scalar.value + return dict(metric_map) + + def _find_event_files(self, log_dir: str) -> list[str]: + event_files: list[str] = [] + for root, _, files in os.walk(log_dir): + for file_name in files: + if file_name.startswith("events.out.tfevents."): + event_files.append(os.path.join(root, file_name)) + return sorted(event_files) + + +def extract_raw_metrics_for_step( + metric_map: dict[str, dict[int, float]], step: int +) -> dict[str, float]: + """Extract all scalar metrics that were logged for one step.""" + return { + metric_name: float(step_values[step]) + for metric_name, step_values in metric_map.items() + if step in step_values + } + + +def collect_step_metrics(metric_map: dict[str, dict[int, float]]) -> list[dict[str, Any]]: + """Build per-step metrics from TensorBoard scalars.""" + step_numbers = sorted(metric_map.get(FINISHED_TASK_METRIC_NAME, {}).keys()) + step_metrics: list[dict[str, Any]] = [] + for step in step_numbers: + metrics = extract_raw_metrics_for_step(metric_map, step) + metrics["step"] = step + step_metrics.append(metrics) + return step_metrics
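+
+
+# Minimal usage sketch (illustrative only, not part of the module API), assuming
+# a TensorBoard directory written by an explorer perf run:
+#
+#     reader = TensorBoardScalarReader("<cache_dir>/tensorboard/<explorer_name>")
+#     step_metrics = collect_step_metrics(reader.metrics)
+#     # -> [{"rollout/finished_task_count": ..., "step": 1, ...}, ...]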