Authoritative on: how off-thread code (HTTP listener, worker threads, slot poller) gets work onto the game thread, how trampolines drain that work, and how `DrainSite` collapses the boilerplate.
Many UE engine APIs (UObject::ProcessEvent, anything that
triggers replication, blueprint events, network RPCs) are only
safe to call from the game thread. The HTTP listener runs on a
worker thread, so op handlers that need to invoke those APIs
enqueue a job and block on a reply channel until the
game thread drains it.
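Here is a minimal std-only sketch of that hand-off. It makes no claims about ueforge's internals. The worker sends a boxed closure together with a per-job reply sender, then blocks on `recv_timeout`. Whatever thread calls `drain_jobs` plays the game thread. `Job`, `call_on_drain_thread` and `drain_jobs` are hypothetical names, not the crate's API.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::time::Duration;

// Hypothetical job shape: a boxed closure plus the sender its result goes back on.
type Job = (Box<dyn FnOnce() -> String + Send>, Sender<String>);

/// Worker-thread side: hand the closure to the draining thread, then block on the reply.
fn call_on_drain_thread(
    jobs: &Sender<Job>,
    f: impl FnOnce() -> String + Send + 'static,
    timeout: Duration,
) -> Result<String, String> {
    let (reply_tx, reply_rx) = mpsc::channel::<String>();
    jobs.send((Box::new(f), reply_tx))
        .map_err(|_| "queue closed".to_string())?;
    reply_rx
        .recv_timeout(timeout)
        .map_err(|e| format!("no reply: {e}"))
}

/// Draining-thread side: run every job queued right now, without blocking.
fn drain_jobs(jobs: &Receiver<Job>) -> usize {
    let mut ran = 0;
    while let Ok((f, reply)) = jobs.try_recv() {
        // The caller may already have timed out and dropped its receiver; ignore that.
        let _ = reply.send(f());
        ran += 1;
    }
    ran
}
```

In this naive shape, a timed-out caller only stops listening. The job stays queued and still runs on the next drain. That is the double-execution hazard the cancellation flag described on this page closes.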
ueforge::Queue is the core type. Lock-free empty-check,
mutex-protected push/drain, re-entrance guard, reply channels.
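```rust
use std::time::Duration;
use ueforge::Queue;

static PE_QUEUE: Queue = Queue::new();

// Off-thread (e.g. inside an HTTP op handler): blocks until the game thread
// has run the closure or the timeout expires.
let result = PE_QUEUE.enqueue(
    move || run_on_game_thread(cmd),
    Duration::from_secs(5),
);
```

`enqueue` returns `Result<Json, String>`:

- `Ok(json)`. The job ran on the game thread; `json` is its result.
- `Err("...timed out...")`. The drain didn't fire within the timeout. The job is marked cancelled and will not run, unless a drain had already started it when the timeout fired.
- `Err("queue closed")`. The reply channel was dropped (the process is shutting down).
- `Err("ueforge: PE queue full (...)")`. The queue is at its depth cap; the job was not enqueued.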
The caller blocks on the reply. Pick the timeout based on how often your drain site fires. A damage multicast fires every player tick; 5s is generous. A character tick fires every frame; 1s is plenty. A reload-only drain might need 30s+.
```rust
// Game thread: at the top of your drain-site trampoline
let stats = PE_QUEUE.drain();
```

`drain()` semantics:

- Atomic empty-check. If the queue is empty, return `DrainStats::default()` immediately (one load + branch).
- CAS the `DRAINING` flag. If it is already set (re-entrance), return `DrainStats { reentered: true, .. }`. The outer drain will process this work.
- Take the queue contents, skip any job whose caller already timed out, run each remaining job, send its result to its reply channel, and return `DrainStats { peak, drained, reentered: false }`.
Re-entrance: a job itself can trigger ProcessEvent fan-out that re-enters the same trampoline draining us. The DRAINING flag prevents recursion. The outer drain finishes the original batch; the inner returns immediately.
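The guard can be sketched with one `AtomicBool` and `compare_exchange`. This illustrates the idea and is not ueforge's code. `ReentryGuard` and the `depth`/`log` parameters are hypothetical. The RAII `Drop` that clears the flag is a design choice the document doesn't specify. It keeps a panicking job from leaving `DRAINING` stuck, assuming panics unwind. The nested call stands in for a job whose `ProcessEvent` fan-out re-enters the same trampoline.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stand-in for the queue's DRAINING flag.
static DRAINING: AtomicBool = AtomicBool::new(false);

/// Holds the flag for the duration of one drain; releasing it is automatic.
struct ReentryGuard<'a>(&'a AtomicBool);

impl<'a> ReentryGuard<'a> {
    /// `None` means a drain is already running on this flag (re-entrance).
    fn try_acquire(flag: &'a AtomicBool) -> Option<Self> {
        flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| ReentryGuard(flag))
    }
}

impl Drop for ReentryGuard<'_> {
    // Runs on normal return and during unwinding, so a panicking job can't leave the flag set.
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

/// Returns `true` when this call was skipped because a drain was already in progress.
fn drain(depth: u32, log: &mut Vec<String>) -> bool {
    let Some(_guard) = ReentryGuard::try_acquire(&DRAINING) else {
        log.push(format!("depth {depth}: reentered, skipped"));
        return true;
    };
    log.push(format!("depth {depth}: draining"));
    if depth == 0 {
        // Simulates a job whose ProcessEvent fan-out fires the same trampoline again.
        drain(depth + 1, log);
    }
    false
}
```

The inner call returns immediately, and the recursion never goes deeper than one level.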
```rust
#[derive(Default)]
pub struct DrainStats {
    pub peak: usize,     // queue length before drain
    pub drained: usize,  // jobs run
    pub reentered: bool, // true if drain skipped due to re-entrance
}
```

Hot trampolines call `drain()` on every fire, so the empty path must be cheap:
```rust
fn drain(&self) -> DrainStats {
    if self.size.load(Ordering::Acquire) == 0 {
        return DrainStats::default(); // one atomic load + branch
    }
    // ... slow path ...
}
```

`size` is an `AtomicUsize` shadow updated on enqueue / drain, so the empty path takes no mutex. A trampoline draining a queue that's empty 99% of the time pays ~5ns per fire.
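The empty fast path can be sketched like this. The sketch assumes a `Mutex<Vec<T>>` store with an `AtomicUsize` length shadow, and `ShadowedQueue` is a hypothetical name. The real `Queue` also carries the re-entrance flag, reply channels and cancellation, all omitted here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical minimal queue: a mutex-guarded Vec plus an atomic length shadow.
struct ShadowedQueue<T> {
    items: Mutex<Vec<T>>,
    size: AtomicUsize,
}

impl<T> ShadowedQueue<T> {
    const fn new() -> Self {
        Self { items: Mutex::new(Vec::new()), size: AtomicUsize::new(0) }
    }

    fn push(&self, item: T) {
        let mut items = self.items.lock().unwrap();
        items.push(item);
        // Publish the new length while still holding the lock.
        self.size.store(items.len(), Ordering::Release);
    }

    /// Returns the drained items; the empty case never touches the mutex.
    fn drain(&self) -> Vec<T> {
        if self.size.load(Ordering::Acquire) == 0 {
            return Vec::new(); // fast path: one atomic load + branch, no allocation
        }
        let mut items = self.items.lock().unwrap();
        self.size.store(0, Ordering::Release);
        std::mem::take(&mut *items)
    }
}
```

A push can land just after the fast-path load reads 0. That job waits for the next drain. This is why the shadow can be read without the lock.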
Each enqueued job carries its own `crossbeam_channel::bounded(1)`
reply channel. The drain side sends once and the enqueue side
receives once, so the channel needs only a single slot. This
replaces the older `std::sync::mpsc::channel` shape; the
`bounded(1)` form is the canonical oneshot and avoids a
multi-slot allocator on the enqueue path.
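std has no `crossbeam_channel`, but `std::sync::mpsc::sync_channel(1)` shows the same single-slot oneshot shape. The sketch below uses that std substitute, not the crate's code. `call_with_reply` is hypothetical, and a spawned thread stands in for the game thread. The one `send` fills the buffered slot without blocking, even if the receiver isn't waiting yet, and the caller collects it with `recv_timeout`.

```rust
use std::sync::mpsc::{sync_channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Runs `job` on another thread and waits for its single reply on a one-slot channel.
fn call_with_reply(job: fn() -> i32, timeout: Duration) -> Result<i32, RecvTimeoutError> {
    let (reply_tx, reply_rx) = sync_channel::<i32>(1);
    thread::spawn(move || {
        // With one free slot, this send never blocks, even if nobody is receiving yet.
        let _ = reply_tx.send(job());
    });
    reply_rx.recv_timeout(timeout)
}
```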
pe_queue ships four tests that pin the documented behaviour:
| Test | Asserts |
|---|---|
| `drain_empty_is_noop` | empty queue drain returns `DrainStats::default()` |
| `enqueue_then_drain_returns_result` | enqueue + drain round-trips the job result |
| `reentrant_drain_is_skipped` | drain called while `DRAINING` returns `reentered: true` without recursing |
| `cancelled_jobs_are_skipped` | jobs whose enqueue caller timed out are dropped on the floor instead of running |
ueforge::pe_queue::DrainSite wraps Queue with the standard
performance-counter quad (drain calls, drained cmds, peak depth,
time_ns spent). One static replaces a Queue plus four hand-
declared counter! statics plus a 20-line drain_pending
wrapper.
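Roughly, the counter quad is four atomics bumped around each drain. The sketch below assumes the caller already knows the pre-drain depth, for example from the atomic `size` shadow, so the empty path skips the clock read entirely. `DrainCounters` and `drain_with` are hypothetical and are not `DrainSite`'s actual layout.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Instant;

/// Hypothetical counter quad; field names mirror the DrainSite accessors.
#[derive(Default)]
struct DrainCounters {
    drain_calls: AtomicU64,
    drained_cmds: AtomicU64,
    peak: AtomicUsize,
    time_ns: AtomicU64,
}

impl DrainCounters {
    /// `depth` is the queue length seen before draining; `run` drains and returns jobs run.
    fn drain_with(&self, depth: usize, run: impl FnOnce() -> usize) {
        self.drain_calls.fetch_add(1, Ordering::Relaxed);
        if depth == 0 {
            return; // empty fast path: no clock read, only drain_calls moves
        }
        let start = Instant::now();
        let drained = run();
        self.drained_cmds.fetch_add(drained as u64, Ordering::Relaxed);
        self.peak.fetch_max(depth, Ordering::Relaxed);
        self.time_ns
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
    }
}
```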
```rust
use std::time::Duration;
use ueforge::pe_queue::DrainSite;

pub(crate) static PE_QUEUE: DrainSite = DrainSite::new();

// Off-thread: enqueue via .queue()
PE_QUEUE.queue().enqueue(move || work(), Duration::from_secs(5))?;

// Game thread: drain
PE_QUEUE.drain(); // bumps drain_calls; bumps drained_cmds + peak only on an actual drain
```

Counters are surfaced through accessors:
```rust
PE_QUEUE.drain_calls()  // u64: total drain calls (including empty fast paths)
PE_QUEUE.drained_cmds() // u64: total commands drained
PE_QUEUE.peak()         // usize: highest queue depth observed
PE_QUEUE.time_ns()      // u64: nanoseconds spent inside non-empty drains
```

In your snapshot endpoint:

```rust
// Fields inside your snapshot endpoint's JSON object
"drain_pending_calls": PE_QUEUE.drain_calls(),
"drain_pending_drained_cmds": PE_QUEUE.drained_cmds(),
"pe_queue_peak": PE_QUEUE.peak(),
"time_ns_drain_pending": PE_QUEUE.time_ns(),
```

The "game thread" is whatever thread drains your queue. ueforge
doesn't pick one for you; you choose it by deciding which
`ProcessEvent` trampoline calls `PE_QUEUE.drain()` at the top.
The site needs to:

- Run on the game thread. Anything called via `ProcessEvent` from native game code does. (NetMulticast trampolines, BP event dispatcher targets, and tick functions all qualify.)
- Fire frequently. A queue drained once per second has 1-second-latency ops; a queue drained on every tick has per-frame latency. Pick based on what your timeouts tolerate.
- Run in foreground gameplay. A drain site behind an `if skill_active` check stalls when the skill is off. Bbp learned this when an `impact_resistance` mask blocked multicast traffic and starved its drain.
| Drain site | Cadence | Caveat |
|---|---|---|
| Damage multicast (e.g. `MulticastHandleEffectsWithDamageFlags`) | per player damage tick | stalls when the player isn't taking/dealing damage |
| Character tick (e.g. `ASurvivalCharacter::ReceiveTick`) | per frame | hooked but rarely targeted |
| Inventory open/close | event-driven | only fires when the UI opens |
| Global PE pre-callback (`RegisterProcessEventPreCallback`) | every PE in the process | requires UE4SS shim glue (Wave E1, deferred) |
The global PE pre-callback is the right long-term answer. Until it lands, the damage multicast is g2rpg's choice; OWS may pick the character tick.
Multiple `DrainSite` instances are fine, one per drain; they don't
share state. A status-effect snapshot drain might live on a different
site than a debug call-op drain. Both work; just don't share
one `DrainSite` across multiple unrelated workflows.
Re-entrance is real, not theoretical. Examples:

- Heal calls into the damage trampoline. A queued `AddHealth` call sets `CurrentDamage`, which fires `OnRep_CurrentDamage`, which dispatches via PE, back into the damage hook that's draining us.
- AddEffect re-enters the effect-tick handler. Adding a status effect can fire its initial tick UFunction, which fans out into PE.
- Inventory mutation triggers a UI update. Setting a slot count can fire `OnInventoryChanged`, which dispatches PE on the widget you're hooking.
The DRAINING flag handles all of these correctly: the inner
drain returns reentered: true, the outer drain finishes its
batch, the recursion never deepens.
If your handler logic depends on whether you're inside a drain,
read Queue::is_draining (TODO: add this; right now it's
internal). Usually you don't care. Drain semantics are
self-correcting.
Queue::enqueue allocates: one Box<dyn FnOnce> for the job +
one Sender<Result> channel half. This is fine; enqueue is the
worker-thread-side cold path. Don't enqueue from a hot
trampoline. You'll allocate per fire.
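`Queue::drain` takes the queue contents as one `Vec<Pending>`
under the lock and processes them outside it. That `Vec` is the
queue's existing buffer, moved out via `mem::take`, so drain
allocates nothing fresh; the empty `Vec` left in its place
allocates only when the next job is pushed.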
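The take-then-run shape, sketched with std types, is shown below. The `Job` alias and `take_then_run` are hypothetical. `mem::take` swaps in an empty `Vec` under the lock, and the guard is dropped at the end of that statement, so the jobs run with the lock released.

```rust
use std::mem;
use std::sync::Mutex;

// Hypothetical job type for the sketch.
type Job = Box<dyn FnOnce() -> u32 + Send>;

/// Moves the whole batch out under the lock, then runs it with the lock released.
fn take_then_run(queue: &Mutex<Vec<Job>>) -> Vec<u32> {
    // The MutexGuard temporary is dropped at the end of this statement.
    let batch = mem::take(&mut *queue.lock().unwrap());
    batch.into_iter().map(|job| job()).collect()
}
```

Running jobs outside the lock also matters for re-entrance. std's `Mutex` is not reentrant, so a job that enqueues more work while the drain still held the lock would deadlock or panic.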
Per-job cost is whatever your job closure does + one channel
send. Channel send for a Result<Json, String> is one heap
write + one wakeup; ~100ns budget.
Queue is bounded. The default cap is DEFAULT_MAX_DEPTH = 1024;
override via Queue::with_capacity(n). enqueue past the cap
returns Err("ueforge: PE queue full ({len} >= cap {cap})")
without enqueuing. Without the cap a misbehaving HTTP client
(or a stuck drain site) could pile entries until the host
process runs out of memory.
1024 is ~100x the largest realistic burst from our test client today. Tune down for memory-tight builds; tune up only if a legitimate workflow brushes the ceiling.
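The cap check itself is a length comparison before the push. The sketch below leaves the locking out. `BoundedQueue` and `try_push` are hypothetical names. Only `DEFAULT_MAX_DEPTH = 1024` and the error text come from the description above.

```rust
/// Mirrors the documented default; the rest of this type is hypothetical.
const DEFAULT_MAX_DEPTH: usize = 1024;

struct BoundedQueue<T> {
    items: Vec<T>,
    cap: usize,
}

impl<T> BoundedQueue<T> {
    fn new() -> Self {
        Self::with_capacity(DEFAULT_MAX_DEPTH)
    }

    /// Here `cap` is the depth limit, not a pre-allocation hint.
    fn with_capacity(cap: usize) -> Self {
        Self { items: Vec::new(), cap }
    }

    /// Refuses the job instead of growing without bound.
    fn try_push(&mut self, item: T) -> Result<(), String> {
        let len = self.items.len();
        if len >= self.cap {
            return Err(format!("ueforge: PE queue full ({len} >= cap {})", self.cap));
        }
        self.items.push(item);
        Ok(())
    }
}
```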
enqueue(job, timeout) blocks the caller on a one-shot reply
channel. If recv_timeout fires, the caller used to return
"timed out" while the job stayed queued. The drain ran it
later, the result was dropped (the receiver was gone), and the
client's natural retry double-executed non-idempotent ops
like spend_points, write_bytes, call.
Each Pending now carries an Arc<AtomicBool> cancelled. On
recv_timeout the enqueue side flips the flag; on the next
drain, Queue::drain skips any pending whose flag is set
before invoking the closure. The semantics:
- Caller times out -> job marked cancelled -> never runs.
- Caller times out AFTER the drain already started the closure -> closure runs to completion (we don't yank a job mid-execution); reply send fails silently because the receiver is gone.
- Drain completes within the timeout -> reply delivered, normal path.
Cancellation is best-effort. It plugs the common race (client retried after timeout, original job still queued) but doesn't make non-idempotent ops magically safe; design for idempotency where you can.
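Here is a sketch of the cancellation flag. It assumes a `Mutex<Vec<Pending>>` queue and uses std's `sync_channel(1)` in place of crossbeam. `Pending`'s fields, `enqueue_and_wait` and `drain` are hypothetical. The check happens before the closure is invoked. A timeout that fires after the check therefore still lets the job finish, and its reply send fails quietly, matching the semantics above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, RecvTimeoutError, SyncSender};
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Hypothetical queue entry: the job, its one-shot reply slot, and a shared cancel flag.
struct Pending {
    job: Box<dyn FnOnce() -> String + Send>,
    reply: SyncSender<String>,
    cancelled: Arc<AtomicBool>,
}

/// Caller side: queue the job and wait; on timeout, mark it cancelled before returning.
fn enqueue_and_wait(
    queue: &Mutex<Vec<Pending>>,
    job: impl FnOnce() -> String + Send + 'static,
    timeout: Duration,
) -> Result<String, String> {
    let (reply, rx) = sync_channel::<String>(1);
    let cancelled = Arc::new(AtomicBool::new(false));
    queue.lock().unwrap().push(Pending {
        job: Box::new(job),
        reply,
        cancelled: Arc::clone(&cancelled),
    });
    match rx.recv_timeout(timeout) {
        Ok(value) => Ok(value),
        Err(RecvTimeoutError::Timeout) => {
            cancelled.store(true, Ordering::Release);
            Err("timed out".to_string())
        }
        Err(RecvTimeoutError::Disconnected) => Err("queue closed".to_string()),
    }
}

/// Drain side: skip entries whose caller already gave up; returns how many jobs ran.
fn drain(queue: &Mutex<Vec<Pending>>) -> usize {
    let batch = std::mem::take(&mut *queue.lock().unwrap());
    let mut ran = 0;
    for pending in batch {
        if pending.cancelled.load(Ordering::Acquire) {
            continue; // never invoke a job whose caller has already returned "timed out"
        }
        // If the caller timed out after the check above, this send fails; ignore it.
        let _ = pending.reply.send((pending.job)());
        ran += 1;
    }
    ran
}
```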
- hooks.md. Where you install the trampoline that drains the queue
- counters.md. The counter conventions `DrainSite` uses
- http.md. How the HTTP listener uses `Queue::enqueue` for game-thread ops
- PERFORMANCE.md. Empty-path discipline rule