From 41d4207df9e8a6410c5e1421e5c8b2e607f4d520 Mon Sep 17 00:00:00 2001 From: umaru Date: Sat, 30 May 2026 11:04:59 +0800 Subject: [PATCH 01/12] =?UTF-8?q?docs(openspec):=20=E6=96=B0=E5=A2=9E=20li?= =?UTF-8?q?ve-pulse-bar=20=E5=8F=98=E6=9B=B4=E6=8F=90=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实时脉搏状态条:在所有管理页顶栏常驻一条秒级运行状态条, 复用既有进程内 pub/sub + SSE + 三态降级链路。指标为网关增强版 (滚动 60 秒 req/min、错误率、平均延迟、TPM,外加健康上游数/总数 与熔断打开数)。含 proposal/design/specs/tasks 四件工件。 --- .../changes/live-pulse-bar/.openspec.yaml | 2 + openspec/changes/live-pulse-bar/design.md | 115 ++++++++++++++++++ openspec/changes/live-pulse-bar/proposal.md | 35 ++++++ .../specs/live-pulse-bar/spec.md | 90 ++++++++++++++ openspec/changes/live-pulse-bar/tasks.md | 45 +++++++ 5 files changed, 287 insertions(+) create mode 100644 openspec/changes/live-pulse-bar/.openspec.yaml create mode 100644 openspec/changes/live-pulse-bar/design.md create mode 100644 openspec/changes/live-pulse-bar/proposal.md create mode 100644 openspec/changes/live-pulse-bar/specs/live-pulse-bar/spec.md create mode 100644 openspec/changes/live-pulse-bar/tasks.md diff --git a/openspec/changes/live-pulse-bar/.openspec.yaml b/openspec/changes/live-pulse-bar/.openspec.yaml new file mode 100644 index 00000000..b44fb0ec --- /dev/null +++ b/openspec/changes/live-pulse-bar/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-30 diff --git a/openspec/changes/live-pulse-bar/design.md b/openspec/changes/live-pulse-bar/design.md new file mode 100644 index 00000000..97ebe95d --- /dev/null +++ b/openspec/changes/live-pulse-bar/design.md @@ -0,0 +1,115 @@ +## Context + +AutoRouter 已有一套进程内实时链路,本变更在其之上叠加一个面向「运行健康概览」的实时脉搏视图,而不是新造一套推送机制。现状如下: + +``` +请求收口 (request-logger.ts) + └─ notifyRequestLogChange(logEntry) // 完整请求行:statusCode/durationMs/totalTokens/upstreamId + └─ publishRequestLogLiveUpdate(event) // 进程内发布,当前仅转发 logId + statusCode + └─ subscribeRequestLogLiveUpdates // 订阅者 + └─ 
/api/admin/logs/live (SSE) // 仅推送 request-log-changed,供日志页失效查询 + └─ use-request-log-live // 前端 SSE 客户端,三态 + 降级轮询 +``` + +现有仪表盘 `StatsCards` 走 `/admin/stats/overview`,按「今日/昨日」DB 聚合,60 秒刷新,粒度粗、非实时。`Topbar` 组件当前仅渲染标题,右侧整片空白,且为 `hidden md:block`(移动端隐藏)。全部 11 个管理页各自渲染 `<Topbar />`,因此把状态条做进 `Topbar` 即可全局常驻。 + +网关健康信号的现成来源:`health-checker.ts` 的 `getAllHealthStatusWithCircuitBreaker()` 同时返回上游健康状态与熔断状态,可一次性拼出「健康上游数/总数」与「熔断打开数」。 + +## Goals / Non-Goals + +**Goals:** + +- 在所有管理页顶栏常驻一条秒级实时运行状态条,复用既有 pub/sub + SSE + 三态降级语义。 +- 指标为网关增强版:滚动 60 秒窗口的 req/min、错误率、平均延迟、TPM,外加健康上游数/总数与熔断打开数。 +- 服务端滚动窗口聚合,避免前端在页面加载后从零累积窗口;DB 负载尽量低。 +- 移动端提供紧凑形态,桌面顶栏隐藏时仍可见核心状态。 + +**Non-Goals:** + +- 不替换或改写既有 `StatsCards`「今日/昨日」汇总视图。 +- 不改动 `/api/admin/logs/live` 既有事件契约与日志页行为。 +- 不引入跨实例共享状态(Redis 等);滚动窗口为进程内内存状态,沿用既有进程内实时链路的单实例语义。 +- 不做历史回放或持久化;脉搏只反映「当下最近 60 秒」。 + +## Decisions + +### 决策 1:服务端滚动窗口聚合,而非前端累积 + +新增 `live-pulse-aggregator.ts`,在内存中维护最近 60 秒的时间分桶计数(按秒分桶的环形结构)。每个桶累计:请求数、非 2xx 数、成功请求延迟之和与成功请求数、token 总量。读取快照时把窗口内各桶合并换算为 req/min、错误率、平均延迟、TPM。 + +为什么选服务端:前端累积方案在页面刚加载时窗口为空,req/min 需要 60 秒才能爬满,且只反映「连接后」的流量,失真明显。服务端聚合在任意时刻读到的都是真实最近 60 秒。 + +考虑过的替代方案:每隔 2~3 秒查一次 `requestLogs` 最近 60 秒(DB 聚合)。准确但每个在线管理员每数秒触发一次聚合查询,DB 负载随在线人数线性增长;内存环形桶则与查询人数无关,且与现有进程内 pub/sub 模型同构。故采用内存聚合。 + +### 决策 2:取样源接入请求收口发布点 + +聚合器可以订阅既有 `subscribeRequestLogLiveUpdates` 取样,但当前事件只带 `logId + statusCode`,缺 `durationMs/totalTokens/upstreamId`。因此有两种接法: + +- 方案 A:扩展 `RequestLogLiveUpdate` 事件,附带 `durationMs/totalTokens` 等字段,聚合器从订阅回调取样。 +- 方案 B:在 `request-logger.ts` 收口路径直接调用 `recordPulseSample(...)`,与 pub/sub 解耦。 + +采用方案 B。脉搏取样只需在「请求收口为终态」这一刻发生(`durationMs/totalTokens` 此时才确定),而 `notifyRequestLogChange` 还会在请求创建(进行中、无耗时/ token)时触发;复用同一事件会引入「进行中样本」噪声,需要额外过滤。直接在收口路径显式取样语义更清晰,也避免改动既有事件契约。`recordPulseSample` 仅接收收口终态样本。 + +### 决策 3:独立 SSE 端点 `/api/admin/stats/live` + +新增 `/api/admin/stats/live`,鉴权与 `/api/admin/logs/live` 一致(`validateAdminAuth` + `ADMIN_TOKEN` Bearer)。连接建立后立即推送一帧快照,随后每约 2 秒推送一帧 `live-pulse` 事件;保留 15 秒心跳注释行。 + +为什么独立端点而非复用 
logs/live:脉搏快照是「定时拼装的聚合帧」,与 logs/live 的「按请求变更触发」语义不同;混用会让日志页接收无关 `live-pulse` 帧、脉搏页接收无关 `request-log-changed` 帧。独立端点与现有「一个关注点一个端点」的目录结构一致(`/admin/logs/live`、`/admin/stats/*`)。 + +快照拼装时读取 `getAllHealthStatusWithCircuitBreaker()` 得到健康/熔断信号,与滚动窗口指标合并为一帧。 + +### 决策 4:前端复用三态与降级,新增 `use-live-pulse` + +新增 `use-live-pulse.ts`,结构对齐 `use-request-log-live.ts`:连接 `/api/admin/stats/live`,解析 `live-pulse` 事件,维护快照状态与 `connecting/live/fallback` 三态。降级时改用对同一端点的快照拉取(或退化为定时请求一次性快照接口),保证指标持续更新。 + +状态条组件 `live-pulse-bar.tsx` 消费该 hook,纯展示。`Topbar` 扩展为可选承载状态条,所有页面经 `Topbar` 自动获得。 + +### 决策 5:放置与响应式布局 + +桌面端把状态条挂在 `Topbar` 右侧(`justify-between` 的右栏)。移动端 `Topbar` 为 `hidden md:block`,因此状态条在移动端走紧凑形态:仅呈现在线指示灯 + req/min + 错误率,挂在移动端可见的页头区域,不挤压标题与返回导航。 + +桌面顶栏布局示意: + +``` +┌────────────────────────────────────────────────────────────────────────┐ +│ >> DASHBOARD ● Live 128 req/min · 0.4% err · 842ms · 1.2M TPM │ +│ ▣ 8/9 上游健康 · ⚡ 1 熔断打开 │ +├────────────────────────────────────────────────────────────────────────┤ +│ 今日请求 │ 平均响应 │ Token │ 成本 ... 
(StatsCards 区域不变) │ +└────────────────────────────────────────────────────────────────────────┘ +``` + +移动端紧凑形态示意(顶部窄条): + +``` +┌──────────────────────────────┐ +│ ● Live 128 req/min 0.4% err│ +└──────────────────────────────┘ +``` + +指示灯与状态色映射: + +| 连接态 | 指示灯 | 含义 | +|---|---|---| +| `live` | 绿色常亮/脉动 | SSE 在线,快照随推送更新 | +| `connecting` | 灰色 | 正在建立连接 | +| `fallback` | 琥珀色 | 已降级为定时拉取 | + +错误率与健康信号的强调色:错误率超过阈值(例如 >5%)时以错误色强调;熔断打开数 >0 时以警示色强调,与既有日志页错误强调样式风格一致。 + +## Risks / Trade-offs + +- [进程内内存状态,多实例下各自为政] → 沿用既有进程内 SSE 链路的现状约束;在 proposal/design 中显式声明按单实例部署语义工作,多实例场景不在本变更范围。 +- [进程重启后窗口清空,req/min 短暂从低值爬升] → 可接受:脉搏本就只反映「最近 60 秒」,重启后 60 秒内自然恢复;状态条无流量时显示零值而非报错。 +- [每个在线管理员各持一条 SSE 连接,约 2 秒一帧] → 帧体小(数百字节级),且快照在服务端按需拼装;健康信号读取走既有 `getAllHealthStatusWithCircuitBreaker`,必要时对快照拼装结果做短 TTL 缓存以避免高频读取放大。 +- [取样点遗漏导致脉搏与日志数据不一致] → 取样统一接入请求收口路径这一唯一终态写入点,并以单元测试覆盖「过期样本移出窗口」「错误率只算非 2xx」「平均延迟只算成功请求」等场景,保证聚合口径与 spec 一致。 +- [移动端空间紧张] → 紧凑形态仅保留在线指示与最关键的 req/min、错误率,避免与标题、返回导航争抢空间。 + +## Migration Plan + +纯增量特性,无数据迁移。新增端点与组件,默认在所有管理页生效。回滚方式为移除 `Topbar` 中的状态条挂载点与新增端点/服务文件,不影响既有统计与日志链路。 + +## Open Questions + +- 错误率/熔断的强调色阈值取值(如错误率 5%)需在实现时对照现有设计 token 与既有强调样式确定,默认采用与日志页错误强调一致的色板。 +- 移动端紧凑形态的具体挂载位置(复用 `layout.tsx` 中既有移动端 `header`,还是独立窄条)在实现时按视觉协调度确定,spec 仅约束「核心状态可见且不挤压标题与导航」。 diff --git a/openspec/changes/live-pulse-bar/proposal.md b/openspec/changes/live-pulse-bar/proposal.md new file mode 100644 index 00000000..b4986d0b --- /dev/null +++ b/openspec/changes/live-pulse-bar/proposal.md @@ -0,0 +1,35 @@ +## Why + +当前仪表盘的 `StatsCards` 只展示「今日 vs 昨日」的粗粒度汇总,刷新间隔为 60 秒,管理员无法一眼看到网关此刻的运行状态。当上游突发抖动、错误率上升或熔断器打开时,需要切换到日志页或时序图才能察觉,缺少一个常驻、秒级、跨页面可见的运行健康概览。 + +AutoRouter 本身已经具备进程内实时发布订阅(`request-log-live-updates.ts`)、SSE 推送端点(`/api/admin/logs/live`)以及带降级轮询的前端实时客户端(`use-request-log-live.ts`)。在这套既有基础设施之上,可以低成本地提供一条始终可见的实时运行状态条,并结合 AutoRouter 作为多上游网关独有的健康信号(上游健康度、熔断器状态),形成区别于通用中转站的网关运行脉搏视图。 + +## What Changes + +- 新增「实时脉搏状态条」(Live Pulse Bar),常驻在所有管理页 `Topbar` 的右侧(移动端提供紧凑形态)。 +- 新增服务端滚动窗口聚合器:以最近 60 
秒为窗口,对已收口请求按时间分桶累计请求数、非 2xx 错误数、成功请求延迟、token 总量;从 `request-logger` 的请求收口发布点取样(携带 `durationMs`、`totalTokens`、`statusCode`、`upstreamId`)。 +- 新增 SSE 推送通道,周期性向已认证管理员推送实时脉搏快照(滚动窗口的 req/min、错误率、平均延迟、TPM),并附带网关健康信号「健康上游数 / 上游总数」与「熔断打开数」。 +- 新增前端实时脉搏客户端与状态条组件,复用现有 `connecting / live / fallback` 三态语义:SSE 不可用时自动降级为定时拉取,连接状态以指示灯形式呈现。 +- 扩展 `Topbar` 组件,使其右侧可承载状态条;所有管理页因复用 `Topbar` 自动获得该状态条。 +- 新增中英文文案(`messages/en.json`、`messages/zh.json`)。 + +无破坏性变更:不改动既有 `/api/admin/logs/live` 的事件契约,不改动既有统计接口的返回结构。 + +## Capabilities + +### New Capabilities + +- `live-pulse-bar`: 实时脉搏状态条能力。覆盖滚动窗口运行指标的服务端聚合与取样、实时快照的 SSE 推送与降级拉取、网关健康信号(上游健康度与熔断状态)的纳入,以及跨管理页常驻的状态条展示与连接状态指示。 + +### Modified Capabilities + +无。本变更复用既有进程内实时发布订阅机制属于实现层面的复用,不改变 `request-log-live-status` 已定义的需求与场景,因此不产生 spec 层面的契约变更。 + +## Impact + +- 服务层:新增滚动窗口聚合服务(取样源为 `src/lib/services/request-logger.ts` 的请求收口路径);读取 `circuit-breaker.ts` 与 `health-checker.ts` 的上游健康与熔断状态用于快照拼装。 +- API 层:新增管理端 SSE 端点用于推送实时脉搏快照(鉴权方式与现有 `/api/admin/logs/live` 一致,使用 `ADMIN_TOKEN` Bearer)。 +- 前端:新增实时脉搏 hook 与状态条组件;扩展 `src/components/admin/topbar.tsx` 承载状态条;移动端 `Topbar` 隐藏时提供紧凑展示路径。 +- 文案:新增 `messages/{en,zh}.json` 中实时脉搏相关键。 +- 测试:新增滚动窗口聚合器单元测试与快照拼装单元测试。 +- 部署假设:滚动窗口为进程内内存状态,与既有进程内 SSE 推送机制一致,按单实例部署(docker-compose 单容器)语义工作;多实例下各实例反映自身流量,此约束沿用既有实时链路的现状。 diff --git a/openspec/changes/live-pulse-bar/specs/live-pulse-bar/spec.md b/openspec/changes/live-pulse-bar/specs/live-pulse-bar/spec.md new file mode 100644 index 00000000..4c473467 --- /dev/null +++ b/openspec/changes/live-pulse-bar/specs/live-pulse-bar/spec.md @@ -0,0 +1,90 @@ +## ADDED Requirements + +### Requirement: 管理页必须常驻展示实时脉搏状态条 + +系统 MUST 在所有管理页的顶栏常驻展示一条实时脉搏状态条,使管理员无需进入任何子页面即可看到网关当前的运行状态概览。状态条 MUST 至少包含滚动窗口的每分钟请求数、错误率、平均延迟、每分钟 token 量,以及网关健康信号(健康上游数与上游总数、熔断打开数)。 + +#### Scenario: 桌面端各管理页常驻可见 + +- **WHEN** 管理员在桌面端浏览仪表盘、密钥、上游、日志、设置或系统类任意管理页 +- **THEN** 顶栏 SHALL 始终展示实时脉搏状态条 +- **AND** 状态条 SHALL 同时呈现请求速率、错误率、平均延迟、每分钟 token 量与网关健康信号 + +#### Scenario: 暂无流量时显示零值而非空白 + +- **WHEN** 最近 60 秒内没有任何已收口请求 +- 
**THEN** 状态条 SHALL 将请求速率、错误率、平均延迟、每分钟 token 量显示为零值 +- **AND** 不得显示为空白、占位符 `-` 或加载中停滞状态 + +### Requirement: 实时脉搏指标必须基于最近 60 秒滚动窗口聚合 + +系统 MUST 以最近 60 秒为滚动窗口聚合运行指标。请求速率 MUST 按窗口内已收口请求数换算为每分钟值;错误率 MUST 为窗口内非 2xx 请求占比;平均延迟 MUST 为窗口内成功请求的平均处理耗时;每分钟 token 量 MUST 按窗口内 token 总量换算为每分钟值。窗口外的样本 MUST 被排除出聚合结果。 + +#### Scenario: 过期样本被移出窗口 + +- **WHEN** 某个请求样本的发生时间已超过当前时刻 60 秒 +- **THEN** 聚合结果 SHALL 不再计入该样本 +- **AND** 请求速率、错误率、平均延迟与每分钟 token 量 SHALL 仅反映窗口内样本 + +#### Scenario: 错误率只统计非 2xx + +- **WHEN** 窗口内既有 2xx 成功请求也有非 2xx 失败请求 +- **THEN** 错误率 SHALL 等于非 2xx 请求数除以窗口内总请求数 +- **AND** 平均延迟 SHALL 仅基于成功请求的处理耗时计算 + +#### Scenario: 取样来源为请求收口路径 + +- **WHEN** 某个请求在网关侧收口并写入终态(含状态码、处理耗时与 token 用量) +- **THEN** 系统 SHALL 以该请求的状态码、处理耗时、token 用量与上游标识生成一个滚动窗口样本 + +### Requirement: 实时脉搏状态条必须纳入网关健康信号 + +系统 MUST 在实时脉搏快照中纳入 AutoRouter 作为多上游网关独有的健康信号,至少包含当前健康上游数与上游总数、以及处于熔断打开状态的上游数量。健康与熔断状态 MUST 与系统内真实的健康检查与熔断器状态保持一致。 + +#### Scenario: 熔断打开数与真实状态一致 + +- **WHEN** 存在若干上游的熔断器处于打开状态 +- **THEN** 状态条展示的熔断打开数 SHALL 等于真实处于打开状态的上游数量 +- **AND** 不得把打开或半开状态统一显示为关闭 + +#### Scenario: 健康上游比例反映真实健康检查结果 + +- **WHEN** 系统中部分上游被健康检查标记为不健康 +- **THEN** 状态条展示的健康上游数 SHALL 等于真实健康上游数量 +- **AND** 上游总数 SHALL 等于参与统计的上游数量 + +### Requirement: 实时链路必须复用三态语义并在不可用时降级 + +系统 MUST 复用既有实时链路的 `connecting / live / fallback` 三态语义为实时脉搏提供推送。实时推送可用时,状态条 MUST 以指示灯形式呈现在线状态并随推送更新;实时推送不可用或中断时,系统 MUST 自动降级为定时拉取,保证指标在合理时间内持续更新,而不是长期停滞。 + +#### Scenario: 实时推送可用时呈现在线指示 + +- **WHEN** 实时脉搏推送链路连接成功并持续推送快照 +- **THEN** 状态条 SHALL 显示在线指示灯 +- **AND** 指标 SHALL 随推送快照更新 + +#### Scenario: 推送中断时自动降级拉取 + +- **WHEN** 实时脉搏推送链路暂时不可用或中断 +- **THEN** 系统 SHALL 自动回退到定时拉取机制 +- **AND** 指标 SHALL 在合理时间内继续更新,不得长期停滞在旧值 + +### Requirement: 实时脉搏端点必须鉴权 + +系统 MUST 对实时脉搏的推送与拉取端点执行管理员鉴权,鉴权方式与既有管理端实时端点一致。未携带或携带无效管理员凭据的请求 MUST 被拒绝,不得返回任何运行指标。 + +#### Scenario: 缺失或无效凭据被拒绝 + +- **WHEN** 请求未携带有效的管理员凭据访问实时脉搏端点 +- **THEN** 系统 SHALL 返回未授权响应 +- **AND** 不得返回任何实时运行指标数据 + +### Requirement: 移动端必须提供紧凑展示形态 + +系统 MUST 在桌面顶栏不展示的移动端视口下,为实时脉搏提供紧凑展示形态,使管理员在移动端仍能看到核心运行状态,且不挤压页面标题与既有导航。 + +#### Scenario: 
移动端展示紧凑核心指标 + +- **WHEN** 管理员在移动端视口浏览管理页 +- **THEN** 系统 SHALL 以紧凑形态展示实时脉搏的核心状态(至少包含在线指示与请求速率、错误率) +- **AND** 紧凑形态 SHALL 不遮挡或挤压页面标题与既有导航元素 diff --git a/openspec/changes/live-pulse-bar/tasks.md b/openspec/changes/live-pulse-bar/tasks.md new file mode 100644 index 00000000..249b0085 --- /dev/null +++ b/openspec/changes/live-pulse-bar/tasks.md @@ -0,0 +1,45 @@ +## 1. 服务端滚动窗口聚合器 + +- [ ] 1.1 新增 `src/lib/services/live-pulse-aggregator.ts`,实现按秒分桶的 60 秒环形窗口:导出 `recordPulseSample({ statusCode, durationMs, totalTokens, occurredAt? })` 与 `getPulseWindowSnapshot()`;快照换算 req/min、错误率(非 2xx 占比)、平均延迟(仅成功请求)、TPM。 +- [ ] 1.2 实现过期桶清理逻辑:读取与写入时均剔除超过 60 秒的样本,保证窗口外样本不计入。 +- [ ] 1.3 新增 `tests/live-pulse-aggregator.test.ts`,覆盖:窗口内聚合正确、过期样本移出窗口、错误率只统计非 2xx、平均延迟只算成功请求、无样本时返回零值快照。 +- [ ] 1.4 运行 `pnpm test:run` 相关用例与 `pnpm exec tsc --noEmit` 通过;提交本阶段。 + - 验收:聚合器单测全绿,类型检查通过;指标口径与 `specs/live-pulse-bar/spec.md` 中「滚动窗口」需求一致。 + +## 2. 取样接入与快照拼装(含网关健康信号) + +- [ ] 2.1 在 `src/lib/services/request-logger.ts` 的请求收口路径(终态写入处)调用 `recordPulseSample(...)`,仅在请求收口为终态、`durationMs/totalTokens` 已确定时取样;不在请求创建(进行中)处取样。 +- [ ] 2.2 新增快照拼装函数(置于 `live-pulse-aggregator.ts` 或新增 `live-pulse-service.ts`),合并滚动窗口快照与 `getAllHealthStatusWithCircuitBreaker()` 得到的健康上游数/总数、熔断打开数,产出完整 `LivePulseSnapshot`;定义其 TypeScript 类型。 +- [ ] 2.3 新增快照拼装单元测试:健康/熔断计数与传入的健康检查结果一致(打开/半开不计为关闭);窗口指标与健康信号正确合并。 +- [ ] 2.4 运行相关测试与类型检查通过;提交本阶段。 + - 验收:取样仅发生在终态收口;快照拼装单测全绿;健康/熔断口径与 `routing-failover-observability` 既有真实状态一致。 + +## 3. 实时脉搏 SSE 端点 + +- [ ] 3.1 新增 `src/app/api/admin/stats/live/route.ts`,鉴权复用 `validateAdminAuth`(`ADMIN_TOKEN` Bearer),与 `/api/admin/logs/live` 一致;`runtime = "nodejs"`、`dynamic = "force-dynamic"`。 +- [ ] 3.2 连接建立即推送一帧 `live-pulse` 快照,随后约每 2 秒推送一帧;保留约 15 秒心跳注释行;正确清理定时器与中止监听,避免断开后写入。 +- [ ] 3.3 提供降级拉取路径:支持以普通 GET 返回一次性快照(供前端 fallback 使用),或在同端点区分一次性快照请求;缺失/无效凭据返回 401 且不泄露任何指标。 +- [ ] 3.4 运行 `pnpm lint` 与 `pnpm exec tsc --noEmit` 通过;提交本阶段。 + - 验收:未授权请求返回 401 且无指标数据;端点能稳定推送 `live-pulse` 帧并按断开清理资源。 + +## 4. 
前端实时脉搏客户端与桌面状态条 + +- [ ] 4.1 新增 `src/hooks/use-live-pulse.ts`,结构对齐 `use-request-log-live.ts`:连接 `/api/admin/stats/live`,解析 `live-pulse` 事件,维护快照与 `connecting/live/fallback` 三态;断线降级为定时拉取一次性快照。 +- [ ] 4.2 新增 `src/components/admin/live-pulse-bar.tsx`,纯展示组件:呈现在线指示灯(按三态着色)、req/min、错误率、平均延迟、TPM、健康上游数/总数、熔断打开数;错误率超阈值与熔断打开数 >0 时按既有错误/警示样式强调。 +- [ ] 4.3 扩展 `src/components/admin/topbar.tsx`,在右栏承载状态条(桌面端);所有管理页因复用 `Topbar` 自动获得。 +- [ ] 4.4 在 `src/messages/en.json` 与 `src/messages/zh.json` 新增实时脉搏相关文案键(指标标签、在线/降级提示等)。 +- [ ] 4.5 运行 `pnpm lint`、`pnpm exec tsc --noEmit`、`pnpm format:check` 通过;提交本阶段。 + - 验收:桌面端各管理页顶栏常驻状态条,指标随推送更新,降级时指示灯转琥珀色并持续刷新;无流量时显示零值。 + +## 5. 移动端紧凑形态 + +- [ ] 5.1 为状态条提供紧凑形态(仅在线指示灯 + req/min + 错误率),在移动端 `Topbar` 隐藏时通过 `layout.tsx` 移动端页头区域或独立窄条展示,不挤压标题与返回导航。 +- [ ] 5.2 校验移动端视口下紧凑形态正确显示且不破坏既有移动端布局;运行 `pnpm lint`、`pnpm exec tsc --noEmit`、`pnpm format:check` 通过;提交本阶段。 + - 验收:移动端可见核心状态,标题与返回导航不被遮挡或挤压。 + +## 6. 校验与收尾 + +- [ ] 6.1 运行完整 `pnpm test:run` 与 `pnpm build`,确认无回归。 +- [ ] 6.2 运行 `npx openspec validate live-pulse-bar --strict` 通过;逐项核对 `tasks.md` 勾选完成。 +- [ ] 6.3 创建隔离功能分支并推送,开启 PR(遵循仓库 OpenSpec PR 工作流);不自行合并。 + - 验收:CI 通过,PR 已开启待评审;变更可按需归档。 From 8ae38973ff4ab1fafef5fc951f3d6421876fcc85 Mon Sep 17 00:00:00 2001 From: umaru Date: Sat, 30 May 2026 11:09:58 +0800 Subject: [PATCH 02/12] =?UTF-8?q?feat(live-pulse):=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E6=BB=9A=E5=8A=A8=E7=AA=97=E5=8F=A3=E8=81=9A=E5=90=88=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 按秒分桶维护最近 60 秒滚动窗口,导出 recordPulseSample 与 getPulseWindowSnapshot,换算 req/min、错误率(非 2xx 占比)、 平均延迟(仅成功请求)、TPM;读写时剔除窗口外样本。含单元测试。 --- openspec/changes/live-pulse-bar/tasks.md | 8 +- src/lib/services/live-pulse-aggregator.ts | 148 ++++++++++++++++++ .../services/live-pulse-aggregator.test.ts | 120 ++++++++++++++ 3 files changed, 272 insertions(+), 4 deletions(-) create mode 100644 src/lib/services/live-pulse-aggregator.ts create mode 100644 
tests/unit/services/live-pulse-aggregator.test.ts diff --git a/openspec/changes/live-pulse-bar/tasks.md b/openspec/changes/live-pulse-bar/tasks.md index 249b0085..67937b23 100644 --- a/openspec/changes/live-pulse-bar/tasks.md +++ b/openspec/changes/live-pulse-bar/tasks.md @@ -1,9 +1,9 @@ ## 1. 服务端滚动窗口聚合器 -- [ ] 1.1 新增 `src/lib/services/live-pulse-aggregator.ts`,实现按秒分桶的 60 秒环形窗口:导出 `recordPulseSample({ statusCode, durationMs, totalTokens, occurredAt? })` 与 `getPulseWindowSnapshot()`;快照换算 req/min、错误率(非 2xx 占比)、平均延迟(仅成功请求)、TPM。 -- [ ] 1.2 实现过期桶清理逻辑:读取与写入时均剔除超过 60 秒的样本,保证窗口外样本不计入。 -- [ ] 1.3 新增 `tests/live-pulse-aggregator.test.ts`,覆盖:窗口内聚合正确、过期样本移出窗口、错误率只统计非 2xx、平均延迟只算成功请求、无样本时返回零值快照。 -- [ ] 1.4 运行 `pnpm test:run` 相关用例与 `pnpm exec tsc --noEmit` 通过;提交本阶段。 +- [x] 1.1 新增 `src/lib/services/live-pulse-aggregator.ts`,实现按秒分桶的 60 秒环形窗口:导出 `recordPulseSample({ statusCode, durationMs, totalTokens, occurredAt? })` 与 `getPulseWindowSnapshot()`;快照换算 req/min、错误率(非 2xx 占比)、平均延迟(仅成功请求)、TPM。 +- [x] 1.2 实现过期桶清理逻辑:读取与写入时均剔除超过 60 秒的样本,保证窗口外样本不计入。 +- [x] 1.3 新增 `tests/unit/services/live-pulse-aggregator.test.ts`,覆盖:窗口内聚合正确、过期样本移出窗口、错误率只统计非 2xx、平均延迟只算成功请求、无样本时返回零值快照。 +- [x] 1.4 运行 `pnpm test:run` 相关用例与 `pnpm exec tsc --noEmit` 通过;提交本阶段。 - 验收:聚合器单测全绿,类型检查通过;指标口径与 `specs/live-pulse-bar/spec.md` 中「滚动窗口」需求一致。 ## 2. 取样接入与快照拼装(含网关健康信号) diff --git a/src/lib/services/live-pulse-aggregator.ts b/src/lib/services/live-pulse-aggregator.ts new file mode 100644 index 00000000..23afff84 --- /dev/null +++ b/src/lib/services/live-pulse-aggregator.ts @@ -0,0 +1,148 @@ +/** + * In-process rolling-window aggregator for the live pulse status bar. + * + * Samples are taken when a request is finalized (terminal status with known + * duration and token usage). The window covers the most recent 60 seconds and + * is kept as per-second buckets so pruning expired samples is cheap. The window + * is exactly 60 seconds wide, so raw counts over the window equal per-minute + * rates without any extra scaling. 
+ * + * This is intentionally process-local, mirroring the existing in-process + * request-log live updates: each instance reflects its own traffic. + */ + +const WINDOW_SECONDS = 60; +const WINDOW_MS = WINDOW_SECONDS * 1000; + +export interface PulseSample { + /** Terminal HTTP status code; null is treated as a non-2xx (error) sample. */ + statusCode: number | null; + /** Total processing duration in milliseconds; only counted for successful requests. */ + durationMs: number | null; + /** Total token usage for the request. */ + totalTokens: number | null; + /** Epoch milliseconds when the request finalized. Defaults to now. */ + occurredAt?: number; +} + +export interface PulseWindowSnapshot { + /** Requests finalized within the last 60 seconds (equals requests per minute). */ + requestsPerMinute: number; + /** Share of non-2xx requests within the window, as a percentage (0-100). */ + errorRatePct: number; + /** Average processing duration of successful requests in the window, in milliseconds. */ + avgLatencyMs: number; + /** Token usage finalized within the last 60 seconds (equals tokens per minute). */ + tokensPerMinute: number; + /** Total requests counted in the window (window denominator). */ + sampleCount: number; + /** Rolling window width in seconds. */ + windowSeconds: number; +} + +interface PulseBucket { + count: number; + errorCount: number; + successCount: number; + successDurationSumMs: number; + tokenSum: number; +} + +const buckets = new Map(); + +function isSuccessStatus(statusCode: number | null): boolean { + return statusCode !== null && statusCode >= 200 && statusCode <= 299; +} + +function bucketKeyFor(epochMs: number): number { + return Math.floor(epochMs / 1000); +} + +/** + * Remove buckets whose second has fallen out of the rolling window relative to now. 
+ */ +function pruneExpiredBuckets(nowMs: number): void { + const minKey = bucketKeyFor(nowMs - WINDOW_MS); + for (const key of buckets.keys()) { + if (key < minKey) { + buckets.delete(key); + } + } +} + +/** + * Record a finalized request as a rolling-window sample. + */ +export function recordPulseSample(sample: PulseSample): void { + const occurredAt = sample.occurredAt ?? Date.now(); + const key = bucketKeyFor(occurredAt); + + let bucket = buckets.get(key); + if (!bucket) { + bucket = { count: 0, errorCount: 0, successCount: 0, successDurationSumMs: 0, tokenSum: 0 }; + buckets.set(key, bucket); + } + + bucket.count += 1; + + if (isSuccessStatus(sample.statusCode)) { + bucket.successCount += 1; + if (sample.durationMs !== null && sample.durationMs >= 0) { + bucket.successDurationSumMs += sample.durationMs; + } + } else { + bucket.errorCount += 1; + } + + if (sample.totalTokens !== null && sample.totalTokens > 0) { + bucket.tokenSum += sample.totalTokens; + } + + pruneExpiredBuckets(occurredAt); +} + +/** + * Compute the rolling-window snapshot for the most recent 60 seconds. + */ +export function getPulseWindowSnapshot(nowMs: number = Date.now()): PulseWindowSnapshot { + pruneExpiredBuckets(nowMs); + + const minKey = bucketKeyFor(nowMs - WINDOW_MS); + + let count = 0; + let errorCount = 0; + let successCount = 0; + let successDurationSumMs = 0; + let tokenSum = 0; + + for (const [key, bucket] of buckets) { + if (key < minKey) { + continue; + } + count += bucket.count; + errorCount += bucket.errorCount; + successCount += bucket.successCount; + successDurationSumMs += bucket.successDurationSumMs; + tokenSum += bucket.tokenSum; + } + + const errorRatePct = count > 0 ? Math.round((errorCount / count) * 100 * 10) / 10 : 0; + const avgLatencyMs = + successCount > 0 ? 
Math.round((successDurationSumMs / successCount) * 10) / 10 : 0; + + return { + requestsPerMinute: count, + errorRatePct, + avgLatencyMs, + tokensPerMinute: tokenSum, + sampleCount: count, + windowSeconds: WINDOW_SECONDS, + }; +} + +/** + * Clear all rolling-window state. Intended for tests. + */ +export function resetPulseWindow(): void { + buckets.clear(); +} diff --git a/tests/unit/services/live-pulse-aggregator.test.ts b/tests/unit/services/live-pulse-aggregator.test.ts new file mode 100644 index 00000000..34491ce9 --- /dev/null +++ b/tests/unit/services/live-pulse-aggregator.test.ts @@ -0,0 +1,120 @@ +import { beforeEach, describe, expect, it } from "vitest"; + +import { + getPulseWindowSnapshot, + recordPulseSample, + resetPulseWindow, +} from "@/lib/services/live-pulse-aggregator"; + +const BASE = 1_000_000_000_000; // fixed epoch ms base for deterministic buckets + +describe("live-pulse-aggregator", () => { + beforeEach(() => { + resetPulseWindow(); + }); + + it("returns a zero-valued snapshot when there are no samples", () => { + const snapshot = getPulseWindowSnapshot(BASE); + + expect(snapshot).toEqual({ + requestsPerMinute: 0, + errorRatePct: 0, + avgLatencyMs: 0, + tokensPerMinute: 0, + sampleCount: 0, + windowSeconds: 60, + }); + }); + + it("aggregates request count, tokens and latency within the window", () => { + recordPulseSample({ statusCode: 200, durationMs: 100, totalTokens: 10, occurredAt: BASE }); + recordPulseSample({ + statusCode: 200, + durationMs: 300, + totalTokens: 20, + occurredAt: BASE + 5_000, + }); + recordPulseSample({ + statusCode: 201, + durationMs: 200, + totalTokens: 30, + occurredAt: BASE + 10_000, + }); + + const snapshot = getPulseWindowSnapshot(BASE + 10_000); + + expect(snapshot.requestsPerMinute).toBe(3); + expect(snapshot.sampleCount).toBe(3); + expect(snapshot.tokensPerMinute).toBe(60); + // (100 + 300 + 200) / 3 = 200 + expect(snapshot.avgLatencyMs).toBe(200); + expect(snapshot.errorRatePct).toBe(0); + }); + + it("drops 
samples that fall outside the 60 second window", () => { + // Sample at BASE is 61s before the read time, so it must be excluded. + recordPulseSample({ statusCode: 200, durationMs: 100, totalTokens: 10, occurredAt: BASE }); + recordPulseSample({ + statusCode: 200, + durationMs: 150, + totalTokens: 5, + occurredAt: BASE + 61_000, + }); + + const snapshot = getPulseWindowSnapshot(BASE + 61_000); + + expect(snapshot.requestsPerMinute).toBe(1); + expect(snapshot.tokensPerMinute).toBe(5); + expect(snapshot.avgLatencyMs).toBe(150); + }); + + it("computes error rate from non-2xx requests only", () => { + recordPulseSample({ statusCode: 200, durationMs: 100, totalTokens: 10, occurredAt: BASE }); + recordPulseSample({ + statusCode: 500, + durationMs: 50, + totalTokens: 0, + occurredAt: BASE + 1_000, + }); + recordPulseSample({ + statusCode: 429, + durationMs: 40, + totalTokens: 0, + occurredAt: BASE + 2_000, + }); + recordPulseSample({ + statusCode: null, + durationMs: null, + totalTokens: 0, + occurredAt: BASE + 3_000, + }); + + const snapshot = getPulseWindowSnapshot(BASE + 3_000); + + expect(snapshot.requestsPerMinute).toBe(4); + // 3 of 4 are non-2xx => 75% + expect(snapshot.errorRatePct).toBe(75); + }); + + it("averages latency over successful requests only", () => { + recordPulseSample({ statusCode: 200, durationMs: 100, totalTokens: 10, occurredAt: BASE }); + recordPulseSample({ + statusCode: 200, + durationMs: 300, + totalTokens: 10, + occurredAt: BASE + 1_000, + }); + // Failed request with a long duration must not affect the success average. 
+ recordPulseSample({ + statusCode: 503, + durationMs: 9_000, + totalTokens: 0, + occurredAt: BASE + 2_000, + }); + + const snapshot = getPulseWindowSnapshot(BASE + 2_000); + + // (100 + 300) / 2 = 200, failed 9000ms excluded + expect(snapshot.avgLatencyMs).toBe(200); + }); +}); From 69abe54a5df872b6bb036a41933dbb15fd219a93 Mon Sep 17 00:00:00 2001 From: umaru Date: Sat, 30 May 2026 11:14:23 +0800 Subject: [PATCH 03/12] =?UTF-8?q?feat(live-pulse):=20=E6=8E=A5=E5=85=A5?= =?UTF-8?q?=E7=BB=88=E6=80=81=E5=8F=96=E6=A0=B7=E4=B8=8E=E7=BD=91=E5=85=B3?= =?UTF-8?q?=E5=81=A5=E5=BA=B7=E5=BF=AB=E7=85=A7=E6=8B=BC=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 request-logger 的请求收口路径(updateRequestLog 设置终态状态码、 logRequest 一次性终态)取样,进行中创建不计入。新增 live-pulse-service: 合并滚动窗口快照与 getAllHealthStatusWithCircuitBreaker 得到的健康上游数/ 总数与熔断打开数,产出 LivePulseSnapshot;打开/半开不计为关闭。含单元测试。 --- openspec/changes/live-pulse-bar/tasks.md | 8 +- src/lib/services/live-pulse-service.ts | 78 +++++++++++++ src/lib/services/request-logger.ts | 27 +++++ .../unit/services/live-pulse-service.test.ts | 108 ++++++++++++++++++ 4 files changed, 217 insertions(+), 4 deletions(-) create mode 100644 src/lib/services/live-pulse-service.ts create mode 100644 tests/unit/services/live-pulse-service.test.ts diff --git a/openspec/changes/live-pulse-bar/tasks.md b/openspec/changes/live-pulse-bar/tasks.md index 67937b23..cc0c1cc1 100644 --- a/openspec/changes/live-pulse-bar/tasks.md +++ b/openspec/changes/live-pulse-bar/tasks.md @@ -8,10 +8,10 @@ ## 2. 
取样接入与快照拼装(含网关健康信号) -- [ ] 2.1 在 `src/lib/services/request-logger.ts` 的请求收口路径(终态写入处)调用 `recordPulseSample(...)`,仅在请求收口为终态、`durationMs/totalTokens` 已确定时取样;不在请求创建(进行中)处取样。 -- [ ] 2.2 新增快照拼装函数(置于 `live-pulse-aggregator.ts` 或新增 `live-pulse-service.ts`),合并滚动窗口快照与 `getAllHealthStatusWithCircuitBreaker()` 得到的健康上游数/总数、熔断打开数,产出完整 `LivePulseSnapshot`;定义其 TypeScript 类型。 -- [ ] 2.3 新增快照拼装单元测试:健康/熔断计数与传入的健康检查结果一致(打开/半开不计为关闭);窗口指标与健康信号正确合并。 -- [ ] 2.4 运行相关测试与类型检查通过;提交本阶段。 +- [x] 2.1 在 `src/lib/services/request-logger.ts` 的请求收口路径(终态写入处)调用 `recordPulseSample(...)`,仅在请求收口为终态、`durationMs/totalTokens` 已确定时取样;不在请求创建(进行中)处取样。 +- [x] 2.2 新增快照拼装函数(新增 `live-pulse-service.ts`),合并滚动窗口快照与 `getAllHealthStatusWithCircuitBreaker()` 得到的健康上游数/总数、熔断打开数,产出完整 `LivePulseSnapshot`;定义其 TypeScript 类型。 +- [x] 2.3 新增快照拼装单元测试:健康/熔断计数与传入的健康检查结果一致(打开/半开不计为关闭);窗口指标与健康信号正确合并。 +- [x] 2.4 运行相关测试与类型检查通过;提交本阶段。 - 验收:取样仅发生在终态收口;快照拼装单测全绿;健康/熔断口径与 `routing-failover-observability` 既有真实状态一致。 ## 3. 实时脉搏 SSE 端点 diff --git a/src/lib/services/live-pulse-service.ts b/src/lib/services/live-pulse-service.ts new file mode 100644 index 00000000..b7627dd3 --- /dev/null +++ b/src/lib/services/live-pulse-service.ts @@ -0,0 +1,78 @@ +/** + * Assembles the complete live pulse snapshot: the rolling-window traffic metrics + * combined with AutoRouter's gateway-specific health signals (healthy upstream + * ratio and open circuit breaker count). + */ + +import { CircuitBreakerStateEnum } from "./circuit-breaker"; +import { getAllHealthStatusWithCircuitBreaker, type HealthStatus } from "./health-checker"; +import { getPulseWindowSnapshot, type PulseWindowSnapshot } from "./live-pulse-aggregator"; + +export interface LivePulseGatewayHealth { + /** Number of upstreams currently marked healthy. */ + healthyUpstreams: number; + /** Total number of upstreams counted (active upstreams). */ + totalUpstreams: number; + /** Number of upstreams whose circuit breaker is currently open. 
*/ + openCircuitBreakers: number; +} + +export interface LivePulseSnapshot extends PulseWindowSnapshot { + /** ISO timestamp when this snapshot was assembled. */ + generatedAt: string; + /** Gateway-specific health signals. */ + gateway: LivePulseGatewayHealth; +} + +const EMPTY_GATEWAY_HEALTH: LivePulseGatewayHealth = { + healthyUpstreams: 0, + totalUpstreams: 0, + openCircuitBreakers: 0, +}; + +/** + * Reduce a list of upstream health statuses into gateway health counters. + * Open and half-open circuit breakers are never counted as closed. + */ +export function summarizeGatewayHealth(statuses: HealthStatus[]): LivePulseGatewayHealth { + let healthyUpstreams = 0; + let openCircuitBreakers = 0; + + for (const status of statuses) { + if (status.isHealthy) { + healthyUpstreams += 1; + } + if (status.circuitBreaker?.state === CircuitBreakerStateEnum.OPEN) { + openCircuitBreakers += 1; + } + } + + return { + healthyUpstreams, + totalUpstreams: statuses.length, + openCircuitBreakers, + }; +} + +/** + * Build the full live pulse snapshot for the current moment. + * Window metrics are always returned; if the gateway health lookup fails the + * snapshot degrades to zeroed gateway counters rather than failing the request. + */ +export async function getLivePulseSnapshot(nowMs: number = Date.now()): Promise { + const window = getPulseWindowSnapshot(nowMs); + + let gateway = EMPTY_GATEWAY_HEALTH; + try { + const statuses = await getAllHealthStatusWithCircuitBreaker(true, true); + gateway = summarizeGatewayHealth(statuses); + } catch { + // Keep window metrics usable even when upstream health cannot be read. 
+ } + + return { + ...window, + generatedAt: new Date(nowMs).toISOString(), + gateway, + }; +} diff --git a/src/lib/services/request-logger.ts b/src/lib/services/request-logger.ts index cea64508..bd2d6282 100644 --- a/src/lib/services/request-logger.ts +++ b/src/lib/services/request-logger.ts @@ -9,6 +9,7 @@ import type { } from "@/types/api"; import { extractNormalizedUsage, type HeaderDiff } from "./proxy-client"; import { publishRequestLogLiveUpdate } from "./request-log-live-updates"; +import { recordPulseSample } from "./live-pulse-aggregator"; import { calculateAndPersistRequestBillingSnapshot } from "./billing-cost-service"; import { createLogger } from "@/lib/utils/logger"; import { isRequestThinkingConfig } from "@/lib/utils/request-thinking-config"; @@ -295,6 +296,25 @@ function notifyRequestLogChange( }); } +/** + * Feed a finalized request log into the live pulse rolling window. + * Only terminal entries (with a known status code) are sampled; in-progress + * entries created by logRequestStart carry a null status and are skipped. + */ +function recordPulseSampleFromLog( + logEntry: Pick | null | undefined +): void { + if (!logEntry || logEntry.statusCode == null) { + return; + } + + recordPulseSample({ + statusCode: logEntry.statusCode, + durationMs: logEntry.durationMs ?? null, + totalTokens: logEntry.totalTokens ?? null, + }); +} + function normalizeBillingStatus(value: string | null | undefined): "billed" | "unbilled" | null { if (value === "billed" || value === "unbilled") { return value; @@ -458,6 +478,12 @@ export async function updateRequestLog( notifyRequestLogChange(updated ?? null); + // Sample only when this update carries the terminal status code, so a single + // request is counted once at finalization rather than on every mid-stream update. + if (input.statusCode !== undefined) { + recordPulseSampleFromLog(updated); + } + return updated ?? 
null; } @@ -514,6 +540,7 @@ export async function logRequest(input: LogRequestInput): Promise { // Request logged to database - details available via admin API notifyRequestLogChange(logEntry); + recordPulseSampleFromLog(logEntry); return logEntry; } diff --git a/tests/unit/services/live-pulse-service.test.ts b/tests/unit/services/live-pulse-service.test.ts new file mode 100644 index 00000000..753b6bdb --- /dev/null +++ b/tests/unit/services/live-pulse-service.test.ts @@ -0,0 +1,108 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { CircuitBreakerStateEnum } from "@/lib/services/circuit-breaker"; +import type { CircuitBreakerStatus, HealthStatus } from "@/lib/services/health-checker"; + +const { getAllHealthStatusWithCircuitBreakerMock } = vi.hoisted(() => ({ + getAllHealthStatusWithCircuitBreakerMock: vi.fn(), +})); + +vi.mock("@/lib/services/health-checker", () => ({ + getAllHealthStatusWithCircuitBreaker: getAllHealthStatusWithCircuitBreakerMock, +})); + +import { getLivePulseSnapshot, summarizeGatewayHealth } from "@/lib/services/live-pulse-service"; +import { recordPulseSample, resetPulseWindow } from "@/lib/services/live-pulse-aggregator"; + +function makeStatus(isHealthy: boolean, state?: CircuitBreakerStateEnum): HealthStatus { + return { + upstreamId: "u", + upstreamName: "n", + isHealthy, + lastCheckAt: null, + lastSuccessAt: null, + failureCount: 0, + latencyMs: null, + errorMessage: null, + circuitBreaker: state ? 
({ state } as unknown as CircuitBreakerStatus) : null, + }; +} + +const BASE = 1_000_000_000_000; + +describe("live-pulse-service", () => { + beforeEach(() => { + vi.clearAllMocks(); + resetPulseWindow(); + }); + + describe("summarizeGatewayHealth", () => { + it("counts healthy upstreams and open breakers, never counting open/half-open as closed", () => { + const result = summarizeGatewayHealth([ + makeStatus(true, CircuitBreakerStateEnum.CLOSED), + makeStatus(true, CircuitBreakerStateEnum.HALF_OPEN), + makeStatus(false, CircuitBreakerStateEnum.OPEN), + makeStatus(false, CircuitBreakerStateEnum.OPEN), + ]); + + expect(result).toEqual({ + healthyUpstreams: 2, + totalUpstreams: 4, + openCircuitBreakers: 2, + }); + }); + + it("treats missing circuit breaker info as not open", () => { + const result = summarizeGatewayHealth([makeStatus(true), makeStatus(false)]); + + expect(result).toEqual({ + healthyUpstreams: 1, + totalUpstreams: 2, + openCircuitBreakers: 0, + }); + }); + }); + + describe("getLivePulseSnapshot", () => { + it("merges rolling-window metrics with gateway health counts", async () => { + recordPulseSample({ statusCode: 200, durationMs: 100, totalTokens: 10, occurredAt: BASE }); + recordPulseSample({ + statusCode: 500, + durationMs: 50, + totalTokens: 0, + occurredAt: BASE + 1_000, + }); + + getAllHealthStatusWithCircuitBreakerMock.mockResolvedValue([ + makeStatus(true, CircuitBreakerStateEnum.CLOSED), + makeStatus(false, CircuitBreakerStateEnum.OPEN), + ]); + + const snapshot = await getLivePulseSnapshot(BASE + 1_000); + + expect(snapshot.requestsPerMinute).toBe(2); + expect(snapshot.errorRatePct).toBe(50); + expect(snapshot.avgLatencyMs).toBe(100); + expect(snapshot.gateway).toEqual({ + healthyUpstreams: 1, + totalUpstreams: 2, + openCircuitBreakers: 1, + }); + expect(snapshot.generatedAt).toBe(new Date(BASE + 1_000).toISOString()); + }); + + it("degrades to zeroed gateway health when the health lookup fails", async () => { + recordPulseSample({ 
statusCode: 200, durationMs: 100, totalTokens: 10, occurredAt: BASE }); + getAllHealthStatusWithCircuitBreakerMock.mockRejectedValue(new Error("db down")); + + const snapshot = await getLivePulseSnapshot(BASE); + + expect(snapshot.requestsPerMinute).toBe(1); + expect(snapshot.gateway).toEqual({ + healthyUpstreams: 0, + totalUpstreams: 0, + openCircuitBreakers: 0, + }); + }); + }); +}); From db0685193fdadadb083ce1c803f5874077df4afa Mon Sep 17 00:00:00 2001 From: umaru Date: Sat, 30 May 2026 11:18:53 +0800 Subject: [PATCH 04/12] =?UTF-8?q?feat(live-pulse):=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E8=84=89=E6=90=8F=20SSE=20=E7=AB=AF=E7=82=B9?= =?UTF-8?q?=E4=B8=8E=E7=BD=91=E5=85=B3=E5=81=A5=E5=BA=B7=E7=9F=AD=E7=BC=93?= =?UTF-8?q?=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 /api/admin/stats/live:管理员鉴权后默认以 SSE 推送 live-pulse 快照(连接即一帧、其后每 2 秒一帧、15 秒心跳、断开清理);?mode=snapshot 返回一次性 JSON 快照供前端降级拉取;未授权返回 401 不泄露指标。 拼装快照时为网关健康加 2 秒共享缓存,避免多页常驻连接放大数据库读取。 --- openspec/changes/live-pulse-bar/tasks.md | 8 +- src/app/api/admin/stats/live/route.ts | 103 ++++++++++++++++++ src/lib/services/live-pulse-service.ts | 28 ++++- .../unit/services/live-pulse-service.test.ts | 7 +- 4 files changed, 139 insertions(+), 7 deletions(-) create mode 100644 src/app/api/admin/stats/live/route.ts diff --git a/openspec/changes/live-pulse-bar/tasks.md b/openspec/changes/live-pulse-bar/tasks.md index cc0c1cc1..171815af 100644 --- a/openspec/changes/live-pulse-bar/tasks.md +++ b/openspec/changes/live-pulse-bar/tasks.md @@ -16,10 +16,10 @@ ## 3. 
实时脉搏 SSE 端点 -- [ ] 3.1 新增 `src/app/api/admin/stats/live/route.ts`,鉴权复用 `validateAdminAuth`(`ADMIN_TOKEN` Bearer),与 `/api/admin/logs/live` 一致;`runtime = "nodejs"`、`dynamic = "force-dynamic"`。 -- [ ] 3.2 连接建立即推送一帧 `live-pulse` 快照,随后约每 2 秒推送一帧;保留约 15 秒心跳注释行;正确清理定时器与中止监听,避免断开后写入。 -- [ ] 3.3 提供降级拉取路径:支持以普通 GET 返回一次性快照(供前端 fallback 使用),或在同端点区分一次性快照请求;缺失/无效凭据返回 401 且不泄露任何指标。 -- [ ] 3.4 运行 `pnpm lint` 与 `pnpm exec tsc --noEmit` 通过;提交本阶段。 +- [x] 3.1 新增 `src/app/api/admin/stats/live/route.ts`,鉴权复用 `validateAdminAuth`(`ADMIN_TOKEN` Bearer),与 `/api/admin/logs/live` 一致;`runtime = "nodejs"`、`dynamic = "force-dynamic"`。 +- [x] 3.2 连接建立即推送一帧 `live-pulse` 快照,随后约每 2 秒推送一帧;保留约 15 秒心跳注释行;正确清理定时器与中止监听,避免断开后写入。 +- [x] 3.3 提供降级拉取路径:`?mode=snapshot` 以普通 GET 返回一次性快照(供前端 fallback 使用);缺失/无效凭据返回 401 且不泄露任何指标。 +- [x] 3.4 运行 `pnpm lint` 与 `pnpm exec tsc --noEmit` 通过;提交本阶段。 - 验收:未授权请求返回 401 且无指标数据;端点能稳定推送 `live-pulse` 帧并按断开清理资源。 ## 4. 前端实时脉搏客户端与桌面状态条 diff --git a/src/app/api/admin/stats/live/route.ts b/src/app/api/admin/stats/live/route.ts new file mode 100644 index 00000000..13a3880f --- /dev/null +++ b/src/app/api/admin/stats/live/route.ts @@ -0,0 +1,103 @@ +import { NextRequest, NextResponse } from "next/server"; +import { errorResponse } from "@/lib/utils/api-auth"; +import { validateAdminAuth } from "@/lib/utils/auth"; +import { getLivePulseSnapshot } from "@/lib/services/live-pulse-service"; +import { createLogger } from "@/lib/utils/logger"; + +export const runtime = "nodejs"; +export const dynamic = "force-dynamic"; + +const log = createLogger("admin-stats-live"); +const KEEPALIVE_INTERVAL_MS = 15000; +const PULSE_INTERVAL_MS = 2000; + +function formatSseEvent(eventName: string, data: unknown): string { + return `event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`; +} + +/** + * Stream live pulse snapshots over Server-Sent Events for authenticated admins. + * + * Default mode streams a `live-pulse` snapshot frame on connect and every + * PULSE_INTERVAL_MS thereafter. 
`?mode=snapshot` returns a single snapshot as + * JSON, used by the frontend fallback polling when SSE is unavailable. + */ +export async function GET(request: NextRequest) { + const authHeader = request.headers.get("authorization"); + if (!validateAdminAuth(authHeader)) { + return errorResponse("Unauthorized", 401); + } + + if (request.nextUrl.searchParams.get("mode") === "snapshot") { + try { + const snapshot = await getLivePulseSnapshot(); + return NextResponse.json(snapshot); + } catch (error) { + log.error({ err: error }, "failed to build live pulse snapshot"); + return errorResponse("Internal server error", 500); + } + } + + const encoder = new TextEncoder(); + let cleanup = () => undefined; + + const stream = new ReadableStream({ + start(controller) { + const send = (chunk: string) => { + controller.enqueue(encoder.encode(chunk)); + }; + + const pushSnapshot = async () => { + try { + const snapshot = await getLivePulseSnapshot(); + send(formatSseEvent("live-pulse", snapshot)); + } catch (error) { + log.debug({ err: error }, "live pulse snapshot push failed"); + } + }; + + const keepalive = setInterval(() => { + try { + send(`: keep-alive ${new Date().toISOString()}\n\n`); + } catch { + // Ignore write errors after disconnect. + } + }, KEEPALIVE_INTERVAL_MS); + + const pulseTimer = setInterval(() => { + void pushSnapshot(); + }, PULSE_INTERVAL_MS); + + const abortHandler = () => { + cleanup(); + try { + controller.close(); + } catch { + // Controller may already be closed. + } + }; + + request.signal.addEventListener("abort", abortHandler, { once: true }); + + cleanup = () => { + clearInterval(keepalive); + clearInterval(pulseTimer); + request.signal.removeEventListener("abort", abortHandler); + }; + + // Push the first snapshot immediately so the bar shows data on connect. 
+ void pushSnapshot(); + }, + cancel() { + cleanup(); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream; charset=utf-8", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + }, + }); +} diff --git a/src/lib/services/live-pulse-service.ts b/src/lib/services/live-pulse-service.ts index b7627dd3..0dbf9632 100644 --- a/src/lib/services/live-pulse-service.ts +++ b/src/lib/services/live-pulse-service.ts @@ -30,6 +30,31 @@ const EMPTY_GATEWAY_HEALTH: LivePulseGatewayHealth = { openCircuitBreakers: 0, }; +// Reading gateway health hits the database once per upstream. The pulse bar is +// pinned on every page, so many open dashboards would each poll it every couple +// of seconds. A short shared cache keeps those reads bounded regardless of how +// many connections are open. +const GATEWAY_CACHE_TTL_MS = 2000; +let gatewayCache: { value: LivePulseGatewayHealth; expiresAt: number } | null = null; + +async function loadGatewayHealth(nowMs: number): Promise<LivePulseGatewayHealth> { + if (gatewayCache && gatewayCache.expiresAt > nowMs) { + return gatewayCache.value; + } + + const statuses = await getAllHealthStatusWithCircuitBreaker(true, true); + const value = summarizeGatewayHealth(statuses); + gatewayCache = { value, expiresAt: nowMs + GATEWAY_CACHE_TTL_MS }; + return value; +} + +/** + * Clear the cached gateway health. Intended for tests. + */ +export function resetLivePulseCache(): void { + gatewayCache = null; +} + /** * Reduce a list of upstream health statuses into gateway health counters. * Open and half-open circuit breakers are never counted as closed. 
@@ -64,8 +89,7 @@ export async function getLivePulseSnapshot(nowMs: number = Date.now()): Promise< let gateway = EMPTY_GATEWAY_HEALTH; try { - const statuses = await getAllHealthStatusWithCircuitBreaker(true, true); - gateway = summarizeGatewayHealth(statuses); + gateway = await loadGatewayHealth(nowMs); } catch { // Keep window metrics usable even when upstream health cannot be read. } diff --git a/tests/unit/services/live-pulse-service.test.ts b/tests/unit/services/live-pulse-service.test.ts index 753b6bdb..53ea345d 100644 --- a/tests/unit/services/live-pulse-service.test.ts +++ b/tests/unit/services/live-pulse-service.test.ts @@ -11,7 +11,11 @@ vi.mock("@/lib/services/health-checker", () => ({ getAllHealthStatusWithCircuitBreaker: getAllHealthStatusWithCircuitBreakerMock, })); -import { getLivePulseSnapshot, summarizeGatewayHealth } from "@/lib/services/live-pulse-service"; +import { + getLivePulseSnapshot, + resetLivePulseCache, + summarizeGatewayHealth, +} from "@/lib/services/live-pulse-service"; import { recordPulseSample, resetPulseWindow } from "@/lib/services/live-pulse-aggregator"; function makeStatus(isHealthy: boolean, state?: CircuitBreakerStateEnum): HealthStatus { @@ -34,6 +38,7 @@ describe("live-pulse-service", () => { beforeEach(() => { vi.clearAllMocks(); resetPulseWindow(); + resetLivePulseCache(); }); describe("summarizeGatewayHealth", () => { From 84bcfe54ab750bdf8efe05cb6db474f134f29fb4 Mon Sep 17 00:00:00 2001 From: umaru Date: Sat, 30 May 2026 11:31:07 +0800 Subject: [PATCH 05/12] =?UTF-8?q?feat(live-pulse):=20=E5=89=8D=E7=AB=AF?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E8=84=89=E6=90=8F=E7=8A=B6=E6=80=81=E6=9D=A1?= =?UTF-8?q?=E5=85=A8=E5=B1=80=E5=B8=B8=E9=A9=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 use-live-pulse 钩子(SSE 三态 + 断线降级轮询)与 live-pulse-provider, 在仪表盘布局层建立单条共享连接,避免逐页重连。新增 live-pulse-bar 展示组件 (完整版 / 紧凑版,在线指示灯按三态着色,错误率与熔断打开按既有强调样式提示, 数字跟随 next-intl 当前语言)。顶栏右侧承载状态条,移动端以紧凑窄条常驻。 补充 livePulse 
中英文文案与组件单元测试。 --- openspec/changes/live-pulse-bar/tasks.md | 10 +- src/app/[locale]/(dashboard)/layout.tsx | 69 ++++--- src/components/admin/live-pulse-bar.tsx | 159 ++++++++++++++ src/components/admin/mobile-pulse-strip.tsx | 27 +++ src/components/admin/topbar.tsx | 24 ++- src/hooks/use-live-pulse.ts | 194 ++++++++++++++++++ src/messages/en.json | 14 ++ src/messages/zh-CN.json | 14 ++ src/providers/live-pulse-provider.tsx | 28 +++ .../components/admin/live-pulse-bar.test.tsx | 62 ++++++ 10 files changed, 563 insertions(+), 38 deletions(-) create mode 100644 src/components/admin/live-pulse-bar.tsx create mode 100644 src/components/admin/mobile-pulse-strip.tsx create mode 100644 src/hooks/use-live-pulse.ts create mode 100644 src/providers/live-pulse-provider.tsx create mode 100644 tests/components/admin/live-pulse-bar.test.tsx diff --git a/openspec/changes/live-pulse-bar/tasks.md b/openspec/changes/live-pulse-bar/tasks.md index 171815af..d1c8e3fe 100644 --- a/openspec/changes/live-pulse-bar/tasks.md +++ b/openspec/changes/live-pulse-bar/tasks.md @@ -24,11 +24,11 @@ ## 4. 
前端实时脉搏客户端与桌面状态条 -- [ ] 4.1 新增 `src/hooks/use-live-pulse.ts`,结构对齐 `use-request-log-live.ts`:连接 `/api/admin/stats/live`,解析 `live-pulse` 事件,维护快照与 `connecting/live/fallback` 三态;断线降级为定时拉取一次性快照。 -- [ ] 4.2 新增 `src/components/admin/live-pulse-bar.tsx`,纯展示组件:呈现在线指示灯(按三态着色)、req/min、错误率、平均延迟、TPM、健康上游数/总数、熔断打开数;错误率超阈值与熔断打开数 >0 时按既有错误/警示样式强调。 -- [ ] 4.3 扩展 `src/components/admin/topbar.tsx`,在右栏承载状态条(桌面端);所有管理页因复用 `Topbar` 自动获得。 -- [ ] 4.4 在 `src/messages/en.json` 与 `src/messages/zh.json` 新增实时脉搏相关文案键(指标标签、在线/降级提示等)。 -- [ ] 4.5 运行 `pnpm lint`、`pnpm exec tsc --noEmit`、`pnpm format:check` 通过;提交本阶段。 +- [x] 4.1 新增 `src/hooks/use-live-pulse.ts`,结构对齐 `use-request-log-live.ts`:连接 `/api/admin/stats/live`,解析 `live-pulse` 事件,维护快照与 `connecting/live/fallback` 三态;断线降级为定时拉取一次性快照。新增 `src/providers/live-pulse-provider.tsx`,在布局层只建立一条共享连接,供顶栏与移动端窄条共用,避免逐页重连。 +- [x] 4.2 新增 `src/components/admin/live-pulse-bar.tsx`,纯展示组件:呈现在线指示灯(按三态着色)、req/min、错误率、平均延迟、TPM、健康上游数/总数、熔断打开数;错误率超阈值与熔断打开数 >0 时按既有错误/警示样式强调;数字格式跟随 next-intl 当前语言。含组件单元测试。 +- [x] 4.3 扩展 `src/components/admin/topbar.tsx`,在右栏承载状态条(桌面端完整版,窄屏退化为紧凑版);所有管理页因复用 `Topbar` 自动获得。 +- [x] 4.4 在 `src/messages/en.json` 与 `src/messages/zh-CN.json` 新增 `livePulse` 命名空间文案键(指标标签、在线/降级提示等)。 +- [x] 4.5 运行 `eslint`、`pnpm exec tsc --noEmit`、`prettier --check` 通过;提交本阶段。 - 验收:桌面端各管理页顶栏常驻状态条,指标随推送更新,降级时指示灯转琥珀色并持续刷新;无流量时显示零值。 ## 5. 
移动端紧凑形态 diff --git a/src/app/[locale]/(dashboard)/layout.tsx b/src/app/[locale]/(dashboard)/layout.tsx index 8d6c02d8..c4bab51f 100644 --- a/src/app/[locale]/(dashboard)/layout.tsx +++ b/src/app/[locale]/(dashboard)/layout.tsx @@ -5,9 +5,11 @@ import { useTranslations } from "next-intl"; import { ChevronLeft } from "lucide-react"; import { Sidebar } from "@/components/admin/sidebar"; +import { MobilePulseStrip } from "@/components/admin/mobile-pulse-strip"; import { usePathname, useRouter } from "@/i18n/navigation"; import { cn } from "@/lib/utils"; import { useAuth } from "@/providers/auth-provider"; +import { LivePulseProvider } from "@/providers/live-pulse-provider"; import { Button } from "@/components/ui/button"; const MOBILE_ROOT_ROUTES = ["/dashboard", "/keys", "/upstreams", "/logs", "/settings"] as const; @@ -87,38 +89,41 @@ export default function DashboardLayout({ children }: { children: React.ReactNod } return ( -
- setIsSidebarCollapsed((value) => !value)} - /> + +
+ setIsSidebarCollapsed((value) => !value)} + /> -
- {!isMobileRootRoute && ( -
-
- -
-
- )} - {children} -
-
+
+ + {!isMobileRootRoute && ( +
+
+ +
+
+ )} + {children} +
+
+ ); } diff --git a/src/components/admin/live-pulse-bar.tsx b/src/components/admin/live-pulse-bar.tsx new file mode 100644 index 00000000..4cc52679 --- /dev/null +++ b/src/components/admin/live-pulse-bar.tsx @@ -0,0 +1,159 @@ +"use client"; + +import { useFormatter, useTranslations } from "next-intl"; +import { Server, Zap } from "lucide-react"; + +import { cn } from "@/lib/utils"; +import type { LivePulseConnectionState, LivePulseSnapshot } from "@/hooks/use-live-pulse"; + +const ERROR_RATE_WARN_THRESHOLD_PCT = 5; + +const ZERO_SNAPSHOT: LivePulseSnapshot = { + requestsPerMinute: 0, + errorRatePct: 0, + avgLatencyMs: 0, + tokensPerMinute: 0, + sampleCount: 0, + windowSeconds: 60, + generatedAt: "", + gateway: { healthyUpstreams: 0, totalUpstreams: 0, openCircuitBreakers: 0 }, +}; + +interface LivePulseBarProps { + snapshot: LivePulseSnapshot | null; + connectionState: LivePulseConnectionState; + variant?: "full" | "compact"; + className?: string; +} + +function StatusDot({ connectionState }: { connectionState: LivePulseConnectionState }) { + const dotClass = + connectionState === "live" + ? "bg-status-success animate-log-badge-live motion-reduce:animate-none" + : connectionState === "connecting" + ? "bg-status-info animate-log-badge-connect motion-reduce:animate-none" + : "bg-status-warning"; + + return