Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions frontend/src/pages/admin/traces/RagTraceDetailPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,20 @@ const copyToClipboard = (text: string, label: string) => {

// ============ 状态颜色 ============

type StatusType = "success" | "failed" | "running" | "default";
type StatusType = "success" | "failed" | "error" | "running" | "cancelled" | "default";

const STATUS_COLORS: Record<StatusType, { dot: string; bar: string }> = {
success: { dot: "bg-emerald-500", bar: "bg-emerald-400" },
failed: { dot: "bg-red-500", bar: "bg-red-400" },
error: { dot: "bg-red-500", bar: "bg-red-400" },
running: { dot: "bg-amber-500", bar: "bg-amber-400" },
cancelled: { dot: "bg-slate-400", bar: "bg-slate-300" },
default: { dot: "bg-slate-300", bar: "bg-slate-300" }
};

const getStatusColors = (status?: string | null) => {
const normalized = normalizeStatus(status) as StatusType | null;
return STATUS_COLORS[normalized || "default"];
const normalized = normalizeStatus(status) as StatusType;
return STATUS_COLORS[normalized] ?? STATUS_COLORS.default;
};

// ============ 子组件 ============
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,25 +144,43 @@ protected StreamCancellationHandle doStreamChat(ChatRequest request, StreamCallb

// 在调用线程开 stream span,使后续 first-packet 子节点能正确归属父节点;
// 该 span 由 SSE 终态(onComplete / onError)或 cancel 时收尾,记录真实端到端耗时
// 注意:此处不 detach,由调用方(RoutingLLMService)在 awaitFirstPacket 之后调 handle.detach()
StreamSpan span = streamTraceSupport.beginStreamNode(provider() + "-stream-chat", "LLM_PROVIDER");
StreamSpanCallback wrappedCallback;
try {
wrappedCallback = new StreamSpanCallback(callback, span);
StreamCancellationHandle inner = StreamAsyncExecutor.submit(
modelStreamExecutor,
call,
wrappedCallback,
cancelled -> doStream(call, wrappedCallback, cancelled, reasoningEnabled)
);
return () -> {
try {
inner.cancel();
} finally {
wrappedCallback.onCancel();
}
};
} finally {
// 同步部分结束:把节点从当前线程的 NODE_STACK 弹出,避免污染兄弟节点的父节点链
StreamSpanCallback wrappedCallback = new StreamSpanCallback(callback, span);
StreamCancellationHandle inner = StreamAsyncExecutor.submit(
modelStreamExecutor,
call,
wrappedCallback,
cancelled -> doStream(call, wrappedCallback, cancelled, reasoningEnabled)
);
return new StreamChatHandle(() -> {
try {
inner.cancel();
} finally {
wrappedCallback.onCancel();
}
}, span);
}

/**
* 携带 StreamSpan 的取消句柄,供调用方在首包探测完成后调 detach() 弹出 NODE_STACK
*/
public static final class StreamChatHandle implements StreamCancellationHandle {
private final StreamCancellationHandle delegate;
private final StreamSpan span;

public StreamChatHandle(StreamCancellationHandle delegate, StreamSpan span) {
this.delegate = delegate;
this.span = span;
}

@Override
public void cancel() {
delegate.cancel();
}

@Override
public void detach() {
span.detach();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,14 @@ public StreamCancellationHandle streamChat(ChatRequest request, StreamCallback c
continue;
}

ProbeStreamBridge.ProbeResult result = awaitFirstPacket(bridge, handle, callback);
ProbeStreamBridge.ProbeResult result;
try {
result = awaitFirstPacket(bridge, handle, callback);
} finally {
// 首包探测完成后(无论成功失败)弹出 LLM_PROVIDER 节点,
// 确保 TTFT 节点已正确归属到 provider 下
handle.detach();
}

if (result.isSuccess()) {
healthStore.markSuccess(target.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,12 @@ public interface StreamCancellationHandle {
* - 调用后应该不会再继续产生 onContent() 回调
*/
void cancel();

/**
* 将该 handle 关联的 trace 节点从当前线程的 NODE_STACK 弹出
* <p>
* 仅由内部实现覆写(如 StreamChatHandle),用于延迟 detach 跨线程 span,
* 使首包探测等同步子节点能正确归属到该 provider 节点下
*/
default void detach() {}
}