feat(nixl): add pega rdma v1 pull connector#368
Conversation
4f3d7d2 to
d2af963
Compare
d2af963 to
e559099
Compare
xiaguan
left a comment
There was a problem hiding this comment.
Review: feat(nixl): add pega rdma v1 pull connector
聚焦 handshake 路径和新增字段的必要性。
Finding 1: block 数据应该留在 Rust,不要在 Python 留影子副本
问题
PR 在 NixlBaseConnectorWorker(基类)新增了 4 个字段缓存 NIXL block 的原始 (addr, len, device_id):
# base_worker.py:385, :393, :399, :400
self.local_registered_blocks_data: list[tuple[int, int, int, str]] = []
self.src_blocks_data_by_block_size: dict[int, list[tuple[int, int, int]]] = {}
self.dst_blocks_data = defaultdict[EngineId, dict[int, list[tuple[int, int, int]]]](dict)
self.src_blocks_data_by_tp_ratio: dict[int, list[list[tuple[int, int, int]]]] = {}这些数据在 NIXL register_local_xfer_handler / add_remote_agent 时产生,但纯 NIXL pull/push 路径从不消费——只有 Pega RDMA v1 子类的热路径用。子类通过 _local_blocks_table_for_handle(pull_worker.py:1087)和 _remote_blocks_table(pull_worker.py:1103)搬运进 Rust 的 register_blocks_table,之后 Python 侧影子副本就再也不读了。
数据存了三层:
- Python 基类的影子副本(上面 4 个字段)
- Pega 子类的
_pega_block_tables: dict[object, int](pull_worker.py:775)—— key 到 Rust handle 的缓存 - Rust 的
block_tables: HashMap<u64, Vec<BlockEntry>>(rdma_v1.rs:82)—— 真正的数据持有者
第 1 层纯粹是搬运中转,注册进 Rust 后就变成垃圾。
根因
数据在基类方法里产生(register_local_xfer_handler / add_remote_agent),但 Rust engine 在 Pega 子类手里。基类不知道 Rust engine 存在,只能先存起来等子类热路径时搬运。
改法
基类在数据产生点暴露 hook,Pega 子类 override hook 直接注册到 Rust,不在 Python 存任何副本。
Rust rdma_v1.rs: 把所有状态合并到一个 struct,一把 Mutex。所有方法整体释放 GIL,Mutex 只在短临界区持有,RDMA I/O 在锁外:
struct EngineState {
pending_handshakes: HashMap<String, HandshakeMetadata>,
pending_reads: HashMap<u64, PendingRead>,
next_handle: u64,
local_block_tables: HashMap<u64, Vec<BlockEntry>>, // nixl handle → blocks
remote_block_tables: HashMap<(String, usize), Vec<BlockEntry>>, // (engine_id, rank) → blocks
}
#[pyclass]
struct PegaRdmaV1Engine {
engine: Arc<TransferEngine>,
state: Mutex<EngineState>,
}每个方法整体 py.allow_threads,内部临界区取/写状态,I/O 不持锁:
fn read_async_indices(&self, py: Python<'_>, remote_addr: String, local_nixl_handle: u64, remote_engine_id: &str, remote_tp_rank: usize, local_desc_ids: Vec<usize>, remote_desc_ids: Vec<usize>) -> PyResult<(u64, usize, usize)> {
py.allow_threads(|| {
// 临界区:读 block_tables 构建 descriptors
let prepared = {
let state = self.state.lock().map_err(...)?;
let local_table = state.local_block_tables.get(&local_nixl_handle).ok_or(...)?;
let remote_table = state.remote_block_tables.get(&(remote_engine_id.into(), remote_tp_rank)).ok_or(...)?;
// build TransferDesc vec + bytes...
};
// 锁外:submit RDMA I/O
let receivers = self.engine.batch_transfer_async(TransferOp::Read, &remote_addr, &prepared.native).map_err(...)?;
// 临界区:分配 handle + 存 pending_reads
let handle = {
let mut state = self.state.lock().map_err(...)?;
let handle = state.next_handle;
state.next_handle = handle.checked_add(1).ok_or(...)?;
state.pending_reads.insert(handle, PendingRead { receivers: receivers.into_iter().map(Some).collect(), bytes_done: 0 });
handle
};
Ok((handle, prepared.bytes, prepared.native.len()))
})
}
fn check_read(&self, py: Python<'_>, handle: u64) -> PyResult<String> {
py.allow_threads(|| {
// 临界区:取出 receiver
let receiver = {
let mut state = self.state.lock().map_err(...)?;
let read = state.pending_reads.get_mut(&handle).ok_or(...)?;
read.receivers.iter_mut().find_map(Option::take)
};
// 锁外:try_recv
let result = receiver.map(|rx| rx.try_recv());
// 临界区:更新 bytes_done / 清理
let mut state = self.state.lock().map_err(...)?;
// match result, update bytes_done, check if all done...
})
}
fn prepare_handshake(&self, py: Python<'_>, remote_addr: String) -> PyResult<PegaRdmaV1Handshake> {
py.allow_threads(|| {
// 锁外:engine I/O
let status = self.engine.get_or_prepare(&remote_addr).map_err(...)?;
// 临界区:缓存 local metadata
let mut state = self.state.lock().map_err(...)?;
// match status, insert into pending_handshakes if Prepared...
})
}
fn register_local_blocks(&self, nixl_handle: u64, blocks: Vec<(u64, u64, u64)>) -> PyResult<()> {
// 纯状态写入,无 I/O,一把锁即可
let mut state = self.state.lock().map_err(...)?;
state.local_block_tables.insert(nixl_handle, /* build BlockEntry vec */);
Ok(())
}删除 register_blocks_table / drop_blocks_table / next_table_handle——Rust 持有真实数据,shutdown 时 engine drop 自然清理。
基类 base_worker.py: 删 4 个影子字段,加两个空 hook,在数据产生点调用:
def _on_local_blocks_registered(self, nixl_handle: int, blocks_data: list[tuple[int,int,int]]) -> None:
"""Called after local KV blocks are registered with NIXL."""
def _on_remote_blocks_registered(self, engine_id: str, tp_rank: int, blocks_data: list[tuple[int,int,int]]) -> None:
"""Called after remote agent blocks are registered with NIXL."""调用点就是现有的 register_local_xfer_handler(base_worker.py:929、:1156、:1516)和 add_remote_agent(base_worker.py:1556)返回处。
Pega 子类 pull_worker.py: override hook 直接注册到 Rust,删除 3 个 helper 和缓存层:
def _on_local_blocks_registered(self, nixl_handle, blocks_data):
self._pega_rdma.register_local_blocks(nixl_handle, blocks_data)
def _on_remote_blocks_registered(self, engine_id, tp_rank, blocks_data):
self._pega_rdma.register_remote_blocks(engine_id, tp_rank, blocks_data)热路径直接传原始标识符:
self._pega_rdma.read_async_indices(
peer_key,
local_xfer_side_handle, # u64
dst_engine_id, remote_rank, # &str, usize
local_block_descs_ids.tolist(),
remote_block_descs_ids.tolist(),
)净效果
删除:基类 4 个影子字段 + 所有赋值/清理点、Pega 子类 3 个 helper(_local_blocks_data_for_handle / _local_blocks_table_for_handle / _remote_blocks_table)+ _pega_block_tables 缓存、Rust 侧 register_blocks_table / drop_blocks_table / next_table_handle。
Rust 侧从 6 个锁字段收敛到 1 把 Mutex<EngineState>,所有方法释放 GIL,Mutex 只在短临界区持有。
基类只多两个空 hook,不含任何 Pega 状态,纯 NIXL 路径不受影响。
Finding 2: finish_handshake_inner 失败时 connecting 状态泄漏
问题
rdma_v1.rs:414 finish_handshake_inner 先从 pending_handshakes remove 取出 local metadata,再调 complete_handshake:
// rdma_v1.rs:414-430
fn finish_handshake_inner(&self, remote_addr: &str, remote_metadata: &[u8]) -> PyResult<()> {
let remote = HandshakeMetadata::from_bytes(remote_metadata)?;
let local = self
.pending_handshakes
.lock()...remove(remote_addr) // ← 先 remove
.ok_or_else(...)?;
self.engine
.complete_handshake(remote_addr, &local, &remote)
.map_err(|err| rdma_v1_error(...)) // ← 失败后 local 已不在 pending_handshakes
}如果 complete_handshake 失败(比如 rc_backend/mod.rs:330 的 session.connect() 失败),complete_handshake_for 通过 `?` 提前返回 Err,不会执行到 `rc_backend/mod.rs:346` 的 `state.connecting.remove(remote_addr)`。
此时 Python 侧 _handle_handshake_request_failure(base_worker.py:644)被调用,它调 abort_handshake(rdma_v1.rs:169):
```rust
fn abort_handshake(&self, remote_addr: String) -> PyResult<()> {
if let Some(local) = self
.pending_handshakes
.lock()...remove(&remote_addr) // ← 已经被 finish_handshake_inner remove 空了
{
self.engine.abort_handshake(&remote_addr, &local); // ← 走不到这里
}
Ok(()) // ← no-op
}
```
`pending_handshakes` 已经被 `finish_handshake_inner` remove 空了,`abort_handshake` 的 `if let Some` 不匹配,变成 no-op,不会调到 `engine.abort_handshake` 去清理 `connecting` set。
后果:`connecting` 残留 → 后续所有到同一 peer 的 `get_or_prepare` 永远返回 `AlreadyConnecting` → 连接永久卡死,只有进程重启能恢复。
改法
先 get clone 不 remove,`complete_handshake` 成功后才 remove,失败时用 local 调 abort:
```rust
fn finish_handshake(&self, py: Python<'>, remote_addr: String, remote_metadata: Vec) -> PyResult<()> {
py.allow_threads(|| {
let remote = HandshakeMetadata::from_bytes(&remote_metadata)?;
// 临界区:get clone,不 remove
let local = {
let state = self.state.lock().map_err(...)?;
state.pending_handshakes.get(&remote_addr).cloned()
.ok_or_else(|| PegaFlowError::new_err(...))?
};
// 锁外:complete I/O,失败时 abort
self.engine.complete_handshake(&remote_addr, &local, &remote)
.inspect_err(|| {
self.engine.abort_handshake(&remote_addr, &local);
})
.map_err(...)?;
// 临界区:成功后才 remove
self.state.lock().map_err(...)?.pending_handshakes.remove(&remote_addr);
Ok(())
})
}
```
xiaguan
left a comment
There was a problem hiding this comment.
Review: feat(nixl): add pega rdma v1 pull connector(第二轮)
第一轮两个 finding 改得干净利落:Rust 6 锁→1 锁、finish_handshake get-clone→失败 abort→成功才 remove、基类 hook + 子类直注 Rust。但第一轮修完后又引入/暴露了新的问题。
致命伤(必须修复)
accept_handshake_inner 失败时 connecting 泄漏 — Finding 2 的双胞胎 bug
accept_handshake_inner(python/src/rdma_v1.rs:425-457):
invalidate_connection(remote_addr) ← 只清 addr_connections,不清 connecting
get_or_prepare(remote_addr) ← 插入 connecting,返回 Prepared(local)
complete_handshake(remote_addr, &local, &remote) ← 失败时 connecting 不清理
complete_handshake → complete_handshake_for(pegaflow-transfer/src/rc_backend/mod.rs:274)。当 session.connect() 失败(mod.rs:330,网络抖动/远端崩溃/QP 异常),? 提前返回 Err,不会执行到 mod.rs:346 的 state.connecting.remove(remote_addr)。
此时 local 是局部变量,函数返回后 drop。pending_handshakes 从未插入这条记录(只有 prepare_handshake_inner 会插入,rdma_v1.rs:411)。后续没有任何路径能清理这个 connecting 条目——abort_handshake(rdma_v1.rs:194)需要从 pending_handshakes 取 local,但那里是空的。
后果链:
connecting残留 → 后续到同一 peer 的所有get_or_prepare永远返回AlreadyConnectingaccept_handshake_inner永远走ConnectionStatus::Connecting分支(rdma_v1.rs:447),返回Err("handshake to {remote_addr} already in progress")- P 侧 accept 路径永久卡死,只有进程重启能恢复
- D 侧
_handle_handshake_request_failure会调abort_handshake清理 D 自己的状态,但 P 侧无人补救
触发条件:session.connect() 失败。RDMA 网络抖动、远端 worker 崩溃后重启、QP 状态异常——都是生产环境的真实场景。
改法和 finish_handshake_inner(rdma_v1.rs:479-483)完全对称:
if let Err(err) = py.detach(|| self.engine.complete_handshake(remote_addr, &local, &remote))
{
py.detach(|| self.engine.abort_handshake(remote_addr, &local));
return Err(rdma_v1_error("accept_handshake complete failed", err));
}硬伤(不改会后悔)
id() 作为跨语言 key — 无清理路径 + GC 重用风险
_on_local_blocks_registered(pull_worker.py:871)用 id(nixl_handle) 作为 Rust local_block_tables 的 key,热路径用 id(local_xfer_side_handle)(pull_worker.py:1101)查找。
问题 1:Rust 侧 local_block_tables / remote_block_tables 从不清理(没有 remove 路径)。_cleanup_remote_engine(base_worker.py:2299)释放 NIXL handle 但不通知 Rust。_release_local_xfer_handlers 同理。stale 条目永久残留,多次 P/D 拓扑变化后缓慢泄漏内存。
问题 2:add_remote_agent 中 block_size_ratio > 1 时覆盖 src_xfer_handles_by_block_size[key](base_worker.py:1579),旧 handle 如果被 GC,新 handle 可能复用同一 id(),静默覆盖 Rust 中的旧条目。
改法方向:要么在 Rust 侧加 remove_local_blocks / remove_remote_blocks 并在 cleanup 路径调用,要么在基类 hook 中传入一个稳定的整数标识符(如递增序号)而非 id()。
软伤(看着碍眼)
二元组 handshake 兼容分支是死代码
base_scheduler.py:300-307 的 len(decoded) == 2 分支:
if len(decoded) == 2:
msg, target_tp_rank = decoded
request_extensions = None这是为纯 NIXL pull/push connector(NixlPullConnector / NixlPushConnector)保留的兼容路径。但这个仓库是 PegaFlow vendored connector,部署约束是"要么纯 Pega 要么不用",纯 NIXL connector 是死代码——它们仍注册在 vllm_plugin.py:60-68、导出在 __init__.py,但不会在生产中使用。
同理,base_worker.py:551-554 的二元组编码分支:
if request_extensions is None:
msg = msgspec.msgpack.encode((GET_META_MSG, remote_rank))
else:
msg = msgspec.msgpack.encode((GET_META_MSG, remote_rank, request_extensions))PegaNixlPullConnectorWorker._build_handshake_request_extensions 在 status == "existing" 时返回 None(pull_worker.py:970-972),所以 Pega connector 自己也会走二元组。但既然 P 侧永远只有一个 Pega scheduler,且 extension_handler 永远是 _handle_handshake_extensions(非空),应该让扩展永远存在(空 dict {} 也行),统一走三元组,删掉二元组分支。
pull_worker.py 从 ~350 行膨胀到 1240 行
PegaRdmaV1Config + NIC 解析 + PegaRdmaV1BrokerState + PegaRdmaV1Perf + broker 通信函数全是 Pega RDMA v1 基础设施,不是 NIXL pull worker 逻辑。PR description 说"diff stays local to the NIXL pull connector path"——但这不能让一个文件超过 1k 行。基础设施部分应独立到 pega_rdma_v1.py,pull_worker.py 只留 PegaNixlPullConnectorWorker。
结论
Request Changes — Finding 2 只修了发起方,响应方的同款 connecting 泄漏原封不动地留在了 accept_handshake_inner。id() 当跨语言 key + 从不清理的 block table 又是一颗哑弹。先把 accept 路径的泄漏补上,其余的看着办。
6d34a1c to
9020199
Compare
|
Update after the latest review fixes. What changed after review
Benchmark summaryRun setup: vLLM 0.24.0, DeepSeek-V2-Lite-Chat ( Baseline / vLLM NIXL pull / vLLM NIXL push are from:
|
Summary
This PR adds
PegaNixlPullConnector, a PegaFlow-backed pull connector variant for the vendored vLLM NIXL connector.Key changes:
PegaRdmaV1Enginesupport for RDMA v1 memory registration, native prepare/accept/finish/abort handshake state, async READ submission, polling, release, and connection invalidation.pull_scheduler.pyandpull_worker.py) so the diff stays local to the NIXL pull connector path.Modified by PegaFlow contributors in 2026.Design Notes
Validation
Additional validation run during the PR:
python -m py_compile python/pegaflow/nixl_connector/*.py python/pegaflow/vllm_plugin.py ruff check python/pegaflow/nixl_connector python/pegaflow/vllm_plugin.py python/pegaflow/pegaflow.pyi git diff --check origin/master...HEAD cargo check -p pegaflow-py --no-default-features --features cuda-13Remote native smoke run during the PR:
The final file-local refactor was validated with
py_compile,ruff check, andgit diff --check. The GPU benchmark was not rerun after that refactor because SSH access to the GPU hosts was unavailable; the refactor only moved helper code into the existing pull modules.Benchmark
Environment:
mlx5_ib7s400p*pega_nixl_pullnixl_pullnixl_pushPega RDMA v1 perf summary after the block-index fast path:
read_async_submit:n=80,avg=0.570ms,max=0.713msread_async_submit:n=80,avg=0.575ms,max=0.764msbuild_descs ~2.0ms/read + read_async_submit ~0.98ms/read.Review Notes
Manual review focused on: