feat: disk-backed fallback when memory exceeds 80% RSS #3

Open
StrongWind1 wants to merge 6 commits into main from feat/disk-backed-fallback

@StrongWind1 (Owner)

When process RSS reaches 80% of system RAM, automatically switch heavy stores to disk-backed mode. Small captures stay fully in-memory; large ones spill automatically. No new dependencies.

Changes

Memory monitor (src/mem_monitor.rs)

  • RSS-based 80% threshold with sticky disk_mode flag
  • Checks every 50K packets + every file transition
  • would_exceed() predicts whether a large allocation would cross the threshold
  • WPAWOLF_MEM_THRESHOLD env var override for testing (integer percent)
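The threshold logic described above can be sketched as follows. This is a minimal illustration, not the code in src/mem_monitor.rs: the struct layout, the `new()` and `check()` signatures, and the use of `>=` for the comparison are all assumptions. RSS and total RAM are passed in as plain numbers so the sketch stays free of platform-specific calls.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical monitor; field and method shapes are illustrative only.
pub struct MemMonitor {
    total_ram: u64,        // system RAM in bytes
    threshold_pct: u64,    // integer percent, 80 by default
    disk_mode: AtomicBool, // sticky: never reset once set
}

impl MemMonitor {
    /// `env_override` stands in for the value of WPAWOLF_MEM_THRESHOLD.
    /// Missing, unparsable, or out-of-range values fall back to 80 (assumed policy).
    pub fn new(total_ram: u64, env_override: Option<&str>) -> Self {
        let threshold_pct = env_override
            .and_then(|s| s.trim().parse::<u64>().ok())
            .filter(|p| (1..=100).contains(p))
            .unwrap_or(80);
        Self { total_ram, threshold_pct, disk_mode: AtomicBool::new(false) }
    }

    /// Byte limit corresponding to the percentage; u128 avoids overflow.
    fn limit(&self) -> u64 {
        (u128::from(self.total_ram) * u128::from(self.threshold_pct) / 100) as u64
    }

    /// Called periodically (e.g. every 50K packets). Returns the sticky flag.
    pub fn check(&self, rss: u64) -> bool {
        if rss >= self.limit() {
            self.disk_mode.store(true, Ordering::Relaxed);
        }
        self.disk_mode.load(Ordering::Relaxed)
    }

    /// Predicts whether allocating `extra` bytes on top of `rss` reaches the limit.
    pub fn would_exceed(&self, rss: u64, extra: u64) -> bool {
        rss.saturating_add(extra) >= self.limit()
    }
}
```

In a real binary the override would presumably come from `std::env::var("WPAWOLF_MEM_THRESHOLD").ok()`, with RSS read from the operating system on each check.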

MessageStore disk fallback (src/store/messages.rs, src/store/disk_messages.rs)

  • Binary serialization format for EapolMessage (99-byte fixed header + optional 57-byte FtFields + variable frame)
  • flush_to_disk() serializes all groups to temp file, replaces Vec<EapolMessage> with lightweight Vec<MessageRef> (8 bytes vs ~228)
  • New messages route directly to disk via add_to_disk()
  • group_keys() + load_group() for lazy Phase 4 iteration (one group in memory at a time)
  • canonicalize_pairs() rewrites index keys without loading message data
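The idea of replacing full messages with 8-byte references can be illustrated with a length-prefixed spill file. The sketch below is an assumption about one possible layout: `SpillWriter`, `load()`, the `u64` offset inside `MessageRef`, and the 4-byte length prefix are hypothetical and do not reproduce the PR's 99-byte header format.

```rust
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Hypothetical 8-byte handle kept in memory in place of the full message.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct MessageRef {
    pub offset: u64, // byte offset of the record's length prefix
}

/// Appends length-prefixed records and tracks the current write position.
pub struct SpillWriter<W: Write> {
    out: W,
    pos: u64,
}

impl<W: Write> SpillWriter<W> {
    pub fn new(out: W) -> Self {
        Self { out, pos: 0 }
    }

    /// Writes `[u32 little-endian length][payload]` and returns a reference to it.
    pub fn append(&mut self, payload: &[u8]) -> io::Result<MessageRef> {
        let len = u32::try_from(payload.len())
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "record too large"))?;
        let r = MessageRef { offset: self.pos };
        self.out.write_all(&len.to_le_bytes())?;
        self.out.write_all(payload)?;
        self.pos += 4 + u64::from(len);
        Ok(r)
    }

    /// Flushes buffered bytes and hands back the underlying writer.
    pub fn into_inner(mut self) -> io::Result<W> {
        self.out.flush()?;
        Ok(self.out)
    }
}

/// Loads one record back by seeking to its offset.
pub fn load<R: Read + Seek>(src: &mut R, r: MessageRef) -> io::Result<Vec<u8>> {
    src.seek(SeekFrom::Start(r.offset))?;
    let mut len = [0u8; 4];
    src.read_exact(&mut len)?;
    let mut buf = vec![0u8; u32::from_le_bytes(len) as usize];
    src.read_exact(&mut buf)?;
    Ok(buf)
}
```

Because only the offset stays resident, index operations such as rewriting group keys can work on the references alone without touching the file.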

PmkidStore disk fallback (src/store/pmkid.rs)

  • Same pattern as MessageStore
  • Per-pair HashSet<[u8;16]> tracks seen PMKID values in disk mode (prevents duplicate insertion without scanning the disk file)
  • Populated during flush_to_disk() so entries stored before the switch to disk mode are tracked as well
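A per-pair seen-set of this kind might look like the sketch below. `DiskSeen`, its method names, and the `MacPair` tuple type are hypothetical stand-ins; only the `HashMap<MacPair, HashSet<[u8;16]>>` shape comes from the description.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the project's own types.
type MacPair = ([u8; 6], [u8; 6]);
type Pmkid = [u8; 16];

/// Tracks which PMKID values were already written for each pair.
#[derive(Default)]
pub struct DiskSeen {
    seen: HashMap<MacPair, HashSet<Pmkid>>,
}

impl DiskSeen {
    /// Returns true when the value is new for this pair and should be written.
    pub fn insert(&mut self, pair: MacPair, pmkid: Pmkid) -> bool {
        self.seen.entry(pair).or_default().insert(pmkid)
    }

    /// Seeds the set from entries that existed before the flush, so later
    /// duplicates of those entries are rejected too.
    pub fn seed<'a>(&mut self, entries: impl IntoIterator<Item = &'a (MacPair, Pmkid)>) {
        for (pair, pmkid) in entries {
            self.insert(*pair, *pmkid);
        }
    }
}
```

The lookup costs one hash probe per insertion, which avoids re-reading the disk file to test for byte-equal duplicates.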

Phase 4 disk-mode pairing (src/pair/mod.rs)

  • pair_all_groups_disk(): single-threaded iteration via group_keys() + load_group()
  • Each group loaded, paired, emitted, then dropped before the next loads
  • estimate_total_cost() returns 0 in disk mode (dedup pre-sizing skipped)
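The load, pair, emit, drop cycle can be sketched with a small trait. The method names `group_keys()` and `load_group()` come from the description, but their signatures, the `GroupSource` trait, `for_each_group()`, and `ToyStore` are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical view of a store that can hand out one group at a time.
pub trait GroupSource {
    type Key: Clone;
    type Msg;
    fn group_keys(&self) -> Vec<Self::Key>;
    fn load_group(&self, key: &Self::Key) -> Vec<Self::Msg>;
}

/// Single-threaded driver: at most one group is resident at any moment.
/// `pair_and_emit` returns how many results it emitted for the group.
pub fn for_each_group<S, F>(store: &S, mut pair_and_emit: F) -> usize
where
    S: GroupSource,
    F: FnMut(&S::Key, &[S::Msg]) -> usize,
{
    let mut emitted = 0;
    for key in store.group_keys() {
        let group = store.load_group(&key); // read this group only
        emitted += pair_and_emit(&key, &group);
        // `group` is dropped here, before the next one is loaded
    }
    emitted
}

/// Toy in-memory stand-in for a disk-backed store.
pub struct ToyStore(pub BTreeMap<u8, Vec<u32>>);

impl GroupSource for ToyStore {
    type Key = u8;
    type Msg = u32;
    fn group_keys(&self) -> Vec<u8> {
        self.0.keys().copied().collect()
    }
    fn load_group(&self, key: &u8) -> Vec<u32> {
        self.0.get(key).cloned().unwrap_or_default()
    }
}
```

Peak memory in this shape is bounded by the largest single group rather than by the whole capture, at the cost of giving up parallel pairing.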

PerSinkDedup reserve() fix (src/output/dedup.rs)

  • reserve() now takes an active_sinks mask, only pre-sizes HashSets for configured sinks
  • Fixes up to 9x over-allocation when only 1-2 of the 9 sinks are active
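A mask-driven reserve() could look roughly like this. The struct below is a simplified stand-in: the `u64` fingerprint type, the `u16` mask, and the `capacity()` helper are assumptions, not the definitions in src/output/dedup.rs.

```rust
use std::collections::HashSet;

pub const SINK_COUNT: usize = 9;

/// Simplified stand-in: one fingerprint set per output sink.
pub struct PerSinkDedup {
    sets: [HashSet<u64>; SINK_COUNT],
}

impl PerSinkDedup {
    pub fn new() -> Self {
        Self { sets: std::array::from_fn(|_| HashSet::new()) }
    }

    /// Pre-sizes only the sets whose bit is set in `active_sinks`
    /// (bit i corresponds to sink i). Inactive sinks stay unallocated.
    pub fn reserve(&mut self, estimated: usize, active_sinks: u16) {
        for (i, set) in self.sets.iter_mut().enumerate() {
            if active_sinks & (1 << i) != 0 {
                set.reserve(estimated);
            }
        }
    }

    /// Exposed for inspection in this sketch only.
    pub fn capacity(&self, sink: usize) -> usize {
        self.sets[sink].capacity()
    }
}
```

With a mask of `0b101`, only sinks 0 and 2 are pre-sized; the other seven sets keep zero capacity until something is inserted into them.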

Disk-backed dedup (src/output/disk_dedup.rs)

  • Write-through mode: hash lines go directly to output files (accepting temporary duplicates)
  • Fingerprints recorded in 256 partitioned bucket files per sink (fingerprint % 256, 16 bytes per record)
  • Post-emission cleaning pass: sort each bucket, find duplicate fingerprints, rewrite output files without duplicates
  • Mid-emission switchover: flush in-memory HashSet to buckets with sentinel line numbers, drain HashSet, continue in write-through mode
  • Drop impl ensures bucket files are cleaned up

Main integration (src/main.rs)

  • MemMonitor checks every 50K packets + every file transition
  • Both stores flush when disk_mode activates
  • Disk writers flushed before Phase 4 emit
  • Temp files cleaned up on shutdown

Testing

  • 916 tests pass (839 unit + 77 integration), 0 failures
  • make clean && make check-all passes clean on both branches
  • Corpus test (2038 files, 70.8M packets): sorted-content SHA-256 identical between in-memory and forced disk mode
  • Peak RSS drops 52% in disk mode (681 MiB -> 324 MiB on test corpus)
  • Runtime: ~9s in-memory, ~34s in disk mode (I/O bound, single-threaded pairing)

Add automatic memory pressure detection via MemMonitor (80% RSS
threshold). When triggered, MessageStore and PmkidStore flush their
in-memory data to temp files and switch to disk-backed mode for the
remainder of the run.

MessageStore disk mode:
- Binary serialization format for EapolMessage (99-byte fixed header +
  optional 57-byte FtFields + variable eapol_frame)
- flush_to_disk() serializes all groups, replaces Vec<EapolMessage>
  with Vec<MessageRef> (8 bytes per message vs ~228)
- New messages route directly to disk via add_to_disk()
- group_keys() + load_group() for lazy Phase 4 iteration
- canonicalize_pairs() rewrites index keys without loading data

PmkidStore disk mode:
- Same pattern: binary serialization, flush, disk-backed add/iter
- iter() returns owned PmkidEntry values (Box<dyn Iterator>)

Pairing engine (pair/mod.rs):
- Disk mode: single-threaded iteration via group_keys() + load_group()
- Memory mode: unchanged rayon parallel path
- estimate_total_cost() returns 0 in disk mode (skip dedup pre-sizing)

Output pipeline:
- PerSinkDedup::reserve() now takes active_sinks mask, only pre-sizes
  HashSets for configured sinks (fixes 9x over-allocation bug)

Main integration:
- MemMonitor checks every 50K packets + every file transition
- Flush both stores when disk_mode activates
- Clean up temp files on shutdown

Add memory pressure check before PerSinkDedup::reserve(). When the
estimated allocation (12 bytes × estimated_hashes × active_sink_count)
would push RSS past the 80% threshold, skip pre-sizing entirely and
let sets grow incrementally. This prevents the single-shot allocation
OOM that occurs when the estimated cost is in the billions.
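The arithmetic behind this check is simple enough to sketch. The function names and parameters below are hypothetical; only the 12-byte per-entry figure and the "bytes × hashes × active sinks" product come from the text above.

```rust
/// Per-entry cost used in the estimate (figure taken from the description).
pub const BYTES_PER_ENTRY: u64 = 12;

/// Estimated bytes for pre-sizing: 12 x hashes x number of active sinks.
/// Saturating math keeps billion-scale estimates from wrapping around.
pub fn reserve_cost(estimated_hashes: u64, active_sinks: u16) -> u64 {
    BYTES_PER_ENTRY
        .saturating_mul(estimated_hashes)
        .saturating_mul(u64::from(active_sinks.count_ones()))
}

/// True when pre-sizing would keep RSS below the threshold; otherwise the
/// caller skips reserve() and lets the sets grow incrementally.
pub fn should_presize(rss: u64, total_ram: u64, threshold_pct: u64, cost: u64) -> bool {
    let limit = (u128::from(total_ram) * u128::from(threshold_pct) / 100) as u64;
    rss.saturating_add(cost) < limit
}
```

For example, one million estimated hashes across two active sinks comes to 24 MB and would pass on a 16 GiB machine, while two billion hashes across all nine sinks comes to 216 GB and would be skipped.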

Pass MemMonitor through OutputContext::emit() and run_output() so the
output pipeline can check memory pressure before pre-sizing.

Fix non-ASCII em-dashes in mem_monitor.rs and disk_messages.rs.

When the dedup HashSet would exceed the 80% RSS threshold, switch to
write-through mode: hash lines go directly to output files (accepting
temporary duplicates) while fingerprints are recorded in 256 partitioned
bucket files per sink (fingerprint % 256, 16 bytes per record).

After emission completes, a cleaning pass processes buckets one at a
time: sort by fingerprint, identify runs with count > 1, collect line
numbers to remove (keep first occurrence), then rewrite each output
file without the duplicate lines.

Mid-emission switchover is supported: if memory pressure activates
during Phase 4 output, the in-memory HashSet is flushed to bucket
files with sentinel line numbers (u64::MAX), the HashSet is drained
to free memory, and emission continues in write-through mode. The
cleaning pass handles the mixed state correctly.
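The sort-based cleaning pass, including the mixed state after a switchover, can be sketched as below. Several details are assumptions: the 16-byte record is taken to be an 8-byte fingerprint plus an 8-byte line number, and a sentinel entry is interpreted as "this fingerprint was already emitted before the switchover", so every real line sharing it is treated as a duplicate. The actual build_removal_set() may differ.

```rust
use std::collections::BTreeSet;

/// Line number used for fingerprints flushed from the in-memory set.
pub const SENTINEL: u64 = u64::MAX;

/// Assumed 16-byte record: fingerprint plus output line number.
/// Field order matters: the derived ordering sorts by fingerprint, then line.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Record {
    pub fingerprint: u64,
    pub line: u64,
}

/// Bucket selection as described: fingerprint modulo 256.
pub fn bucket_of(fingerprint: u64) -> usize {
    (fingerprint % 256) as usize
}

/// Returns the output line numbers to drop for one bucket.
pub fn build_removal_set(mut records: Vec<Record>) -> BTreeSet<u64> {
    // After sorting, equal fingerprints are adjacent, real lines ascend,
    // and any sentinel sits last in its run.
    records.sort_unstable();
    let mut remove = BTreeSet::new();
    let mut i = 0;
    while i < records.len() {
        let fp = records[i].fingerprint;
        let mut j = i;
        while j < records.len() && records[j].fingerprint == fp {
            j += 1;
        }
        let run = &records[i..j];
        // With a sentinel present, an earlier copy already exists in the
        // output, so no real line in this run is kept (assumed semantics).
        let has_sentinel = run.last().map_or(false, |r| r.line == SENTINEL);
        let keep = if has_sentinel { 0 } else { 1 };
        for r in run.iter().skip(keep) {
            if r.line != SENTINEL {
                remove.insert(r.line);
            }
        }
        i = j;
    }
    remove
}
```

Processing one bucket at a time keeps the sort's working set to roughly 1/256 of the fingerprints, which is the point of partitioning by `fingerprint % 256`.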

New module: src/output/disk_dedup.rs
  - DiskDedup: coordinator with per-sink bucket state
  - DiskDedupSink: 256 bucket file writers per sink
  - build_removal_set(): sort-based duplicate detection
  - rewrite_without_lines(): line-number-based output filter
  - Drop impl ensures bucket files are cleaned up
  - 5 unit tests covering no-dups, dups, sentinels, cleanup

Modified: src/output/dedup.rs
  - SinkId::from_index() for index-to-enum conversion
  - PerSinkDedup::flush_to_buckets() for mid-emission switchover
  - PerSinkDedup::drain() to free HashSet memory

Modified: src/output/mod.rs
  - fan_out() accepts Option<DiskDedup> for write-through mode
  - OutputContext holds disk_dedup state
  - Cleaning pass runs in finalize() before auxiliary outputs
  - HashSinks::path() accessor for cleaning pass

Two bugs found during forced disk-mode testing (WPAWOLF_MEM_THRESHOLD=1):

1. BufWriter flush: add_to_disk() writes through a BufWriter, but
   load_group()/iter() open the file for reading independently. Records
   still in the BufWriter buffer were invisible to the reader, causing
   124 PMKID lines to be silently lost. Fix: add flush_disk_writer()
   methods to both MessageStore and PmkidStore, called before Phase 4.

2. PMKID dedup in disk mode: add_to_disk() skipped the per-pair
   byte-equality check, allowing duplicate PMKIDs through. Fix: add
   a disk_seen HashMap<MacPair, HashSet<[u8;16]>> that tracks seen
   PMKID values per pair. Populated during flush_to_disk() for
   already-stored entries. Costs ~20 bytes per unique PMKID.
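The first bug is easy to reproduce in isolation. The sketch below is not project code: `visible_bytes()` is a hypothetical helper that writes a small record through a BufWriter and reports the file size seen by an independent observer before and after flush().

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Returns the on-disk file length (before flush, after flush) for a
/// 12-byte record written through a BufWriter.
pub fn visible_bytes(path: &Path) -> io::Result<(u64, u64)> {
    // Explicit 8 KiB buffer, far larger than the record written below.
    let mut w = BufWriter::with_capacity(8192, File::create(path)?);
    w.write_all(b"pmkid-record")?; // stays in the buffer, not yet in the file

    // An independent reader (here: a metadata query) sees an empty file.
    let before = fs::metadata(path)?.len();

    w.flush()?; // push buffered bytes down to the file
    let after = fs::metadata(path)?.len();
    Ok((before, after))
}
```

This is why a flush before Phase 4 matters: any reader that opens the file by path only sees what has left the writer's buffer.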

Also adds WPAWOLF_MEM_THRESHOLD env var override (integer percent)
for testing disk fallback without needing a machine at 80% RSS.

Verified: WPAWOLF_MEM_THRESHOLD=1 produces sorted-content-identical
output to the in-memory path (SHA-256 match, 0 diff lines).

The disk-backed fallback makes --per-file unnecessary -- memory pressure
is handled automatically by spilling stores to disk. Remove the flag,
all per-file code paths (per-file emit loop, per-file WDS resolve,
per-file MLD canonicalization), and the per-file integration test.

--strict now bundles 4 filters instead of 5: --eapoltimeout=5,
--rc-drift=8, --dedup-hash-combos, --nc-dedup.