diff --git a/.gitignore b/.gitignore index ebe488d..43edf98 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,9 @@ __pycache__ CLAUDE.md .claude/ .vscode/ +.mcp.json +.vouch +*.md # Rust/Cargo build artifacts target/ @@ -29,3 +32,8 @@ tmp/ data/ CLAUDE.md + +# Local tooling / scratch +.mcp.json +*.md +.vouch/ diff --git a/allways/__init__.py b/allways/__init__.py index 17a1bcb..55a5dd3 100644 --- a/allways/__init__.py +++ b/allways/__init__.py @@ -1,3 +1,3 @@ -__version__ = '1.0.10' +__version__ = '1.0.11' version_split = __version__.split('.') __spec_version__ = (1000 * int(version_split[0])) + (10 * int(version_split[1])) + (1 * int(version_split[2])) diff --git a/allways/chain_providers/base.py b/allways/chain_providers/base.py index af5292d..9c0fd12 100644 --- a/allways/chain_providers/base.py +++ b/allways/chain_providers/base.py @@ -31,6 +31,11 @@ class ChainProvider(ABC): 3. Add {ENV_PREFIX}_* vars to .env """ + # True if this provider's RPCs hit the shared substrate websocket, so + # callers must serialise them under axon_lock. HTTP-backed providers leave + # this False and stay lock-free. + uses_substrate: bool = False + @abstractmethod def get_chain(self) -> ChainDefinition: ... diff --git a/allways/chain_providers/bitcoin.py b/allways/chain_providers/bitcoin.py index 2706aad..8bc3a72 100644 --- a/allways/chain_providers/bitcoin.py +++ b/allways/chain_providers/bitcoin.py @@ -4,10 +4,11 @@ from urllib.parse import urlparse import base58 -import bech32 import bittensor as bt import requests from bitcoin_message_tool.bmt import sign_message, verify_message +from embit.networks import NETWORKS +from embit.script import address_to_scriptpubkey from allways.chain_providers.base import ChainProvider, ProviderUnreachableError, TransactionInfo from allways.chains import CHAIN_BTC, ChainDefinition @@ -53,20 +54,15 @@ def to_mainnet_wif(wif: str) -> str: def to_mainnet_address(address: str) -> str: - """Convert a testnet/regtest address to mainnet equivalent for verification.""" - if address.startswith('bcrt1') or address.startswith('tb1'): - hrp, data = bech32.bech32_decode(address) - if data is not None: - return bech32.bech32_encode('bc', data) - if address.startswith(('m', 'n')): - decoded = base58.b58decode_check(address) - if decoded[0] == 0x6F: - return base58.b58encode_check(bytes([0x00]) + decoded[1:]).decode() - if address.startswith('2'): - decoded = base58.b58decode_check(address) - if decoded[0] == 0xC4: - return base58.b58encode_check(bytes([0x05]) + decoded[1:]).decode() - return address + """Convert a testnet/regtest address to mainnet equivalent for verification. + + Re-encodes the scriptPubKey under the mainnet network (handles legacy, segwit + v0, and Taproot); returns the address unchanged if it can't be parsed. + """ + try: + return address_to_scriptpubkey(address).address(NETWORKS['main']) + except Exception: + return address def parse_esplora_urls(raw: str, auth_header: str = 'Authorization') -> list[tuple[str, Optional[dict]]]: @@ -530,15 +526,11 @@ def api_get_balance(self, address: str) -> int: return 0 def is_valid_address(self, address: str) -> bool: - """Validate BTC address format without RPC (bech32/base58 decode).""" + """Validate BTC address format without RPC (embit decode; all types incl. Taproot).""" if not address or not isinstance(address, str): return False try: - if address.lower().startswith(('bc1', 'tb1', 'bcrt1')): - hrp, data = bech32.bech32_decode(address) - return data is not None - decoded = base58.b58decode_check(address) - return len(decoded) == 21 and decoded[0] in (0x00, 0x05, 0x6F, 0xC4) + return address_to_scriptpubkey(address) is not None except Exception: return False diff --git a/allways/chain_providers/subtensor.py b/allways/chain_providers/subtensor.py index cdfa998..38e3707 100644 --- a/allways/chain_providers/subtensor.py +++ b/allways/chain_providers/subtensor.py @@ -20,6 +20,9 @@ class SubtensorProvider(ChainProvider): clear error if they attempt to send. """ + # RPCs run on the shared substrate websocket — callers serialise via axon_lock. + uses_substrate = True + # Balances pallet index and transfer call indices on Subtensor _BALANCES_PALLET = 5 _TRANSFER_CALLS = {0: 'transfer_allow_death', 3: 'transfer_keep_alive', 7: 'transfer_all'} diff --git a/allways/constants.py b/allways/constants.py index 96eb6a3..41ac63f 100644 --- a/allways/constants.py +++ b/allways/constants.py @@ -57,10 +57,11 @@ ('tao', 'btc'): 0.5, ('btc', 'tao'): 0.5, } -# 100% → 1.0, 90% → 0.729, 80% → 0.512, 50% → 0.125 -SUCCESS_EXPONENT: int = 3 -# Idle-crown penalty: 0 = none, 1 = pure volume share, 0.5 = half-credit floor. -VOLUME_WEIGHT_ALPHA: float = 0.5 +# λ in reward = credibility · (λ·qvol_share + (1−λ)·crown_term). Crown-DOMINANT +# on purpose: realized volume is sybil/wash-inflatable until swap counterparties +# are verifiable, so the volume share is kept small. Raising λ toward volume is +# gated on wash filtering — do not crank this without it. +QVOL_REWARD_WEIGHT: float = 0.3 # Closed swaps required for full credibility (0 → 100% linear ramp). CREDIBILITY_RAMP_OBSERVATIONS: int = 10 # More than this many timed-out swaps within CREDIBILITY_WINDOW_BLOCKS hard-zeros @@ -68,6 +69,21 @@ # out of the rolling window. 0-2 tolerated; the 3rd timeout zeros credibility. CREDIBILITY_MAX_TIMEOUTS: int = 2 +# ─── Depth / Quality ───────────────────────────────────── +# Weight reward by how far a rate beats the per-direction "market" reference (a +# trimmed, volume-weighted, recency-decayed average of the subnet's own +# completed-swap clearing rates) — applied to both crown availability and the +# quality-weighted-volume term. Floored at QUALITY_FLOOR so it's forgiving. +# Reference and factor are scoring-side only — no contract change. +QUALITY_EMA_HALF_LIFE_BLOCKS: int = 7_200 # ~1 day — clearing-rate weight halves each +QUALITY_TRIM_PCT: float = 0.10 # Drop top/bottom 10% by rate before averaging (kills wash/outlier extremes) +QUALITY_PER_MINER_CAP: float = 0.30 # No single hotkey contributes >30% of the reference weight +QUALITY_N_MIN: int = 20 # Below this many post-trim observations → reference disabled (factor 1.0) +# Improvement past the reference that earns the full bonus. THE tuning knob — +# calibrate against real live rate dispersion before trusting it. +QUALITY_ANCHOR: float = 0.05 # 5% better than market → full bonus +QUALITY_FLOOR: float = 0.5 # Crown holder never earns below half on rate quality alone + # ─── Emission Recycling ──────────────────────────────────── RECYCLE_UID = 53 # Subnet owner UID @@ -136,3 +152,8 @@ DEFAULT_MIN_SWAP_AMOUNT_RAO = 100_000_000 # 0.1 TAO DEFAULT_MAX_SWAP_AMOUNT_RAO = 500_000_000 # 0.5 TAO RESERVATION_TTL_BLOCKS = 50 # ~10 min + +# Blocks past a retained entry's last-known timeout_block before discard. Sized for the contract's worst case: +# MAX_EXTENSIONS_PER_SWAP extensions each push the deadline up to MAX_EXTENSION_BLOCKS further (not cumulative). +# A smaller margin can discard a still-active twice-extended swap and re-send on rediscovery (#461). ~550 ≈ 1.8h. +SENT_CACHE_DISCARD_MARGIN_BLOCKS = MAX_EXTENSIONS_PER_SWAP * MAX_EXTENSION_BLOCKS + DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS diff --git a/allways/miner/fulfillment.py b/allways/miner/fulfillment.py index 0605f64..bde1af7 100644 --- a/allways/miner/fulfillment.py +++ b/allways/miner/fulfillment.py @@ -9,7 +9,7 @@ from allways.chain_providers.base import ChainProvider, ProviderUnreachableError from allways.classes import Swap -from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS +from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS, SENT_CACHE_DISCARD_MARGIN_BLOCKS from allways.contract_client import AllwaysContractClient, ContractError, is_contract_rejection from allways.utils.logging import log_on_change from allways.utils.rate import expected_swap_amounts @@ -23,11 +23,16 @@ class SentSwap: True after the contract accepts ``mark_fulfilled``. A retry after crash finds this record, skips re-sending (prevents double-sends), and only re-calls mark_fulfilled if it didn't already succeed. + + ``timeout_block`` is the swap's last-known (possibly extended) deadline, + snapshotted so ``cleanup_stale_sends`` can bound how long an unmarked entry + is retained. 0 means unknown (legacy cache entry) — never deadline-discarded. """ to_tx_hash: str to_tx_block: int marked_fulfilled: bool + timeout_block: int = 0 class SwapFulfiller: @@ -62,6 +67,7 @@ def __init__( self.sent: Dict[int, SentSwap] = {} self.mark_fulfilled_attempts: Dict[int, int] = {} self.cushion_warned: Set[int] = set() + self.unmarked_stale_warned: Set[int] = set() self.sent_cache_path = sent_cache_path self.load_sent_cache() @@ -76,6 +82,7 @@ def load_sent_cache(self): to_tx_hash=entry[0], to_tx_block=entry[1], marked_fulfilled=bool(entry[2]), + timeout_block=entry[3] if len(entry) > 3 else 0, ) if self.sent: bt.logging.info(f'Restored {len(self.sent)} cached send(s) from disk') @@ -88,7 +95,10 @@ def save_sent_cache(self): return try: self.sent_cache_path.parent.mkdir(parents=True, exist_ok=True) - data = {str(swap_id): [s.to_tx_hash, s.to_tx_block, s.marked_fulfilled] for swap_id, s in self.sent.items()} + data = { + str(swap_id): [s.to_tx_hash, s.to_tx_block, s.marked_fulfilled, s.timeout_block] + for swap_id, s in self.sent.items() + } tmp = self.sent_cache_path.with_suffix('.tmp') tmp.write_text(json.dumps(data)) tmp.rename(self.sent_cache_path) @@ -96,21 +106,63 @@ def save_sent_cache(self): bt.logging.error(f'CRITICAL: Failed to persist sent cache: {e}') def cleanup_stale_sends(self, active_swap_ids: Set[int]): - """Remove cached send results for swaps no longer active.""" + """Drop cached send results that are safe to forget. + + A transient ``get_swap`` gap can make the poller drop a still-active + swap, so an unmarked send (dest funds out, ``mark_fulfilled`` not yet + landed) must be retained: dropping it would let a rediscovered swap send + funds a second time. We keep unmarked entries until either they're + marked fulfilled, or the chain is provably past their last-known + deadline (``SENT_CACHE_DISCARD_MARGIN_BLOCKS`` beyond ``timeout_block``), + at which point the swap can't still be active and retention only leaks. + """ stale = [sid for sid in self.sent if sid not in active_swap_ids] - unmarked = [sid for sid in stale if not self.sent[sid].marked_fulfilled] - for sid in stale: + removable = [sid for sid in stale if self.sent[sid].marked_fulfilled] + unmarked_stale = [sid for sid in stale if not self.sent[sid].marked_fulfilled] + + current_block = None + if unmarked_stale: + try: + current_block = self.subtensor.get_current_block() + except Exception as e: + # Without a block height we can't prove a deadline has passed — + # retain everything rather than risk discarding a live entry. + bt.logging.debug(f'cleanup_stale_sends: get_current_block failed, retaining unmarked sends: {e}') + + expired = [] + if current_block is not None: + expired = [ + sid + for sid in unmarked_stale + if self.sent[sid].timeout_block > 0 + and current_block > self.sent[sid].timeout_block + SENT_CACHE_DISCARD_MARGIN_BLOCKS + ] + + for sid in removable + expired: self.sent.pop(sid) self.mark_fulfilled_attempts.pop(sid, None) + self.unmarked_stale_warned.discard(sid) self.cushion_warned -= self.cushion_warned - active_swap_ids - if stale: - bt.logging.info(f'Cleaned up stale send cache for {len(stale)} swap(s): {stale}') - if unmarked: - bt.logging.warning( - f'Stale send(s) without confirmed mark_fulfilled — funds may have been sent without ' - f'on-chain credit: {unmarked}' - ) + self.unmarked_stale_warned -= active_swap_ids + + if removable or expired: self.save_sent_cache() + if removable: + bt.logging.info(f'Cleaned up stale send cache for {len(removable)} marked swap(s): {removable}') + if expired: + bt.logging.warning( + f'Discarded stale send(s) past deadline without confirmed mark_fulfilled — funds may have ' + f'been sent without on-chain credit: {expired}' + ) + + retained = [sid for sid in unmarked_stale if sid not in expired] + newly_retained = [sid for sid in retained if sid not in self.unmarked_stale_warned] + if newly_retained: + bt.logging.warning( + f'Retaining unmarked send(s) to avoid duplicate destination sends if the swap reappears: ' + f'{newly_retained}' + ) + self.unmarked_stale_warned.update(newly_retained) def verify_swap_safety(self, swap: Swap) -> Optional[Tuple[int, str]]: """Verify the swap is safe to fulfill. @@ -226,9 +278,10 @@ def process_swap(self, swap: Swap) -> bool: Idempotent across forward steps — the ``sent`` cache tracks both the dest-tx outcome and whether ``mark_fulfilled`` has landed, so retry - polls never double-send and never double-call the contract. Cache + polls never double-send and never double-call the contract. Marked entries live until ``cleanup_stale_sends`` drops them once the swap - leaves the active set. + leaves the active set; unmarked entries are retained (up to a deadline + margin) to keep a rediscovered swap from sending destination funds again. Three possible starting states when this runs: - no prior record → send dest funds, then mark fulfilled @@ -244,36 +297,48 @@ def process_swap(self, swap: Swap) -> bool: bt.logging.info(f'Processing swap {swap.id}: {swap.from_chain} -> {swap.to_chain}') - # Step 1: Verify swap safety (timeout, rate, collateral) - safety_result = self.verify_swap_safety(swap) - if safety_result is None: - bt.logging.warning(f'Swap {swap.id}: failed safety checks, skipping') - return False - - user_receives_amount, my_source_address = safety_result + if sent is None: + # First pass — gate the send on safety (timeout cushion, rate, + # source funds), then send. The cushion blocks STARTING a fulfill + # with too little runway left for a rescue extension. + safety_result = self.verify_swap_safety(swap) + if safety_result is None: + bt.logging.warning(f'Swap {swap.id}: failed safety checks, skipping') + return False + user_receives_amount, my_source_address = safety_result - # Step 2: Verify user sent source funds - if not self.verify_user_sent_funds(swap, my_source_address): - return False + if not self.verify_user_sent_funds(swap, my_source_address): + return False - # Step 3: Send destination funds — unless we already did on a previous - # pass, in which case we skip straight to the mark_fulfilled retry. - if sent is None: send_result = self.send_dest_funds(swap, user_receives_amount) if not send_result: bt.logging.error(f'Swap {swap.id}: failed to send dest funds') return False to_tx_hash, to_tx_block = send_result - sent = SentSwap(to_tx_hash=to_tx_hash, to_tx_block=to_tx_block, marked_fulfilled=False) + sent = SentSwap( + to_tx_hash=to_tx_hash, + to_tx_block=to_tx_block, + marked_fulfilled=False, + timeout_block=swap.timeout_block, + ) self.sent[swap.id] = sent self.save_sent_cache() else: + # Funds are already out — skip the cushion/safety gate (scoped to + # STARTING a fulfill); retrying mark_fulfilled to the deadline only + # helps and avoids a timeout slash of a miner that paid (#462). + # Recompute the post-fee amount (rate is snapshotted on the swap). + _, user_receives_amount = expected_swap_amounts(swap, self.fee_divisor) + # Keep the retained deadline current with any extension seen while + # active, so cleanup never discards a still-extendable swap early. + if swap.timeout_block > sent.timeout_block: + sent.timeout_block = swap.timeout_block + self.save_sent_cache() bt.logging.info(f'Swap {swap.id}: retrying mark_fulfilled for cached send tx {sent.to_tx_hash[:16]}...') - # Step 4: Mark fulfilled on contract. We pass ``user_receives_amount`` - # as ``to_amount`` because at mark_fulfilled time the contract stores - # the actual sent amount (post-fee), which is what ``swap.to_amount`` - # becomes after the call. + # Mark fulfilled on contract. ``user_receives_amount`` is the post-fee + # amount the contract stores as ``to_amount`` (what ``swap.to_amount`` + # becomes after the call). try: self.client.mark_fulfilled( wallet=self.wallet, diff --git a/allways/validator/axon_handlers.py b/allways/validator/axon_handlers.py index 302a2bb..3e53f43 100644 --- a/allways/validator/axon_handlers.py +++ b/allways/validator/axon_handlers.py @@ -304,6 +304,13 @@ async def handle_swap_reserve( ) try: + # Halt blocks reservations contract-side; fast-reject here so a halt + # can't flood doomed vote_reserve extrinsics that starve confirm/timeout + # votes. halted() fails open, so an RPC blip falls through to the contract. + if validator.bounds_cache.halted(): + reject_synapse(synapse, 'System is halted — reservations paused', ctx) + return synapse + # Cheap, local checks BEFORE axon_lock — invalid signatures, missing fields, # and bad direction are rejected without serializing on the substrate websocket. if not synapse.from_address or not synapse.from_address_proof: @@ -325,13 +332,6 @@ async def handle_swap_reserve( reject_synapse(synapse, 'Invalid source address proof', ctx) return synapse - # Source-chain RPC — separate connection from substrate, so it doesn't - # need axon_lock and shouldn't block the substrate websocket. - balance = provider.get_balance(synapse.from_address) - if balance < synapse.from_amount: - reject_synapse(synapse, 'Insufficient source balance', ctx) - return synapse - # Pure-local crypto — compute the request hash outside the lock as a cheap pre-check. from_addr_bytes = synapse.from_address.encode('utf-8') miner_bytes = bytes.fromhex(Keypair(ss58_address=miner).public_key.hex()) @@ -347,7 +347,11 @@ async def handle_swap_reserve( ) ) - # Everything below touches substrate (commitment read, contract reads, vote). + # Substrate early-reject checks (commitment / slippage / already-reserved / + # cooldown) run BEFORE the source-balance lookup. The balance call is the + # only external dependency on this path — for a BTC source it is an uncached + # Esplora HTTP request — so doing it last means spam destined for any of these + # cheap rejections never reaches it, capping per-request amplification. with validator.axon_lock: commitment = load_swap_commitment(validator, miner) if commitment is None: @@ -454,6 +458,26 @@ async def handle_swap_reserve( ) return synapse + # Source balance is the most expensive gate, so it runs last — only after a + # request has cleared every cheap rejection. A TAO source reads balance over + # the shared substrate websocket, so it must serialise under axon_lock; a BTC + # source is HTTP and stays lock-free to avoid stalling the forward loop behind + # a slow Esplora call. + if provider.uses_substrate: + with validator.axon_lock: + balance = provider.get_balance(synapse.from_address) + else: + balance = provider.get_balance(synapse.from_address) + if balance < synapse.from_amount: + reject_synapse(synapse, 'Insufficient source balance', ctx) + return synapse + + # Submit the reserve vote. The contract is the atomic gate; the handler + # checks above are best-effort early-rejects. Moving the balance lookup + # ahead of the vote opens a small window in which a concurrent request could + # reserve this miner first — that race costs at most one doomed vote_reserve, + # which the contract rejects, so the early-reject guarantee is unchanged. + with validator.axon_lock: bt.logging.info( f'{ctx}: preflight ok — collateral={collateral} reserved_until={reserved_until} ' f'cur_block={cur_block} → submitting vote_reserve' diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index fe3b52f..eb04667 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -720,6 +720,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None tao = int(values.get('tao_amount') or 0) fee = int(values.get('fee_amount') or 0) from_chain, to_chain = self._lookup_swap_direction(swap_id) + clearing_rate = self._lookup_swap_clearing_rate(swap_id) self.state_store.insert_swap_outcome( swap_id=swap_id, miner_hotkey=miner, @@ -728,6 +729,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None tao_amount=tao, from_chain=from_chain, to_chain=to_chain, + clearing_rate=clearing_rate, ) # The contract's apply_collateral_penalty deducts ``fee_amount`` # from collateral without emitting a CollateralWithdrawn event, @@ -835,6 +837,22 @@ def _lookup_swap_direction(self, swap_id: int) -> Tuple[str, str]: return '', '' return (swap.from_chain or '').lower(), (swap.to_chain or '').lower() + def _lookup_swap_clearing_rate(self, swap_id: int) -> float: + """Clearing rate (canonical TAO/BTC) for a just-completed swap, read + from the tracker's still-live Swap (resolve() runs after we record the + outcome). Snapshotted from the miner's commitment at initiation, so it's + the rate the swap actually cleared at. Returns 0.0 when unknown or + unparseable — excluded from the depth reference, same as a legacy row.""" + if self.swap_tracker is None: + return 0.0 + swap = self.swap_tracker.active.get(swap_id) + if swap is None: + return 0.0 + try: + return float(swap.rate) if swap.rate else 0.0 + except (TypeError, ValueError): + return 0.0 + def record_reservation_pin(self, block_num: int, miner: str, reserved_until: int) -> None: """Pin the miner's commitment as of the reservation block ``block_num``. @@ -874,19 +892,29 @@ def record_reservation_pin(self, block_num: int, miner: str, reserved_until: int f'{block_num} — no pin written, will fall back' ) return - self.state_store.upsert_reservation_pin( - ReservationPin( - miner_hotkey=miner, - reserve_block=block_num, - from_chain=commitment.from_chain, - to_chain=commitment.to_chain, - rate_str=commitment.rate_str, - counter_rate_str=commitment.counter_rate_str, - miner_from_address=commitment.from_address, - miner_to_address=commitment.to_address, - reserved_until=reserved_until, + # Backfill only: keep handle_swap_reserve's synchronous pin (the rate the + # quote was validated against), but key on reserved_until so a stale pin + # from a prior reservation is still overwritten. See PR #451. + existing = self.state_store.get_reservation_pin(miner) + if existing is None or existing.reserved_until != reserved_until: + self.state_store.upsert_reservation_pin( + ReservationPin( + miner_hotkey=miner, + reserve_block=block_num, + from_chain=commitment.from_chain, + to_chain=commitment.to_chain, + rate_str=commitment.rate_str, + counter_rate_str=commitment.counter_rate_str, + miner_from_address=commitment.from_address, + miner_to_address=commitment.to_address, + reserved_until=reserved_until, + ) + ) + else: + bt.logging.info( + f'EventWatcher: reserve-time pin already present for {miner[:8]} ' + f'at block {block_num} — preserving synchronous pin (not overwriting)' ) - ) # Emit pin lifecycle events into the scoring overlay. The reservation # locks in BOTH offered directions for this miner (the contract takes # the miner offline for any new swap until this reservation resolves), @@ -981,6 +1009,14 @@ def _emit_reservation_pin_ends(self, block_num: int, miner: str) -> None: rate=0.0, ) + def expire_stale_reservation_pins(self) -> int: + """Close crown pins for reservations that lapsed without a swap (no + contract event fires on natural expiry), then purge them. End emitted at + ``reserved_until + 1`` so crown stops at the last live block. Returns rows purged.""" + for pin in self.state_store.get_expired_reservation_pins(): + self._emit_reservation_pin_ends(pin.reserved_until + 1, pin.miner_hotkey) + return self.state_store.purge_expired_reservation_pins() + def record_active_transition(self, block_num: int, hotkey: str, active: bool) -> None: """Apply an on-chain active-flag transition to both the current-state snapshot and the historical event log. A no-op if the flag already diff --git a/allways/validator/forward.py b/allways/validator/forward.py index de2d0c2..49d15e4 100644 --- a/allways/validator/forward.py +++ b/allways/validator/forward.py @@ -80,7 +80,10 @@ async def forward(self: Validator) -> None: f'forward: purged {purged} expired pending_confirms (reservations elapsed without tx confirmation)' ) - purged_pins = self.state_store.purge_expired_reservation_pins() + # Emits crown pin-end events for expired reservations *before* purging, so a + # reservation that lapses without a swap can't keep earning crown at its + # pinned rate. Runs before scoring in this same forward pass. + purged_pins = self.event_watcher.expire_stale_reservation_pins() if purged_pins: bt.logging.info(f'forward: purged {purged_pins} expired reservation_pins (reservations elapsed without a swap)') diff --git a/allways/validator/scoring.py b/allways/validator/scoring.py index 0e34a6c..8786b98 100644 --- a/allways/validator/scoring.py +++ b/allways/validator/scoring.py @@ -1,15 +1,22 @@ """Crown-time scoring pipeline. -Reward per miner is ``pool × crown_share × sr³ × ramp × capacity × -volume_factor``; the credibility ramp is applied linearly, not cubed. Any -shortfall recycles to ``RECYCLE_UID``. Entry point is +Reward per miner is ``pool × credibility × (λ·qvol_share + (1−λ)·crown_share × +capacity × quality)`` — a crown-dominant blend of *availability* (holding the +best rate over time, capacity- and depth-weighted) and *realized service* +(``qvol_share``: each miner's share of the network's quality-weighted swap +volume). ``λ = QVOL_REWARD_WEIGHT`` is intentionally small (crown-dominant); +shifting it toward volume is gated on wash filtering, since unfiltered volume is +sybil/wash-inflatable. ``credibility`` is the single reliability term (ramp + +timeout cliff). Any shortfall recycles to ``RECYCLE_UID``. Entry point is ``score_and_reward_miners(validator)``. """ from __future__ import annotations +import math from dataclasses import dataclass, field from enum import IntEnum +from functools import partial from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Set, Tuple import bittensor as bt @@ -22,10 +29,15 @@ CREDIBILITY_WINDOW_BLOCKS, DIRECTION_POOLS, MAX_SCORING_BACKFILL_BLOCKS, + QUALITY_ANCHOR, + QUALITY_EMA_HALF_LIFE_BLOCKS, + QUALITY_FLOOR, + QUALITY_N_MIN, + QUALITY_PER_MINER_CAP, + QUALITY_TRIM_PCT, + QVOL_REWARD_WEIGHT, RECYCLE_UID, SCORING_WINDOW_BLOCKS, - SUCCESS_EXPONENT, - VOLUME_WEIGHT_ALPHA, ) from allways.utils.rate import is_executable_rate, min_executable_tao_leg from allways.validator.event_watcher import ContractEventWatcher @@ -41,8 +53,10 @@ class DirectionTrace: pool: float = 0.0 crown_blocks: Dict[str, float] = field(default_factory=dict) cap_weighted_blocks: Dict[str, float] = field(default_factory=dict) + quality_weighted_blocks: Dict[str, float] = field(default_factory=dict) unfilled_blocks: int = 0 best_rate: float = 0.0 + quality_reference: Optional[float] = None def due_for_scoring(current_block: int, last_scored_block: int, initial_scoring_done: bool) -> bool: @@ -145,13 +159,17 @@ def prune_swap_outcomes(self: Validator) -> None: def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: - """Replay the crown-time event stream, derive per-miner rewards - (pool × crown_share × sr³ × ramp × capacity × volume_factor), recycle the rest. + """Replay the crown-time event stream, derive per-miner rewards as a + crown-dominant blend of availability and realized service, recycle the rest: - Volume weighting is *per direction*: a miner earning crown on btc→tao is - compared only to btc→tao volume on the network, not to the total of both - directions. Otherwise heavy tao→btc flow from other miners would dilute - a btc→tao earner's vol_share even though they own that direction.""" + reward = pool · credibility · (λ·qvol_share + (1−λ)·crown_share·cap·quality) + + where ``λ = QVOL_REWARD_WEIGHT`` (small — crown-dominant), ``qvol_share`` is + the miner's share of the network's quality-weighted swap volume, and the + crown term is availability (best-rate hold time) scaled by capacity and the + per-block depth quality. Both volume and quality are evaluated *per + direction*: a btc→tao earner is compared only to btc→tao flow, so heavy + tao→btc volume from others can't dilute their share.""" n_uids = self.metagraph.n.item() if n_uids == 0: return np.array([], dtype=np.float32), set() @@ -171,8 +189,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: unweighted_rewards = np.zeros(n_uids, dtype=np.float32) credibility_since = max(0, self.block - CREDIBILITY_WINDOW_BLOCKS) success_stats = self.state_store.get_success_rates_since(credibility_since) - success_rates = {hk: success_rate(success_stats.get(hk)) for hk in rewardable_hotkeys} - credibility_ramps = {hk: credibility_ramp(success_stats.get(hk)) for hk in rewardable_hotkeys} + credibilities = {hk: credibility(success_stats.get(hk)) for hk in rewardable_hotkeys} direction_traces: Dict[Tuple[str, str], DirectionTrace] = {} weighting_traces: Dict[str, WeightingTrace] = {} @@ -193,8 +210,10 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: min_swap_amount = 0 miner_volume_total: Dict[str, int] = {} miner_crown_total: Dict[str, float] = {} + miner_qvol_total: Dict[str, float] = {} network_volume_total: int = 0 network_crown_total: float = 0.0 + network_qvol_total: float = 0.0 for (from_chain, to_chain), pool in DIRECTION_POOLS.items(): trace = DirectionTrace(pool=pool) @@ -203,6 +222,13 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: if storage_enabled: intervals = [] intervals_by_dir[(from_chain, to_chain)] = intervals + # Per-direction depth reference: trimmed, volume/recency-weighted average + # of recent completed-swap clearing rates. None until enough swaps accrue + # (bootstrap), in which case the quality factor is a 1.0 no-op. Same 30-day + # lookback the table is pruned to; the EMA half-life down-weights old swaps. + clearing_obs = self.state_store.get_clearing_rates_by_direction_since(credibility_since, from_chain, to_chain) + quality_reference = compute_quality_reference(clearing_obs, window_end) + trace.quality_reference = quality_reference crown_blocks = replay_crown_time_window( store=self.state_store, event_watcher=self.event_watcher, @@ -215,68 +241,85 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: intervals_out=intervals, min_swap_rao=min_swap_amount, max_swap_rao=max_swap_amount, + quality_reference=quality_reference, ) total_crown_dir = sum(crown_blocks.values()) volumes_dir = self.state_store.get_volume_by_direction_since(window_start, from_chain, to_chain) total_volume_dir = sum(volumes_dir.values()) + # Quality-weighted realized volume over the scoring window: each + # completed swap's TAO leg scaled by how far its clearing rate beat the + # depth reference. This is the *service* half of the reward — what the + # miner actually moved, at what rate — distinct from crown availability. + canon_from, _ = canonical_pair(from_chain, to_chain) + lower_rate_wins = from_chain != canon_from + window_obs = self.state_store.get_clearing_rates_by_direction_since(window_start, from_chain, to_chain) + qvol_dir = quality_weighted_volume(window_obs, quality_reference, lower_rate_wins) + network_qvol_dir = math.fsum(qvol_dir.values()) for hk, v in volumes_dir.items(): miner_volume_total[hk] = miner_volume_total.get(hk, 0) + int(v) network_volume_total += int(total_volume_dir) for hk, blk in crown_blocks.items(): miner_crown_total[hk] = miner_crown_total.get(hk, 0.0) + blk network_crown_total += total_crown_dir + for hk, q in qvol_dir.items(): + miner_qvol_total[hk] = miner_qvol_total.get(hk, 0.0) + q + network_qvol_total += network_qvol_dir bt.logging.debug( - f'V1 scoring [{from_chain}→{to_chain}]: ' - f'total_crown={total_crown_dir:.1f} blk, total_volume_rao={total_volume_dir}' + f'V1 scoring [{from_chain}→{to_chain}]: total_crown={total_crown_dir:.1f} blk, ' + f'total_volume_rao={total_volume_dir}, network_qvol={network_qvol_dir:.1f}' ) - if total_crown_dir == 0: + has_crown = total_crown_dir > 0 + has_qvol = network_qvol_dir > 0 + if not has_crown and not has_qvol: continue # empty bucket — pool recycles via the remainder below - - for hotkey, blocks in crown_blocks.items(): + # Adaptive blend: λ splits the pool between realized volume and crown + # availability, but only when both have recipients. With no volume to + # reward (bootstrap), crown takes the full pool; with volume but no + # crown holder, volume takes it — so an empty component never silently + # recycles its share. + lam = QVOL_REWARD_WEIGHT if (has_crown and has_qvol) else (1.0 if has_qvol else 0.0) + + for hotkey in set(crown_blocks) | set(qvol_dir): uid = hotkey_to_uid.get(hotkey) if uid is None: continue # dereg'd mid-window; credit forfeited - # Capacity is integrated per-block during the replay, so the - # effective multiplier is the time-weighted average over the - # miner's crown intervals. Reading current collateral here - # would let a post-window top-up retroactively boost credit - # already earned (#409). - cap_blocks = trace.cap_weighted_blocks.get(hotkey, 0.0) - cap = (cap_blocks / blocks) if blocks > 0 else 0.0 + blocks = crown_blocks.get(hotkey, 0.0) + # Capacity + depth quality are integrated per-block during the + # replay, so each is the time-weighted average over the miner's + # crown intervals. Reading current collateral here would let a + # post-window top-up retroactively boost credit already earned + # (#409). Non-crown (volume-only) miners contribute no crown term. + cap = (trace.cap_weighted_blocks.get(hotkey, 0.0) / blocks) if blocks > 0 else 0.0 + quality = (trace.quality_weighted_blocks.get(hotkey, 0.0) / blocks) if blocks > 0 else 1.0 + crown_share_dir = (blocks / total_crown_dir) if total_crown_dir > 0 else 0.0 + qvol_share_dir = (qvol_dir.get(hotkey, 0.0) / network_qvol_dir) if network_qvol_dir > 0 else 0.0 + crown_term = crown_share_dir * cap * quality + blend = lam * qvol_share_dir + (1.0 - lam) * crown_term + cred = credibilities[hotkey] + # unweighted = pre-credibility blend, so the trace's effective + # multiplier reads back as the credibility applied. + unweighted_rewards[uid] += pool * blend + rewards[uid] += pool * cred * blend + wt = weighting_traces.setdefault(hotkey, WeightingTrace()) wt.record_capacity(factor=cap) + wt.record_quality(factor=quality) wt.record_credibility( closed_swaps=sum(success_stats.get(hotkey, (0, 0))), ramp_target=CREDIBILITY_RAMP_OBSERVATIONS, timed_out=success_stats.get(hotkey, (0, 0))[1], ) - crown_share_dir = blocks / total_crown_dir - vol_dir = volumes_dir.get(hotkey, 0) - vol_share_dir = (vol_dir / total_volume_dir) if total_volume_dir > 0 else 0.0 - vol_factor = volume_factor(vol_dir, total_volume_dir, crown_share_dir) - base = ( - pool * crown_share_dir * (success_rates[hotkey] ** SUCCESS_EXPONENT) * credibility_ramps[hotkey] * cap - ) - unweighted_rewards[uid] += base - rewards[uid] += base * vol_factor - if vol_factor < 1.0: - bt.logging.debug( - f'V1 scoring [{from_chain}→{to_chain}] {hotkey[:8]}: ' - f'crown_share={crown_share_dir:.3f} vol_share={vol_share_dir:.3f} ' - f'vol_factor={vol_factor:.3f}' - ) record_volume_traces( weighting_traces=weighting_traces, hotkey_to_uid=hotkey_to_uid, - rewards=rewards, - unweighted_rewards=unweighted_rewards, miner_volume_total=miner_volume_total, miner_crown_total=miner_crown_total, - network_volume_total=network_volume_total, + miner_qvol_total=miner_qvol_total, network_crown_total=network_crown_total, + network_qvol_total=network_qvol_total, ) recycle_uid = RECYCLE_UID if RECYCLE_UID < n_uids else 0 @@ -290,7 +333,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: window_end=window_end, direction_traces=direction_traces, rewards=rewards, - success_rates=success_rates, + credibilities=credibilities, distributed=distributed, recycled=recycled, weighting_traces=weighting_traces, @@ -332,65 +375,149 @@ def capacity_factor(collateral_rao: int, max_swap_amount_rao: int) -> float: return min(1.0, collateral_rao / max_swap_amount_rao) -def volume_factor( - vol_rao: int, - total_volume_rao: int, - crown_share: float, - alpha: float = VOLUME_WEIGHT_ALPHA, +def quality_weighted_volume( + observations: List[Tuple[float, int, int, str]], + reference: Optional[float], + lower_rate_wins: bool, +) -> Dict[str, float]: + """Per-miner quality-weighted realized volume for one direction. + + ``observations`` are ``(clearing_rate, tao_amount, resolved_block, + miner_hotkey)`` for completed swaps in the scoring window. Each swap's TAO + leg is scaled by ``quality_factor(clearing_rate)`` — so volume cleared + deeper than the reference counts full, volume at/below market counts only + the floor. During bootstrap (``reference is None``) quality is 1.0, so this + reduces to raw volume. This is the *service* signal: real TAO moved, at the + rate it actually cleared.""" + qvol: Dict[str, float] = {} + for rate, vol, _block, miner in observations: + if vol <= 0 or rate <= 0: + continue + qvol[miner] = qvol.get(miner, 0.0) + float(vol) * quality_factor(rate, reference, lower_rate_wins) + return qvol + + +def compute_quality_reference( + observations: List[Tuple[float, int, int, str]], + now_block: int, + *, + half_life_blocks: int = QUALITY_EMA_HALF_LIFE_BLOCKS, + trim_pct: float = QUALITY_TRIM_PCT, + per_miner_cap: float = QUALITY_PER_MINER_CAP, + n_min: int = QUALITY_N_MIN, +) -> Optional[float]: + """Per-direction "market" reference from completed-swap clearing rates. + + ``observations`` are ``(rate, tao_amount, resolved_block, miner_hotkey)``. + Trimmed (top/bottom ``trim_pct`` by rate), volume- and recency-weighted + (weight halves every ``half_life_blocks``), with each miner's summed weight + capped at ``per_miner_cap`` of the total. Returns ``None`` when fewer than + ``n_min`` observations survive the trim — caller treats that as "depth off" + (factor 1.0), matching the self-bootstrapping rollout. + + DETERMINISM: this feeds emissions → consensus, so every validator must get a + byte-identical result. We sort by a stable total-order key and sum with + ``math.fsum`` over that fixed order — never reduce over an unordered set. + ``now_block`` must be an on-chain block (window_end), never wall-clock. + """ + rows = [o for o in observations if o[0] > 0] + # Stable TOTAL order so trim drops identical rows on every validator: rate, + # then block, miner, volume. Two rows equal on all four are interchangeable. + rows.sort(key=lambda o: (o[0], o[2], o[3], o[1])) + n = len(rows) + drop = int(math.floor(n * trim_pct)) + trimmed = rows[drop : n - drop] if drop > 0 else rows + if len(trimmed) < n_min: + return None + + weights: List[float] = [] + by_miner: Dict[str, float] = {} + for rate, vol, block, miner in trimmed: + recency = 2.0 ** (-(now_block - block) / half_life_blocks) + w = max(0.0, float(vol)) * recency + weights.append(w) + by_miner[miner] = by_miner.get(miner, 0.0) + w + + total_w = math.fsum(weights) + if total_w <= 0: + return None + + # Per-miner cap: scale down any miner whose summed weight exceeds the cap so + # no single operator (or sybil cluster sharing a hotkey) defines the market. + cap_w = per_miner_cap * total_w + scale = {m: (cap_w / w if w > cap_w else 1.0) for m, w in by_miner.items()} + + capped = [w * scale[trimmed[i][3]] for i, w in enumerate(weights)] + denom = math.fsum(capped) + if denom <= 0: + return None + numer = math.fsum(capped[i] * trimmed[i][0] for i in range(len(trimmed))) + return numer / denom + + +def direction_aware_improvement(rate: float, reference: float, lower_rate_wins: bool) -> float: + """Fractional improvement of ``rate`` past the reference, signed so deeper + is positive in either direction. btc→tao: higher TAO/BTC is the better deal; + tao→btc (``lower_rate_wins``): lower TAO/BTC is. Returns 0 for a non-positive + reference.""" + if reference <= 0: + return 0.0 + return (reference - rate) / reference if lower_rate_wins else (rate - reference) / reference + + +def quality_factor( + rate: float, + reference: Optional[float], + lower_rate_wins: bool, + *, + anchor: float = QUALITY_ANCHOR, + floor: float = QUALITY_FLOOR, ) -> float: - """(1-α) + α·min(1, vol_share/crown_share). Idle network or no crown → 1.0 - (no penalty); cap kills any over-serve bonus.""" - if total_volume_rao <= 0 or crown_share <= 0: + """Depth multiplier in ``[floor, 1.0]``. At/below market → floor; at or past + ``anchor`` improvement → 1.0; linear between. ``reference is None`` (bootstrap) + → 1.0, so depth is a no-op until the reference exists.""" + if reference is None or anchor <= 0: return 1.0 - participation = min(1.0, (vol_rao / total_volume_rao) / crown_share) - return (1.0 - alpha) + alpha * participation + improvement = direction_aware_improvement(rate, reference, lower_rate_wins) + depth_ramp = min(1.0, max(0.0, improvement / anchor)) + return floor + (1.0 - floor) * depth_ramp def record_volume_traces( *, weighting_traces: Dict[str, WeightingTrace], hotkey_to_uid: Dict[str, int], - rewards: np.ndarray, - unweighted_rewards: np.ndarray, miner_volume_total: Dict[str, int], miner_crown_total: Dict[str, float], - network_volume_total: int, + miner_qvol_total: Dict[str, float], network_crown_total: float, + network_qvol_total: float, ) -> None: - """Populate the per-miner volume rows of the scoring log. Volume gating is - already applied inline in ``calculate_miner_rewards``; this only records - aggregate counters and the effective per-miner multiplier - (weighted / unweighted) for human-readable diagnosis.""" + """Record per-miner aggregate shares (crown + quality-weighted volume) and + raw volume for the scoring log. Shares are summed across both directions, + purely for human-readable diagnosis; the reward itself is computed per + direction inline in ``calculate_miner_rewards``.""" for hotkey, wt in weighting_traces.items(): - uid = hotkey_to_uid.get(hotkey) - if uid is None: + if hotkey_to_uid.get(hotkey) is None: continue - unweighted = float(unweighted_rewards[uid]) - weighted = float(rewards[uid]) - effective = (weighted / unweighted) if unweighted > 0 else 1.0 crown = miner_crown_total.get(hotkey, 0.0) crown_share = (crown / network_crown_total) if network_crown_total > 0 else 0.0 - wt.record_volume( - vol_rao=miner_volume_total.get(hotkey, 0), - total_volume_rao=network_volume_total, + qvol = miner_qvol_total.get(hotkey, 0.0) + qvol_share = (qvol / network_qvol_total) if network_qvol_total > 0 else 0.0 + wt.record_shares( + volume_rao=miner_volume_total.get(hotkey, 0), crown_share=crown_share, - factor=effective, + qvol_share=qvol_share, ) -def success_rate(stats: Optional[Tuple[int, int]]) -> float: - """Raw completed / closed ratio. Cubed in the reward. Zero observations → 0.""" - if not stats or stats == (0, 0): - return 0.0 - completed, timed_out = stats - return completed / (completed + timed_out) - - -def credibility_ramp(stats: Optional[Tuple[int, int]]) -> float: - """Linear ramp to full credibility at CREDIBILITY_RAMP_OBSERVATIONS closed - swaps. Applied linearly to the reward, not cubed. Zero observations → 0. - More than CREDIBILITY_MAX_TIMEOUTS timed-out swaps in the window hard-zeros - credibility — a rolling penalty that recovers as old timeouts age out.""" +def credibility(stats: Optional[Tuple[int, int]]) -> float: + """The single reliability term (replaces the old success_rate³ × ramp pair). + Linear ramp to full at CREDIBILITY_RAMP_OBSERVATIONS closed swaps; zero + observations → 0. More than CREDIBILITY_MAX_TIMEOUTS timed-out swaps in the + window hard-zeros it — a rolling penalty that recovers as old timeouts age + out. Note the cliff is an absolute count, so it's volume-unaware; scaling it + with volume is a follow-up once volume becomes a larger reward share.""" if not stats or stats == (0, 0): return 0.0 _completed, timed_out = stats @@ -512,6 +639,39 @@ def merge_replay_events( return events +def crown_can_fund(hotkey, rate, from_chain, to_chain, min_swap_rao, max_swap_rao, collaterals): + """Boundary-squat gate: a miner whose own rate forces a TAO leg larger than + their collateral earns no crown. Fail open on unknown collateral (absent != + zero) so a missing baseline doesn't silently drop them.""" + if hotkey not in collaterals: + return True + min_leg = min_executable_tao_leg(rate, from_chain, to_chain, min_swap_rao, max_swap_rao) + return min_leg == 0 or collaterals[hotkey] >= min_leg + + +def make_crown_predicates(from_chain, to_chain, min_swap_rao, max_swap_rao, collaterals): + """Crown-eligibility predicates ``(executable_check, can_fund)`` shared by the + scoring replay and the live snapshot, so the live crown view can never diverge + from the rewarded ledger. Both are the shared rate utils with this direction's + bounds/collateral bound in.""" + executable_check = partial( + is_executable_rate, + from_chain=from_chain, + to_chain=to_chain, + min_swap_rao=min_swap_rao, + max_swap_rao=max_swap_rao, + ) + can_fund = partial( + crown_can_fund, + from_chain=from_chain, + to_chain=to_chain, + min_swap_rao=min_swap_rao, + max_swap_rao=max_swap_rao, + collaterals=collaterals, + ) + return executable_check, can_fund + + def replay_crown_time_window( store: ValidatorStateStore, event_watcher: ContractEventWatcher, @@ -524,6 +684,7 @@ def replay_crown_time_window( intervals_out: Optional[List[Tuple[int, int, List[str], float]]] = None, min_swap_rao: int = 0, max_swap_rao: int = 0, + quality_reference: Optional[float] = None, ) -> Dict[str, float]: """Walk the merged event stream, return ``{hotkey: crown_blocks_float}``. Ties at the same rate split credit evenly. A miner qualifies for crown @@ -555,11 +716,11 @@ def replay_crown_time_window( canon_from, _ = canonical_pair(from_chain, to_chain) lower_rate_wins = from_chain != canon_from - def executable_check(rate: float) -> bool: - return is_executable_rate(rate, from_chain, to_chain, min_swap_rao, max_swap_rao) + executable_check, can_fund = make_crown_predicates(from_chain, to_chain, min_swap_rao, max_swap_rao, collaterals) crown_blocks: Dict[str, float] = {} cap_weighted_blocks: Dict[str, float] = {} + quality_weighted_blocks: Dict[str, float] = {} prev_block = window_start def effective_rates() -> Dict[str, float]: @@ -575,19 +736,6 @@ def effective_rates() -> Dict[str, float]: bounds_set = min_swap_rao > 0 or max_swap_rao > 0 - def can_fund(hotkey: str, rate: float) -> bool: - # Boundary-squat per-block gate: a miner whose own rate forces a TAO - # leg larger than their collateral_at_block earns no crown for that - # block. Cascades to the next-best rate via crown_holders_at_instant. - # Fail open when collateral is *unknown* (no event ever recorded): - # absent != zero. The contract auto-deactivates anyone below - # min_collateral, so an active miner always holds enough; treating a - # missing baseline as 0 would silently drop them from crown. - if hotkey not in collaterals: - return True - min_leg = min_executable_tao_leg(rate, from_chain, to_chain, min_swap_rao, max_swap_rao) - return min_leg == 0 or collaterals[hotkey] >= min_leg - def credit_interval(interval_start: int, interval_end: int) -> None: duration = interval_end - interval_start if duration <= 0: @@ -613,12 +761,16 @@ def credit_interval(interval_start: int, interval_end: int) -> None: if intervals_out is not None: intervals_out.append((interval_start, interval_end, list(holders), winner_rate)) split = duration / len(holders) + # All holders are tied at winner_rate, so the depth factor is shared. + # None reference (bootstrap) → 1.0, leaving credit unchanged. + qf = quality_factor(winner_rate, quality_reference, lower_rate_wins) for hk in holders: crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split # Unknown collateral (no event recorded) → capacity 1.0, matching # can_fund's fail-open. Only a known value scales capacity down. cap = capacity_factor(collaterals[hk], max_swap_rao) if hk in collaterals else 1.0 cap_weighted_blocks[hk] = cap_weighted_blocks.get(hk, 0.0) + split * cap + quality_weighted_blocks[hk] = quality_weighted_blocks.get(hk, 0.0) + split * qf def apply_event(event: ReplayEvent) -> None: if event.kind is EventKind.RATE: @@ -651,6 +803,7 @@ def apply_event(event: ReplayEvent) -> None: if trace is not None: trace.crown_blocks = dict(crown_blocks) trace.cap_weighted_blocks = dict(cap_weighted_blocks) + trace.quality_weighted_blocks = dict(quality_weighted_blocks) return crown_blocks @@ -706,20 +859,12 @@ def snapshot_current_crown_holders( if pinned_rates: rates = {**rates, **pinned_rates} - def executable_check(rate: float, from_chain=from_chain, to_chain=to_chain) -> bool: - return is_executable_rate(rate, from_chain, to_chain, min_swap_amount, max_swap_amount) - - def can_fund( - hotkey: str, rate: float, from_chain=from_chain, to_chain=to_chain, collaterals=collaterals - ) -> bool: - # Mirror the scoring path's boundary-squat gate so the live table - # never credits a holder whose collateral can't fund their own - # smallest legal leg, which the ledger drops. Fail open on unknown - # collateral (absent != zero) to match the scoring path exactly. - if hotkey not in collaterals: - return True - min_leg = min_executable_tao_leg(rate, from_chain, to_chain, min_swap_amount, max_swap_amount) - return min_leg == 0 or collaterals[hotkey] >= min_leg + # Same predicates the scoring replay uses, so the live table never + # credits a holder the ledger drops. Built per direction so each + # closure captures the right chain pair. + executable_check, can_fund = make_crown_predicates( + from_chain, to_chain, min_swap_amount, max_swap_amount, collaterals + ) holders = crown_holders_at_instant( rates, diff --git a/allways/validator/scoring_trace.py b/allways/validator/scoring_trace.py index fa5c36d..e93ebbb 100644 --- a/allways/validator/scoring_trace.py +++ b/allways/validator/scoring_trace.py @@ -39,28 +39,28 @@ class WeightingTrace: capacity_factor: float = 1.0 volume_rao: int = 0 crown_share: float = 0.0 - volume_share: float = 0.0 - participation: float = 1.0 - volume_factor: float = 1.0 + qvol_share: float = 0.0 closed_swaps: int = 0 - credibility_ramp: float = 0.0 + credibility: float = 0.0 + quality_factor: float = 1.0 def record_capacity(self, factor: float) -> None: self.capacity_factor = factor - def record_volume(self, vol_rao: int, total_volume_rao: int, crown_share: float, factor: float) -> None: - self.volume_rao = vol_rao + def record_quality(self, factor: float) -> None: + self.quality_factor = factor + + def record_shares(self, volume_rao: int, crown_share: float, qvol_share: float) -> None: + self.volume_rao = volume_rao self.crown_share = crown_share - self.volume_share = (vol_rao / total_volume_rao) if total_volume_rao > 0 else 0.0 - self.participation = min(1.0, self.volume_share / crown_share) if crown_share > 0 else 1.0 - self.volume_factor = factor + self.qvol_share = qvol_share def record_credibility(self, closed_swaps: int, ramp_target: int, timed_out: int = 0) -> None: self.closed_swaps = closed_swaps if timed_out > CREDIBILITY_MAX_TIMEOUTS: - self.credibility_ramp = 0.0 + self.credibility = 0.0 else: - self.credibility_ramp = min(1.0, closed_swaps / ramp_target) if ramp_target > 0 else 1.0 + self.credibility = min(1.0, closed_swaps / ramp_target) if ramp_target > 0 else 1.0 def log_scoring_trace( @@ -70,7 +70,7 @@ def log_scoring_trace( window_end: int, direction_traces: Dict[Tuple[str, str], DirectionTrace], rewards: np.ndarray, - success_rates: Dict[str, float], + credibilities: Dict[str, float], distributed: float, recycled: float, weighting_traces: Optional[Dict[str, 'WeightingTrace']] = None, @@ -102,18 +102,19 @@ def log_scoring_trace( if uid == recycle_uid and crown_blk == 0: continue crown_reward = float(rewards[uid]) - (recycled if uid == recycle_uid else 0.0) - sr = success_rates.get(hk, 0.0) + cred = credibilities.get(hk, 0.0) wt = weighting_traces.get(hk) extras = '' if wt is not None: extras = ( - f' ({wt.closed_swaps}/{CREDIBILITY_RAMP_OBSERVATIONS} closed, ramp={wt.credibility_ramp:.2f})' + f' ({wt.closed_swaps}/{CREDIBILITY_RAMP_OBSERVATIONS} closed)' f' cap={wt.capacity_factor:.2f}' - f' vol={wt.volume_rao / TAO_TO_RAO:g}t vol_share={wt.volume_share:.2f}' - f' crown_share={wt.crown_share:.2f} vol_f={wt.volume_factor:.2f}' + f' vol={wt.volume_rao / TAO_TO_RAO:g}t' + f' crown_share={wt.crown_share:.2f} qvol_share={wt.qvol_share:.2f}' + f' quality_f={wt.quality_factor:.2f}' ) lines.append( - f' uid={uid} hotkey={hk[:8]}.. crown_blk={crown_blk:.0f} sr={sr:.3f}{extras} reward={crown_reward:.3f}' + f' uid={uid} hotkey={hk[:8]}.. crown_blk={crown_blk:.0f} cred={cred:.3f}{extras} reward={crown_reward:.3f}' ) # Collateral as-of window_start mirrors the scoring replay's starting @@ -127,7 +128,7 @@ def log_scoring_trace( window_start, window_end, rewards, - success_rates, + credibilities, direction_traces, recycle_uid, collaterals, @@ -153,7 +154,7 @@ def non_earner_lines( window_start: int, window_end: int, rewards: np.ndarray, - success_rates: Dict[str, float], + credibilities: Dict[str, float], direction_traces: Dict[Tuple[str, str], DirectionTrace], recycle_uid: int, collaterals: Optional[Dict[str, int]] = None, @@ -178,11 +179,11 @@ def non_earner_lines( latest_rates = rates_by_hotkey.get(hk, {}) if not latest_rates and hk not in ever_active: continue - sr = success_rates.get(hk, 1.0) + cred = credibilities.get(hk, 1.0) reason = diagnose_non_earner( - hk, latest_rates, sr, ever_active, direction_traces, collaterals, min_swap_rao, max_swap_rao + hk, latest_rates, cred, ever_active, direction_traces, collaterals, min_swap_rao, max_swap_rao ) - out.append(f' uid={uid} hotkey={hk[:8]}.. crown_blk=0 reason="{reason}" sr={sr:.3f}') + out.append(f' uid={uid} hotkey={hk[:8]}.. crown_blk=0 reason="{reason}" cred={cred:.3f}') if len(out) >= NON_EARNER_LINE_CAP: break return out @@ -191,7 +192,7 @@ def non_earner_lines( def diagnose_non_earner( hotkey: str, latest_rates: Dict[Tuple[str, str], float], - sr: float, + cred: float, ever_active: Set[str], direction_traces: Dict[Tuple[str, str], DirectionTrace], collaterals: Optional[Dict[str, int]] = None, @@ -208,8 +209,8 @@ def diagnose_non_earner( return 'no_rate_posted' if hotkey not in ever_active: return 'not_active_during_window' - if sr <= 0: - return 'credibility_zero' # zero observations OR all-timeout history + if cred <= 0: + return 'credibility_zero' # zero observations OR timeout cliff tripped outbid_parts: List[str] = [] for (from_c, to_c), own in latest_rates.items(): diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index 970b081..d4c1e3b 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -267,6 +267,22 @@ def update_reservation_pin_reserved_until(self, miner_hotkey: str, reserved_unti (reserved_until, miner_hotkey), ) + def get_expired_reservation_pins(self) -> List[ReservationPin]: + """Pins whose reservation has lapsed as of the current block. + + Read before ``purge_expired_reservation_pins`` so the caller can emit a + scoring pin-end event per expired pin — otherwise the crown overlay's + 'start' outlives the on-chain reservation and keeps earning crown at the + pinned rate after expiry. + """ + if self.current_block_fn is None: + return [] + rows = self._fetchall( + 'SELECT * FROM reservation_pins WHERE reserved_until < ?', + (self.current_block_fn(),), + ) + return [self.row_to_reservation_pin(row) for row in rows] + def purge_expired_reservation_pins(self) -> int: """Drop pins whose reservation has already expired.""" if self.current_block_fn is None: @@ -498,16 +514,18 @@ def insert_swap_outcome( tao_amount: int = 0, from_chain: str = '', to_chain: str = '', + clearing_rate: float = 0.0, ) -> None: # Direction is normalized to lowercase on write so the per-direction # volume query is robust to upstream case drift. SQLite text # comparisons are case-sensitive and DIRECTION_POOLS keys are - # lowercase. + # lowercase. ``clearing_rate`` (canonical TAO/BTC) feeds the depth + # reference; 0.0 means "no usable observation" (timed-out or legacy). self._execute( """ INSERT OR REPLACE INTO swap_outcomes - (swap_id, miner_hotkey, completed, resolved_block, tao_amount, from_chain, to_chain) - VALUES (?, ?, ?, ?, ?, ?, ?) + (swap_id, miner_hotkey, completed, resolved_block, tao_amount, from_chain, to_chain, clearing_rate) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( swap_id, @@ -517,6 +535,7 @@ def insert_swap_outcome( int(tao_amount or 0), (from_chain or '').lower(), (to_chain or '').lower(), + float(clearing_rate or 0.0), ), ) @@ -576,6 +595,32 @@ def get_volume_by_direction_since(self, since_block: int, from_chain: str, to_ch ) return {r['miner_hotkey']: int(r['total'] or 0) for r in rows} + def get_clearing_rates_by_direction_since( + self, since_block: int, from_chain: str, to_chain: str + ) -> List[Tuple[float, int, int, str]]: + """Completed-swap clearing rates for one direction, for the depth + reference. Each row is ``(clearing_rate, tao_amount, resolved_block, + miner_hotkey)``. Rows with ``clearing_rate = 0`` (timed-out swaps, + pre-migration legacy) are excluded — they carry no usable rate, same + as the volume query excludes ``tao_amount = 0``. Direction is + lowercased to match ``insert_swap_outcome``.""" + rows = self._fetchall( + """ + SELECT clearing_rate, tao_amount, resolved_block, miner_hotkey + FROM swap_outcomes + WHERE resolved_block >= ? + AND completed = 1 + AND clearing_rate > 0 + AND from_chain = ? + AND to_chain = ? + """, + (since_block, (from_chain or '').lower(), (to_chain or '').lower()), + ) + return [ + (float(r['clearing_rate']), int(r['tao_amount'] or 0), int(r['resolved_block']), r['miner_hotkey']) + for r in rows + ] + def prune_swap_outcomes_older_than(self, cutoff_block: int) -> None: if cutoff_block <= 0: return @@ -877,7 +922,8 @@ def init_db(self) -> None: resolved_block INTEGER NOT NULL, tao_amount INTEGER NOT NULL DEFAULT 0, from_chain TEXT NOT NULL DEFAULT '', - to_chain TEXT NOT NULL DEFAULT '' + to_chain TEXT NOT NULL DEFAULT '', + clearing_rate REAL NOT NULL DEFAULT 0.0 ); CREATE INDEX IF NOT EXISTS idx_swap_outcomes_hotkey ON swap_outcomes(miner_hotkey); @@ -976,6 +1022,7 @@ def init_db(self) -> None: ('swap_outcomes', 'tao_amount', 'INTEGER NOT NULL DEFAULT 0'), ('swap_outcomes', 'from_chain', "TEXT NOT NULL DEFAULT ''"), ('swap_outcomes', 'to_chain', "TEXT NOT NULL DEFAULT ''"), + ('swap_outcomes', 'clearing_rate', 'REAL NOT NULL DEFAULT 0.0'), ): try: conn.execute(f'ALTER TABLE {table} ADD COLUMN {column} {ddl}') diff --git a/pyproject.toml b/pyproject.toml index 53b65de..33766a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,6 @@ dependencies = [ "rich", "embit", "base58", - "bech32", "pycryptodome", "numpy", "psycopg[binary]==3.3.4", diff --git a/tests/test_axon_handlers.py b/tests/test_axon_handlers.py index e460da5..fa98f22 100644 --- a/tests/test_axon_handlers.py +++ b/tests/test_axon_handlers.py @@ -588,6 +588,7 @@ def make_reserve_validator( validator.bounds_cache.min_collateral.return_value = 0 validator.bounds_cache.min_swap_amount.return_value = 0 validator.bounds_cache.max_swap_amount.return_value = 0 + validator.bounds_cache.halted.return_value = False validator.wallet = MagicMock() return validator @@ -889,6 +890,87 @@ def test_handle_swap_reserve_rejects_sentinel_rate(self): validator.axon_contract_client.vote_reserve.assert_not_called() +class TestSourceBalanceLock: + """The source-balance check must serialise on axon_lock for a substrate + source (TAO) but stay lock-free for an HTTP source (BTC) — otherwise the + TAO get_balance races the lock-protected readers and trips the substrate + `cannot call recv while another thread is already running recv` error.""" + + def test_provider_uses_substrate_flags(self): + """TAO provider hits the shared websocket; BTC is HTTP and lock-free.""" + from allways.chain_providers.base import ChainProvider + from allways.chain_providers.bitcoin import BitcoinProvider + from allways.chain_providers.subtensor import SubtensorProvider + + assert ChainProvider.uses_substrate is False + assert SubtensorProvider.uses_substrate is True + assert BitcoinProvider.uses_substrate is False + + def test_tao_source_balance_check_holds_axon_lock(self): + """A TAO-sourced reserve must acquire axon_lock around get_balance.""" + from allways.utils.rate import derive_tao_leg + from allways.validator.axon_handlers import recompute_reserve_amounts + + validator = make_reserve_validator() + lock = validator.axon_lock + + tao = MagicMock() + tao.uses_substrate = True + tao.verify_from_proof.return_value = True + # Record whether the lock is held at the moment get_balance runs. + held = {} + + def _get_balance(_addr): + held['locked'] = not lock.acquire(blocking=False) + if not held['locked']: + lock.release() + return 10**18 + + tao.get_balance.side_effect = _get_balance + validator.axon_chain_providers = {'tao': tao, 'btc': MagicMock()} + + # The balance lookup now runs after the quote checks, so the request must + # carry a self-consistent quote to reach it. Derive the amounts from the + # same functions the handler uses so it passes exactly. + commitment = make_commitment(from_chain='tao', to_chain='btc') + from_amount = _RESERVE_TAO_AMOUNT + to_amount = recompute_reserve_amounts(commitment, 'tao', 'btc', from_amount) + tao_amount = derive_tao_leg('tao', from_amount, 'btc', to_amount) + synapse = make_reserve_synapse( + from_chain='tao', + to_chain='btc', + from_address='5user', + from_amount=from_amount, + to_amount=to_amount, + tao_amount=tao_amount, + ) + run_reserve_handler(validator, synapse, commitment=commitment) + + assert held.get('locked') is True + + def test_btc_source_balance_check_is_lock_free(self): + """A BTC-sourced reserve must NOT hold axon_lock during get_balance, so + a slow Esplora call can't stall the lock-protected forward loop.""" + validator = make_reserve_validator() + lock = validator.axon_lock + + btc = validator.axon_chain_providers['btc'] + btc.uses_substrate = False + held = {} + + def _get_balance(_addr): + held['locked'] = not lock.acquire(blocking=False) + if not held['locked']: + lock.release() + return 10**18 + + btc.get_balance.side_effect = _get_balance + + run_reserve_handler(validator, make_reserve_synapse()) + + assert held.get('locked') is False + + class TestMinerActivateExecutability: def _activate_synapse( self, hotkey: str = '5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY' @@ -929,3 +1011,22 @@ def test_handle_miner_activate_rejects_sentinel_commitment(self): assert mock_read.call_args.kwargs['min_swap_rao'] == 500_000_000 assert mock_read.call_args.kwargs['max_swap_rao'] == 5_000_000_000 validator.axon_contract_client.vote_activate.assert_not_called() + + +class TestHaltFastReject: + """A halted system rejects reservations without submitting any extrinsic.""" + + def test_halted_rejects_without_voting(self): + validator = make_reserve_validator() + validator.bounds_cache.halted.return_value = True + result = run_reserve_handler(validator, make_reserve_synapse()) + assert result.accepted is False + assert 'halt' in (result.rejection_reason or '').lower() + validator.axon_contract_client.vote_reserve.assert_not_called() + + def test_halted_short_circuits_before_substrate_work(self): + validator = make_reserve_validator() + validator.bounds_cache.halted.return_value = True + with patch('allways.validator.axon_handlers.read_miner_commitment') as read_cmt: + asyncio.run(handle_swap_reserve(validator, make_reserve_synapse())) + read_cmt.assert_not_called() diff --git a/tests/test_bitcoin_signing.py b/tests/test_bitcoin_signing.py index e174f42..c8f0190 100644 --- a/tests/test_bitcoin_signing.py +++ b/tests/test_bitcoin_signing.py @@ -13,6 +13,7 @@ ADDR_TYPE_P2WPKH, BitcoinProvider, detect_address_type, + to_mainnet_address, ) # Known test WIF (compressed) @@ -271,3 +272,84 @@ def test_fresh_provider_starts_with_empty_set(self): consumed tx hash can't leak across processes.""" provider = make_lightweight_provider() assert provider.broadcasted_txids == set() + + +class TestIsValidAddress: + """BTC address format validation — must accept every standard type + (legacy, segwit v0, and Taproot/bech32m) and reject malformed input. + + Taproot regression guard: the old bech32-only check rejected all `bc1p…` + addresses, blocking TAO->BTC payouts to Taproot wallets (issue #448). + """ + + # mainnet / testnet / regtest × P2PKH / P2SH / P2WPKH / P2WSH / P2TR + VALID = [ + '1BvBMSEYstWetqTFn5Au4m4GFg7xJaNVN2', # mainnet P2PKH + '3J98t1WpEZ73CNmQviecrnyiWrnqRhWNLy', # mainnet P2SH + 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4', # mainnet P2WPKH + 'bc1qrp33g0q5c5txsp9arysrx4k6zdkfs4nce4xj0gdcccefvpysxf3qccfmv3', # mainnet P2WSH + 'bc1p5cyxnuxmeuwuvkwfem96lqzszd02n6xdcjrs20cac6yqjjwudpxqkedrcr', # mainnet P2TR + 'mipcBbFg9gMiCh81Kj8tqqdgoZub1ZJRfn', # testnet P2PKH + '2MzQwSSnBHWHqSAqtTVQ6v47XtaisrJa1Vc', # testnet P2SH + 'tb1qw508d6qejxtdg4y5r3zarvary0c5xw7kxpjzsx', # testnet P2WPKH + 'tb1pqqqqp399et2xygdj5xreqhjjvcmzhxw4aywxecjdzew6hylgvsesf3hn0c', # testnet P2TR (BIP-350) + 'bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080', # regtest P2WPKH + 'bcrt1p0xlxvlhemja6c4dqv22uapctqupfhlxm9h8z3k2e72q4k9hcz7vqc8gma6', # regtest P2TR + ] + + INVALID = [ + '', + 'notanaddress', + '0x71C7656EC7ab88b098defB751B7401B5f6d8976F', # ETH address + 'bc1p5cyxnuxmeuwuvkwfem96lqzszd02n6xdcjrs20cac6yqjjwudpxqkedrXXX', # bad checksum + 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3XX', # corrupted v0 + ] + + def test_accepts_all_standard_types(self): + provider = make_lightweight_provider() + for addr in self.VALID: + assert provider.is_valid_address(addr), f'should accept {addr}' + + def test_accepts_taproot(self): + """Explicit #448 guard: Taproot payout addresses must validate.""" + provider = make_lightweight_provider() + assert provider.is_valid_address('bc1p5cyxnuxmeuwuvkwfem96lqzszd02n6xdcjrs20cac6yqjjwudpxqkedrcr') + + def test_rejects_malformed(self): + provider = make_lightweight_provider() + for addr in self.INVALID: + assert not provider.is_valid_address(addr), f'should reject {addr!r}' + + def test_rejects_non_string(self): + provider = make_lightweight_provider() + assert not provider.is_valid_address(None) + + +class TestToMainnetAddress: + """testnet/regtest -> mainnet re-encoding used by the BIP-137 verify path. + Conversions must be byte-identical to the prior bech32/base58 behavior.""" + + def test_testnet_p2pkh(self): + assert to_mainnet_address('mipcBbFg9gMiCh81Kj8tqqdgoZub1ZJRfn') == '14JetYAhLevTRaePcAAX1vRMwaJt2QGRzp' + + def test_testnet_p2sh(self): + assert to_mainnet_address('2MzQwSSnBHWHqSAqtTVQ6v47XtaisrJa1Vc') == '38rjNhr9g3nVEPDLnMnEJ78GgEWi6yM4He' + + def test_testnet_p2wpkh(self): + assert ( + to_mainnet_address('tb1qw508d6qejxtdg4y5r3zarvary0c5xw7kxpjzsx') + == 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4' + ) + + def test_regtest_p2wpkh(self): + assert ( + to_mainnet_address('bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080') + == 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4' + ) + + def test_mainnet_passthrough(self): + addr = 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4' + assert to_mainnet_address(addr) == addr + + def test_unparseable_returns_unchanged(self): + assert to_mainnet_address('notanaddress') == 'notanaddress' diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py index 3ce5a0b..0c1b1ba 100644 --- a/tests/test_event_watcher.py +++ b/tests/test_event_watcher.py @@ -430,6 +430,83 @@ def test_miner_reserved_writes_expected_pin(self, tmp_path: Path): assert pin.miner_to_address == '5miner' w.state_store.close() + def test_existing_synchronous_pin_is_not_overwritten(self, tmp_path: Path): + """The reserve handler's synchronous pin captures the rate the user's + quote was validated against and is authoritative. A later MinerReserved + re-read at the inclusion block must NOT clobber it, even when the miner + moved its commitment in between — that divergence short-changed the user + in swap 2405 (reserved at 370, overwritten to 280).""" + from allways.validator.state_store import ReservationPin + + w = make_watcher(tmp_path, netuid=2, subtensor=MagicMock()) + # Synchronous pin as handle_swap_reserve writes it at quote time. + w.state_store.upsert_reservation_pin( + ReservationPin( + miner_hotkey='hk_a', + reserve_block=898, + from_chain='btc', + to_chain='tao', + rate_str='370', + counter_rate_str='0.0029', + miner_from_address='bc1-miner', + miner_to_address='5miner', + reserved_until=1000, + ) + ) + # The inclusion-block read sees the miner's oscillated-down rate. + moved = make_pinned_commitment(rate_str='280') + with patch( + 'allways.validator.event_watcher.read_miner_commitment', + return_value=moved, + ): + w.apply_event(900, 'MinerReserved', {'miner': 'hk_a', 'reserved_until': 1000}) + + pin = w.state_store.get_reservation_pin('hk_a') + assert pin.rate_str == '370' # preserved — not overwritten with 280 + assert pin.reserve_block == 898 + w.state_store.close() + + def test_stale_prior_reservation_pin_is_backfilled(self, tmp_path: Path): + """A pin left over from a PRIOR reservation (different reserved_until, + e.g. one abandoned without a swap and not yet purged) must not be + inherited by the current reservation. The guard keys on reserved_until, + so a MinerReserved for a new reservation backfills over the stale pin — + otherwise a failed synchronous write or a fresh-DB replay would settle + the new swap against the old reservation's rate and addresses.""" + from allways.validator.state_store import ReservationPin + + w = make_watcher(tmp_path, netuid=2, subtensor=MagicMock()) + # Stale pin from a prior, abandoned reservation (reserved_until=1000). + w.state_store.upsert_reservation_pin( + ReservationPin( + miner_hotkey='hk_a', + reserve_block=898, + from_chain='btc', + to_chain='tao', + rate_str='370', + counter_rate_str='0.0029', + miner_from_address='bc1-OLD', + miner_to_address='5old', + reserved_until=1000, + ) + ) + # A new reservation (reserved_until=1200) with no synchronous pin — + # the inclusion-block read is the only source, so it must write through. + fresh = make_pinned_commitment(rate_str='345', from_address='bc1-NEW', to_address='5new') + with patch( + 'allways.validator.event_watcher.read_miner_commitment', + return_value=fresh, + ): + w.apply_event(1100, 'MinerReserved', {'miner': 'hk_a', 'reserved_until': 1200}) + + pin = w.state_store.get_reservation_pin('hk_a') + assert pin.reserved_until == 1200 # current reservation, not the stale one + assert pin.rate_str == '345' # backfilled — stale 370 overwritten + assert pin.reserve_block == 1100 + assert pin.miner_from_address == 'bc1-NEW' + assert pin.miner_to_address == '5new' + w.state_store.close() + def test_commitment_read_raising_writes_no_pin(self, tmp_path: Path): """A transient RPC error or a pruned block must not write a pin and must not let the exception escape apply_event.""" @@ -490,6 +567,66 @@ def test_swap_timed_out_clears_pin(self, tmp_path: Path): assert w.state_store.get_reservation_pin('hk_a') is None w.state_store.close() + def test_expired_reservation_emits_pin_end_and_purges(self, tmp_path: Path): + """A reservation that lapses without a swap emits no contract event, so + nothing closes the crown pin. ``expire_stale_reservation_pins`` must + emit an 'end' (at reserved_until + 1) for every open direction and purge + the synchronous pin row — otherwise the miner keeps earning crown at the + pinned rate with no live reservation.""" + w = make_watcher(tmp_path, netuid=2, subtensor=MagicMock()) + w.state_store.current_block_fn = lambda: 1001 # past reserved_until=1000 + with patch( + 'allways.validator.event_watcher.read_miner_commitment', + return_value=make_pinned_commitment(), + ): + w.apply_event(900, 'MinerReserved', {'miner': 'hk_a', 'reserved_until': 1000}) + + purged = w.expire_stale_reservation_pins() + + assert purged == 1 + assert w.state_store.get_reservation_pin('hk_a') is None + ends = [ev for ev in w.reservation_pin_events if ev.hotkey == 'hk_a' and ev.kind == 'end'] + # One 'end' per pinned direction (primary btc→tao + counter tao→btc), + # at reserved_until + 1 so crown is credited through the last live block. + assert {(ev.from_chain, ev.to_chain) for ev in ends} == {('btc', 'tao'), ('tao', 'btc')} + assert all(ev.block_num == 1001 for ev in ends) + w.state_store.close() + + def test_live_reservation_not_closed_by_expiry_sweep(self, tmp_path: Path): + """A reservation whose TTL is still in the future must be left alone — + no pin-end, no purge.""" + w = make_watcher(tmp_path, netuid=2, subtensor=MagicMock()) + w.state_store.current_block_fn = lambda: 950 # before reserved_until=1000 + with patch( + 'allways.validator.event_watcher.read_miner_commitment', + return_value=make_pinned_commitment(), + ): + w.apply_event(900, 'MinerReserved', {'miner': 'hk_a', 'reserved_until': 1000}) + + purged = w.expire_stale_reservation_pins() + + assert purged == 0 + assert w.state_store.get_reservation_pin('hk_a') is not None + assert not [ev for ev in w.reservation_pin_events if ev.hotkey == 'hk_a' and ev.kind == 'end'] + w.state_store.close() + + def test_expiry_sweep_is_idempotent(self, tmp_path: Path): + """Re-running after the row is purged emits no further 'end' events.""" + w = make_watcher(tmp_path, netuid=2, subtensor=MagicMock()) + w.state_store.current_block_fn = lambda: 1001 + with patch( + 'allways.validator.event_watcher.read_miner_commitment', + return_value=make_pinned_commitment(), + ): + w.apply_event(900, 'MinerReserved', {'miner': 'hk_a', 'reserved_until': 1000}) + + assert w.expire_stale_reservation_pins() == 1 + ends_after_first = [ev for ev in w.reservation_pin_events if ev.kind == 'end'] + assert w.expire_stale_reservation_pins() == 0 + ends_after_second = [ev for ev in w.reservation_pin_events if ev.kind == 'end'] + assert ends_after_second == ends_after_first + w.state_store.close() + def test_reservation_extension_finalized_bumps_pin_ttl(self, tmp_path: Path): w = make_watcher(tmp_path, netuid=2, subtensor=MagicMock()) with patch( diff --git a/tests/test_fulfillment.py b/tests/test_fulfillment.py index 3b977bf..28ee884 100644 --- a/tests/test_fulfillment.py +++ b/tests/test_fulfillment.py @@ -1,16 +1,26 @@ -"""SwapFulfiller — timeout cushion, sender verification, send-path behavior. +"""SwapFulfiller — timeout cushion, sender verification, send-path, send-cache. -These tests stay at the verify_swap_safety layer, which is the only part -of SwapFulfiller that's exercised on every forward step. +The cushion/safety tests stay at the verify_swap_safety layer. The send-cache +tests lock in the idempotency invariant: once dest funds are sent, an unmarked +cache entry must keep blocking a duplicate send until mark_fulfilled lands or +the swap is provably past its deadline. """ +from pathlib import Path from unittest.mock import MagicMock import pytest from allways.classes import Swap, SwapStatus -from allways.constants import MINER_TIMEOUT_CUSHION_BLOCKS -from allways.miner.fulfillment import SwapFulfiller +from allways.constants import ( + DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS, + MAX_EXTENSION_BLOCKS, + MAX_EXTENSIONS_PER_SWAP, + MINER_TIMEOUT_CUSHION_BLOCKS, + SENT_CACHE_DISCARD_MARGIN_BLOCKS, +) +from allways.miner.fulfillment import SentSwap, SwapFulfiller +from allways.miner.swap_poller import MAX_REFRESH_MISSES, SwapPoller def make_fulfiller() -> SwapFulfiller: @@ -96,5 +106,202 @@ def test_return_is_post_fee_not_pre_fee(self): assert user_receives_amount == 3_415_500_000 +class TestSentCacheCleanup: + """cleanup_stale_sends retains unmarked sends to prevent duplicate dest + sends, but bounds retention so genuinely-resolved swaps don't leak forever.""" + + def test_unmarked_stale_retained_marked_stale_removed_within_deadline(self): + fulfiller = make_fulfiller() + fulfiller.subtensor.get_current_block.return_value = 100 # well within all deadlines + fulfiller.sent = { + 1: SentSwap('unmarked-stale-tx', 101, marked_fulfilled=False, timeout_block=500), + 2: SentSwap('marked-stale-tx', 102, marked_fulfilled=True, timeout_block=500), + 3: SentSwap('active-unmarked-tx', 103, marked_fulfilled=False, timeout_block=500), + } + fulfiller.mark_fulfilled_attempts = {1: 2, 2: 3, 3: 1} + + fulfiller.cleanup_stale_sends(active_swap_ids={3}) + + # marked stale (2) removed; unmarked stale within deadline (1) retained; active (3) untouched + assert set(fulfiller.sent) == {1, 3} + assert fulfiller.mark_fulfilled_attempts == {1: 2, 3: 1} + + def test_mark_fulfilled_retry_not_gated_by_cushion_after_send(self): + # Regression for #462: once dest funds are out, the swap stays Active + # until mark_fulfilled lands, and an Active swap is slashed at timeout. + # The cushion is scoped to STARTING a fulfill, so it must NOT gate the + # post-send mark_fulfilled retry — otherwise a transient mark_fulfilled + # failure inside the final cushion window guarantees a slash of a miner + # that already paid. Uses the REAL verify_swap_safety (not stubbed) so + # the cushion actually runs on the retry path. + from allways.contract_client import ContractError + + swap = make_swap(timeout_block=500) + fulfiller = make_fulfiller() + fulfiller.fee_divisor = 100 + # Inside the cushion window: a first SEND would be gated off here. + fulfiller.subtensor.get_current_block.return_value = 500 - MINER_TIMEOUT_CUSHION_BLOCKS + # Dest funds already sent on a prior pass, mark_fulfilled not yet landed. + fulfiller.sent[swap.id] = SentSwap('already-sent-dest-tx', 777, marked_fulfilled=False, timeout_block=500) + fulfiller.send_dest_funds = MagicMock() + # Transient (non-rejection) failure — keeps the entry retryable. + fulfiller.client.mark_fulfilled.side_effect = ContractError('transient rpc failure') + + result = fulfiller.process_swap(swap) + + assert result is False # mark_fulfilled didn't land this pass + fulfiller.send_dest_funds.assert_not_called() # never re-send + # The retry was attempted despite being inside the cushion window. + fulfiller.client.mark_fulfilled.assert_called_once_with( + wallet=fulfiller.wallet, + swap_id=swap.id, + to_tx_hash='already-sent-dest-tx', + to_amount=3_415_500_000, + to_tx_block=777, + ) + assert fulfiller.sent[swap.id].marked_fulfilled is False # still retryable next pass + + def test_retained_send_blocks_resend_after_poller_misses_and_rediscovery(self): + swap = make_swap(timeout_block=500) + poll_client = MagicMock() + poll_client.get_next_swap_id.return_value = swap.id + 1 + poll_client.get_swap.return_value = None + poller = SwapPoller(contract_client=poll_client, miner_hotkey=swap.miner_hotkey) + poller.active[swap.id] = swap + poller.last_scanned_id = swap.id + + fulfiller = make_fulfiller() + fulfiller.subtensor.get_current_block.return_value = 100 # within deadline → retain + fulfiller.sent[swap.id] = SentSwap('already-sent-dest-tx', 777, marked_fulfilled=False, timeout_block=500) + + # Transient read gap drops the swap from the poller's active set. + for _ in range(MAX_REFRESH_MISSES): + poller.poll() + assert poller.active == {} + + # Cleanup must NOT drop the unmarked entry while it's only transiently gone. + fulfiller.cleanup_stale_sends(active_swap_ids=set(poller.active)) + assert fulfiller.sent[swap.id].to_tx_hash == 'already-sent-dest-tx' + + # Swap reappears; process_swap must retry mark_fulfilled, not resend funds. + poll_client.get_swap.return_value = swap + poller.poll() + assert poller.active == {swap.id: swap} + + fulfiller.verify_swap_safety = MagicMock(return_value=(3_415_500_000, swap.miner_from_address)) + fulfiller.verify_user_sent_funds = MagicMock(return_value=True) + fulfiller.send_dest_funds = MagicMock(return_value=('second-dest-tx', 888)) + + assert fulfiller.process_swap(swap) is True + fulfiller.send_dest_funds.assert_not_called() + fulfiller.client.mark_fulfilled.assert_called_once_with( + wallet=fulfiller.wallet, + swap_id=swap.id, + to_tx_hash='already-sent-dest-tx', + to_amount=3_415_500_000, + to_tx_block=777, + ) + assert fulfiller.sent[swap.id].marked_fulfilled is True + + def test_unmarked_stale_discarded_once_past_deadline_margin(self): + fulfiller = make_fulfiller() + fulfiller.sent = {1: SentSwap('leaked-tx', 50, marked_fulfilled=False, timeout_block=100)} + # Provably past any possible (even fully-extended) deadline → safe to discard. + fulfiller.subtensor.get_current_block.return_value = 100 + SENT_CACHE_DISCARD_MARGIN_BLOCKS + 1 + + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert fulfiller.sent == {} + + def test_unmarked_stale_retained_inside_deadline_margin(self): + fulfiller = make_fulfiller() + fulfiller.sent = { + 1: SentSwap('a', 1, marked_fulfilled=False, timeout_block=100), # just past timeout + 2: SentSwap('b', 2, marked_fulfilled=False, timeout_block=100), # exactly at margin boundary + } + # id 1: a few blocks past timeout but well inside the margin → retain. + # id 2: exactly timeout + margin → retain (discard uses strict >). + fulfiller.subtensor.get_current_block.return_value = 100 + SENT_CACHE_DISCARD_MARGIN_BLOCKS + + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert set(fulfiller.sent) == {1, 2} + + def test_unmarked_stale_retained_across_two_extensions(self): + # Regression for #461: the contract permits MAX_EXTENSIONS_PER_SWAP (2) + # timeout extensions, each pushing timeout_block forward by up to + # MAX_EXTENSION_BLOCKS relative to its own propose block (not cumulative), + # so a live deadline can reach D0 + 2 * MAX_EXTENSION_BLOCKS. If the cached + # snapshot predates both extensions (a get_swap gap drops the swap from the + # active set, so process_swap never refreshes it), the margin must still + # cover the fully-extended deadline. The old margin (1 * MAX_EXTENSION_BLOCKS + # + DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS) would discard here and re-send on + # rediscovery. + d0 = 100 + old_single_extension_margin = MAX_EXTENSION_BLOCKS + DEFAULT_FULFILLMENT_TIMEOUT_BLOCKS + live_deadline = d0 + MAX_EXTENSIONS_PER_SWAP * MAX_EXTENSION_BLOCKS + # Sanity-check this case actually exercises the gap the fix closes: the + # current block is past the old margin but the swap is still live on-chain. + current = d0 + old_single_extension_margin + 1 + assert current > d0 + old_single_extension_margin # would have been discarded pre-fix + assert current <= live_deadline # but the swap is still active on-chain + + fulfiller = make_fulfiller() + fulfiller.sent = {1: SentSwap('twice-extended-tx', 50, marked_fulfilled=False, timeout_block=d0)} + fulfiller.subtensor.get_current_block.return_value = current + + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert set(fulfiller.sent) == {1} + + def test_legacy_entry_without_deadline_never_discarded(self): + fulfiller = make_fulfiller() + fulfiller.sent = {1: SentSwap('legacy-tx', 5, marked_fulfilled=False)} # timeout_block defaults to 0 + fulfiller.subtensor.get_current_block.return_value = 10**9 + + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert set(fulfiller.sent) == {1} + + def test_subtensor_failure_during_cleanup_retains_unmarked(self): + fulfiller = make_fulfiller() + fulfiller.sent = {1: SentSwap('tx', 5, marked_fulfilled=False, timeout_block=100)} + fulfiller.subtensor.get_current_block.side_effect = RuntimeError('rpc down') + + # No raise, no wipe — without a block height we can't prove expiry. + fulfiller.cleanup_stale_sends(active_swap_ids=set()) + assert set(fulfiller.sent) == {1} + + def test_cache_persistence_roundtrips_timeout_block(self, tmp_path: Path): + cache_path = tmp_path / 'sent_cache.json' + writer = SwapFulfiller( + contract_client=MagicMock(), + chain_providers={}, + wallet=MagicMock(), + subtensor=MagicMock(), + sent_cache_path=cache_path, + ) + writer.sent = {7: SentSwap('tx7', 123, marked_fulfilled=False, timeout_block=456)} + writer.save_sent_cache() + + reader = SwapFulfiller( + contract_client=MagicMock(), + chain_providers={}, + wallet=MagicMock(), + subtensor=MagicMock(), + sent_cache_path=cache_path, + ) + assert reader.sent[7] == SentSwap('tx7', 123, marked_fulfilled=False, timeout_block=456) + + def test_legacy_three_element_cache_loads_with_zero_timeout(self, tmp_path: Path): + cache_path = tmp_path / 'sent_cache.json' + cache_path.write_text('{"9": ["legacy-tx", 999, false]}') # pre-fix 3-element shape + + reader = SwapFulfiller( + contract_client=MagicMock(), + chain_providers={}, + wallet=MagicMock(), + subtensor=MagicMock(), + sent_cache_path=cache_path, + ) + assert reader.sent[9] == SentSwap('legacy-tx', 999, marked_fulfilled=False, timeout_block=0) + + if __name__ == '__main__': pytest.main([__file__, '-v']) diff --git a/tests/test_scoring_v1.py b/tests/test_scoring_v1.py index 038db3b..e72b92c 100644 --- a/tests/test_scoring_v1.py +++ b/tests/test_scoring_v1.py @@ -8,23 +8,25 @@ from allways.constants import ( MAX_SCORING_BACKFILL_BLOCKS, + QVOL_REWARD_WEIGHT, RECYCLE_UID, SCORING_WINDOW_BLOCKS, - SUCCESS_EXPONENT, ) +from allways.utils.rate import is_executable_rate, min_executable_tao_leg from allways.validator.event_watcher import ActiveEvent, CollateralEvent, ContractEventWatcher from allways.validator.scoring import ( calculate_miner_rewards, - credibility_ramp, + credibility, crown_holders_at_instant, due_for_scoring, + make_crown_predicates, + quality_weighted_volume, replay_crown_time_window, score_and_reward_miners, scoring_window_bounds, snapshot_current_crown_holders, - success_rate, ) -from allways.validator.state_store import ValidatorStateStore +from allways.validator.state_store import ReservationPin, ValidatorStateStore POOL_TAO_BTC = 0.5 POOL_BTC_TAO = 0.5 @@ -153,46 +155,29 @@ def pad_hotkeys_to_cover_recycle(seeds: list[str]) -> list[str]: return hotkeys -class TestSuccessRateHelper: - def test_none_is_pessimistic(self): - """Zero observations earns no trust — the credibility hole closed.""" - assert success_rate(None) == 0.0 +class TestCredibilityHelper: + """The single reliability term: a volume ramp gated by the timeout cliff + (replaces the old success_rate³ × credibility_ramp pair).""" - def test_zero_total_is_pessimistic(self): - assert success_rate((0, 0)) == 0.0 - - def test_raw_ratio_ignores_ramp(self): - """success_rate is the raw completed/closed ratio — no ramp folded in.""" - assert success_rate((1, 0)) == 1.0 - assert success_rate((5, 0)) == 1.0 - assert success_rate((8, 2)) == 0.8 - assert success_rate((90, 10)) == 0.9 - - def test_timed_out_swaps_drop_raw_rate(self): - # 5 completed + 5 timed_out → raw = 0.5 - assert success_rate((5, 5)) == 0.5 - - -class TestCredibilityRampHelper: def test_none_is_zero(self): - assert credibility_ramp(None) == 0.0 - assert credibility_ramp((0, 0)) == 0.0 + assert credibility(None) == 0.0 + assert credibility((0, 0)) == 0.0 def test_scales_linearly_below_threshold(self): - """1 closed swap → ramp 0.1; 5 closed → 0.5. Counts tolerated timeouts too.""" - assert credibility_ramp((1, 0)) == 0.1 - assert credibility_ramp((5, 0)) == 0.5 - assert credibility_ramp((3, 2)) == 0.5 # 2 timeouts tolerated + """1 closed swap → 0.1; 5 closed → 0.5. Counts tolerated timeouts too.""" + assert credibility((1, 0)) == 0.1 + assert credibility((5, 0)) == 0.5 + assert credibility((3, 2)) == 0.5 # 2 timeouts tolerated def test_caps_at_full_ramp(self): - assert credibility_ramp((10, 0)) == 1.0 + assert credibility((10, 0)) == 1.0 def test_excess_timeouts_hard_zero(self): """More than CREDIBILITY_MAX_TIMEOUTS (2) timeouts → 0, regardless of volume.""" - assert credibility_ramp((8, 2)) == 1.0 # 2 tolerated, fully ramped - assert credibility_ramp((8, 3)) == 0.0 # 3rd timeout zeros it - assert credibility_ramp((90, 10)) == 0.0 # high volume can't rescue it - assert credibility_ramp((0, 3)) == 0.0 + assert credibility((8, 2)) == 1.0 # 2 tolerated, fully ramped + assert credibility((8, 3)) == 0.0 # 3rd timeout zeros it + assert credibility((90, 10)) == 0.0 # high volume can't rescue it + assert credibility((0, 3)) == 0.0 class TestCrownHoldersHelper: @@ -783,6 +768,52 @@ def test_boundary_squat_excluded_from_live_table(self, tmp_path: Path): v.state_store.close() +class TestLedgerSnapshotAgreement: + """The #450 invariant end-to-end: the live snapshot and the scoring ledger + must resolve the crown to the same holder when fed identical state. Guards + against a future one-sided edit even if it bypassed make_crown_predicates.""" + + def test_squat_dropped_by_both_paths(self, tmp_path: Path): + # Squatter posts the best rate but can't fund the leg it forces; the + # funded runner-up is the only eligible holder. Both the per-forward + # snapshot and the windowed replay must agree on hk_funded and exclude + # hk_squat — the executability/funding gate applied identically. + v = make_validator( + tmp_path, + ['hk_squat', 'hk_funded'], + block=1100, + min_swap_amount=100_000_000, + max_swap_amount=500_000_000, + collaterals={'hk_squat': 150_000_000, 'hk_funded': 500_000_000}, + ) + conn = v.state_store.require_connection() + for hk, rate in (('hk_squat', 50000.0), ('hk_funded', 326.0)): + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + (hk, 'btc', 'tao', rate, 0), + ) + conn.commit() + + snapshot_holders = [row[2] for row in snapshot_current_crown_holders(v)[('btc', 'tao')]] + ledger = replay_crown_time_window( + store=v.state_store, + event_watcher=v.event_watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=1100, + rewardable_hotkeys={'hk_squat', 'hk_funded'}, + min_swap_rao=100_000_000, + max_swap_rao=500_000_000, + ) + + assert snapshot_holders == ['hk_funded'] + assert set(ledger) == {'hk_funded'} # squatter credited zero blocks + # The whole point: live view and rewarded ledger name the same holder. + assert snapshot_holders == list(ledger.keys()) + v.state_store.close() + + class TestPinnedRateDuringReservation: """Crown calculation must use the pinned rate during the reserved-not-busy window, not the live rate. Closes the bump-after-pin loophole.""" @@ -886,6 +917,68 @@ def test_pin_end_lets_live_rate_take_over(self, tmp_path: Path): assert crown == {'hk_a': 750.0, 'hk_b': 250.0} store.close() + def test_expired_reservation_pin_stops_earning_crown(self, tmp_path: Path): + """End-to-end: a reservation that lapses without a swap must stop + earning crown at the pinned rate. ``expire_stale_reservation_pins`` + emits the missing pin-end at reserved_until + 1, so once the live rate + reverts to junk after expiry the miner can no longer crown-squat on the + stale pinned rate.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a', 'hk_b'}) + conn = store.require_connection() + for hk in ('hk_a', 'hk_b'): + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + (hk, 'btc', 'tao', 200.0, 0), + ) + # A bumps its live rate to junk at block 700 — only the stale pin would + # keep it winning crown at 200 past expiry. + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + ('hk_a', 'btc', 'tao', 25000.0, 700), + ) + conn.commit() + + # A is reserved at block 300, TTL through 600, and never swaps — the + # synchronous pin row + the scoring 'start' both land, but no terminal + # event ever closes the pin. + store.upsert_reservation_pin( + ReservationPin( + miner_hotkey='hk_a', + reserve_block=300, + from_chain='btc', + to_chain='tao', + rate_str='200', + counter_rate_str='0', + miner_from_address='bc1-a', + miner_to_address='5a', + reserved_until=600, + created_at=1.0, + ) + ) + watcher._record_reservation_pin_event( + block_num=300, hotkey='hk_a', from_chain='btc', to_chain='tao', kind='start', rate=200.0 + ) + + # The forward loop's expiry sweep fires once the block passes the TTL. + store.current_block_fn = lambda: 601 + watcher.expire_stale_reservation_pins() + + crown = replay_crown_time_window( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=1100, + rewardable_hotkeys={'hk_a', 'hk_b'}, + ) + # (100, 601]: A pinned at 200, tie with B → 250.5 each. + # (601, 700]: pin closed, A live at 200, tie with B → 49.5 each. + # (700, 1100]: A live at junk 25000 — wins the rate sort, earns 400. + assert crown == {'hk_a': 700.0, 'hk_b': 300.0} + store.close() + def test_pin_only_applies_to_pinned_direction(self, tmp_path: Path): """A reservation pin is direction-specific — pinning btc→tao must not affect crown ranking in the tao→btc direction.""" @@ -1058,9 +1151,10 @@ def test_single_miner_full_pool_with_perfect_success(self, tmp_path: Path): np.testing.assert_allclose(rewards.sum(), 1.0, atol=1e-6) v.state_store.close() - def test_partial_success_reduces_reward_by_cube(self, tmp_path: Path): + def test_partial_credibility_ramp_reduces_reward(self, tmp_path: Path): # Opt out of the fixture's baseline credibility seed so the test's - # explicit 8-completed-2-timed-out profile is the only credibility data. + # explicit 5-completed profile is the only credibility data: 5/10 closed + # → credibility 0.5, scaling the sole crown holder's pool by half. hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) v = make_validator(tmp_path, hotkeys=hotkeys, baseline_credibility=False) conn = v.state_store.require_connection() @@ -1069,14 +1163,13 @@ def test_partial_success_reduces_reward_by_cube(self, tmp_path: Path): ('hk_a', 'tao', 'btc', 0.00020, 0), ) conn.commit() - for i in range(8): + for i in range(5): v.state_store.insert_swap_outcome(i + 1, 'hk_a', True, 100 + i) - for i in range(2): - v.state_store.insert_swap_outcome(100 + i, 'hk_a', False, 200 + i) rewards, _ = calculate_miner_rewards(v) - expected = POOL_TAO_BTC * (0.8**SUCCESS_EXPONENT) + # No clearing-rate volume → λ=0 → reward = pool · credibility · crown_share. + expected = POOL_TAO_BTC * 0.5 np.testing.assert_allclose(rewards[0], expected, atol=1e-6) np.testing.assert_allclose(rewards.sum(), 1.0, atol=1e-6) v.state_store.close() @@ -1678,64 +1771,40 @@ def test_negative_max_swap_is_fail_safe_one(self): assert capacity_factor(100_000_000, -1) == 1.0 -class TestVolumeFactorHelper: - """Direct unit tests for the volume_factor pure function. +class TestQualityWeightedVolumeHelper: + """quality_weighted_volume: per-miner Σ tao_amount × quality_factor(rate). - Signature: volume_factor(vol_rao, total_volume_rao, crown_share, alpha). + Observations are (clearing_rate, tao_amount, resolved_block, miner_hotkey). """ - def test_idle_crown_loses_alpha(self): - """vol=0 of total=1 → vol_share=0, participation=0 → factor = (1-α).""" - from allways.validator.scoring import volume_factor - - assert volume_factor(0, 1_000, crown_share=1.0, alpha=0.5) == 0.5 - - def test_matching_volume_keeps_full_reward(self): - from allways.validator.scoring import volume_factor - - # 500/1000 = 0.5 vol_share, crown_share 0.5 → participation 1.0. - assert volume_factor(500, 1_000, crown_share=0.5, alpha=0.5) == 1.0 - - def test_over_serving_capped_at_one(self): - """Anti-wash-trade: high volume / low crown stays at 1.0.""" - from allways.validator.scoring import volume_factor - - assert volume_factor(900, 1_000, crown_share=0.1, alpha=0.5) == 1.0 - - def test_partial_mismatch_interpolates(self): - """vol_share/crown_share = 0.5 → factor = 0.5 + 0.5*0.5 = 0.75.""" - from allways.validator.scoring import volume_factor - - assert volume_factor(250, 1_000, crown_share=0.5, alpha=0.5) == 0.75 - - def test_zero_crown_share_is_moot(self): - from allways.validator.scoring import volume_factor - - assert volume_factor(500, 1_000, crown_share=0.0, alpha=0.5) == 1.0 - - def test_idle_network_short_circuits_to_one(self): - """total_volume == 0 → factor = 1.0 (no penalty for a quiet window).""" - from allways.validator.scoring import volume_factor - - assert volume_factor(0, 0, crown_share=1.0, alpha=0.5) == 1.0 - - def test_alpha_zero_disables_volume_weighting(self): - from allways.validator.scoring import volume_factor + def test_bootstrap_reference_none_is_raw_volume(self): + # reference None → quality 1.0 → qvol == raw volume. + obs = [(345.0, 100, 0, 'a'), (350.0, 50, 0, 'b'), (345.0, 25, 0, 'a')] + qvol = quality_weighted_volume(obs, None, lower_rate_wins=False) + assert qvol == {'a': 125.0, 'b': 50.0} - for vol in (0, 250, 500, 750, 1_000): - assert volume_factor(vol, 1_000, crown_share=1.0, alpha=0.0) == 1.0 + def test_at_market_floors_volume(self): + # rate == reference → quality = QUALITY_FLOOR (0.5) → half credit. + obs = [(100.0, 200, 0, 'a')] + qvol = quality_weighted_volume(obs, 100.0, lower_rate_wins=False) + np.testing.assert_allclose(qvol['a'], 100.0, atol=1e-6) - def test_alpha_one_is_pure_volume_share(self): - from allways.validator.scoring import volume_factor + def test_deep_rate_full_credit(self): + # btc→tao: 5% above reference → quality 1.0 → full volume credit. + obs = [(105.0, 200, 0, 'a')] + qvol = quality_weighted_volume(obs, 100.0, lower_rate_wins=False) + np.testing.assert_allclose(qvol['a'], 200.0, atol=1e-6) - assert volume_factor(0, 1_000, crown_share=1.0, alpha=1.0) == 0.0 - assert volume_factor(250, 1_000, crown_share=0.5, alpha=1.0) == 0.5 + def test_below_market_floored_not_zero(self): + # Worse than market → still the floor, never below. + obs = [(80.0, 200, 0, 'a')] + qvol = quality_weighted_volume(obs, 100.0, lower_rate_wins=False) + np.testing.assert_allclose(qvol['a'], 100.0, atol=1e-6) - def test_alpha_03_softer_penalty(self): - from allways.validator.scoring import volume_factor - - assert volume_factor(0, 1_000, crown_share=1.0, alpha=0.3) == 0.7 - np.testing.assert_allclose(volume_factor(500, 1_000, crown_share=1.0, alpha=0.3), 0.85, atol=1e-6) + def test_skips_nonpositive_rows(self): + obs = [(0.0, 100, 0, 'a'), (345.0, 0, 0, 'b'), (345.0, 50, 0, 'c')] + qvol = quality_weighted_volume(obs, None, lower_rate_wins=False) + assert qvol == {'c': 50.0} class TestCapacityWeighting: @@ -1908,36 +1977,21 @@ def test_record_capacity_sets_factor(self): wt.record_capacity(factor=0.5) assert wt.capacity_factor == 0.5 - def test_record_volume_computes_share_and_participation(self): + def test_record_shares_stores_crown_and_qvol(self): from allways.validator.scoring_trace import WeightingTrace wt = WeightingTrace() - # vol 250 of total 1000 = 25% share; crown_share 50% → participation 50%. - wt.record_volume(vol_rao=250, total_volume_rao=1_000, crown_share=0.5, factor=0.75) + wt.record_shares(volume_rao=250, crown_share=0.5, qvol_share=0.4) assert wt.volume_rao == 250 - assert wt.volume_share == 0.25 assert wt.crown_share == 0.5 - assert wt.participation == 0.5 - assert wt.volume_factor == 0.75 - - def test_record_volume_idle_network_zeros_share(self): - """total_volume == 0 → volume_share = 0, participation defaults to 1.0 - only when crown_share also 0; otherwise participation = 0.""" - from allways.validator.scoring_trace import WeightingTrace - - wt = WeightingTrace() - wt.record_volume(vol_rao=0, total_volume_rao=0, crown_share=1.0, factor=1.0) - assert wt.volume_share == 0.0 - assert wt.participation == 0.0 # vol_share / crown_share = 0/1 = 0 - assert wt.volume_factor == 1.0 # set by caller (idle-network short-circuit) + assert wt.qvol_share == 0.4 - def test_record_volume_caps_participation_at_one(self): - """Over-serving: vol_share/crown_share > 1 → participation capped.""" + def test_record_shares_defaults_are_zero(self): from allways.validator.scoring_trace import WeightingTrace wt = WeightingTrace() - wt.record_volume(vol_rao=900, total_volume_rao=1_000, crown_share=0.1, factor=1.0) - assert wt.participation == 1.0 # min(1.0, 0.9/0.1) + assert wt.crown_share == 0.0 + assert wt.qvol_share == 0.0 def test_record_credibility_computes_ramp(self): from allways.validator.scoring_trace import WeightingTrace @@ -1945,14 +1999,14 @@ def test_record_credibility_computes_ramp(self): wt = WeightingTrace() wt.record_credibility(closed_swaps=5, ramp_target=10) assert wt.closed_swaps == 5 - assert wt.credibility_ramp == 0.5 + assert wt.credibility == 0.5 def test_record_credibility_caps_at_one(self): from allways.validator.scoring_trace import WeightingTrace wt = WeightingTrace() wt.record_credibility(closed_swaps=20, ramp_target=10) - assert wt.credibility_ramp == 1.0 + assert wt.credibility == 1.0 def test_record_credibility_zero_target_is_unity(self): """Defensive: ramp_target=0 → divide-by-zero guarded → 1.0.""" @@ -1960,16 +2014,16 @@ def test_record_credibility_zero_target_is_unity(self): wt = WeightingTrace() wt.record_credibility(closed_swaps=5, ramp_target=0) - assert wt.credibility_ramp == 1.0 + assert wt.credibility == 1.0 def test_record_credibility_excess_timeouts_zero(self): - """Trace mirrors the reward rule: >3 timeouts → ramp shown as 0.""" + """Trace mirrors the reward rule: >2 timeouts → credibility shown as 0.""" from allways.validator.scoring_trace import WeightingTrace wt = WeightingTrace() wt.record_credibility(closed_swaps=100, ramp_target=10, timed_out=4) assert wt.closed_swaps == 100 - assert wt.credibility_ramp == 0.0 + assert wt.credibility == 0.0 class TestVolumeWeighting: @@ -1995,6 +2049,7 @@ def insert_volume( completed: bool = True, from_chain: str = 'tao', to_chain: str = 'btc', + clearing_rate: float = 0.0, ) -> None: v.state_store.insert_swap_outcome( swap_id=swap_id, @@ -2004,6 +2059,7 @@ def insert_volume( tao_amount=tao_amount, from_chain=from_chain, to_chain=to_chain, + clearing_rate=clearing_rate, ) def test_idle_network_no_penalty(self, tmp_path: Path): @@ -2016,24 +2072,23 @@ def test_idle_network_no_penalty(self, tmp_path: Path): np.testing.assert_allclose(rewards[0], POOL_TAO_BTC, atol=1e-6) v.state_store.close() - def test_idle_crown_holder_loses_alpha(self, tmp_path: Path): - """A holds 100% crown, B serves 100% volume → A factor = (1 - α).""" + def test_idle_crown_holder_yields_volume_share(self, tmp_path: Path): + """A holds 100% crown but serves no volume; B serves all the (quality) + volume with no crown. The λ volume slice goes to B, the (1−λ) crown + slice to A — so A keeps (1−λ)·pool and B earns λ·pool.""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a', 'hk_b']) v = make_validator(tmp_path, hotkeys) self.seed_tao_btc_crown(v, 'hk_a') - # B doesn't post a rate → never holds crown. - self.insert_volume(v, 'hk_b', tao_amount=1_000_000_000, swap_id=1) + # B never posts a rate (no crown) but serves quality volume. + self.insert_volume(v, 'hk_b', tao_amount=1_000_000_000, swap_id=1, clearing_rate=300.0) rewards, _ = calculate_miner_rewards(v) - # A's vol_share = 0, crown_share = 1.0 → participation = 0 → factor = 0.5. - # B has crown_share = 0 → no crown reward to multiply. - np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * 0.5, atol=1e-6) - assert rewards[1] == 0.0 + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * (1 - QVOL_REWARD_WEIGHT), atol=1e-6) + np.testing.assert_allclose(rewards[1], POOL_TAO_BTC * QVOL_REWARD_WEIGHT, atol=1e-6) v.state_store.close() - def test_matched_crown_and_volume_full_reward(self, tmp_path: Path): - """Equal crown + equal volume → factor 1.0 for both.""" - from allways.constants import VOLUME_WEIGHT_ALPHA # noqa: F401 — keep imports tidy - + def test_matched_crown_and_volume_split_evenly(self, tmp_path: Path): + """Equal crown + equal quality volume → each earns half the pool, + independent of λ (both terms are 50/50).""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a', 'hk_b']) v = make_validator(tmp_path, hotkeys) conn = v.state_store.require_connection() @@ -2043,44 +2098,40 @@ def test_matched_crown_and_volume_full_reward(self, tmp_path: Path): (hk, 'tao', 'btc', 0.00020, 0), ) conn.commit() - self.insert_volume(v, 'hk_a', tao_amount=500_000_000, swap_id=1) - self.insert_volume(v, 'hk_b', tao_amount=500_000_000, swap_id=2) + self.insert_volume(v, 'hk_a', tao_amount=500_000_000, swap_id=1, clearing_rate=300.0) + self.insert_volume(v, 'hk_b', tao_amount=500_000_000, swap_id=2, clearing_rate=300.0) rewards, _ = calculate_miner_rewards(v) - # Both 50/50 on crown and volume → participation 1.0 → factor 1.0 each. np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * 0.5, atol=1e-6) np.testing.assert_allclose(rewards[1], POOL_TAO_BTC * 0.5, atol=1e-6) v.state_store.close() - def test_over_serving_capped_no_bonus(self, tmp_path: Path): - """A holds 100% crown but B serves 9× more volume → A's factor still > 0, - B gets nothing (no crown). Verifies cap is one-sided: high volume can't - amplify a low crown holder.""" + def test_volume_server_without_crown_earns_qvol_slice(self, tmp_path: Path): + """A holds 100% crown and serves 10% of volume; B serves 90% with no + crown. A = λ·0.1 + (1−λ)·1.0 of the pool; B = λ·0.9. Volume now *earns* + for a non-crown server — the old one-sided 'over-serve cap' is gone.""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a', 'hk_b']) v = make_validator(tmp_path, hotkeys) - # btc→tao: higher rate wins (canonical direction). A wins, B loses. conn = v.state_store.require_connection() conn.execute( 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', ('hk_a', 'btc', 'tao', 200.0, 0), ) - conn.execute( - 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', - ('hk_b', 'btc', 'tao', 100.0, 0), - ) conn.commit() - self.insert_volume(v, 'hk_a', tao_amount=100_000_000, swap_id=1, from_chain='btc', to_chain='tao') - self.insert_volume(v, 'hk_b', tao_amount=900_000_000, swap_id=2, from_chain='btc', to_chain='tao') + self.insert_volume( + v, 'hk_a', tao_amount=100_000_000, swap_id=1, from_chain='btc', to_chain='tao', clearing_rate=200.0 + ) + self.insert_volume( + v, 'hk_b', tao_amount=900_000_000, swap_id=2, from_chain='btc', to_chain='tao', clearing_rate=200.0 + ) rewards, _ = calculate_miner_rewards(v) - # A: crown_share = 1.0, vol_share = 0.1, participation = 0.1 → factor = 0.55 - np.testing.assert_allclose(rewards[0], POOL_BTC_TAO * 0.55, atol=1e-6) - # B: crown_share = 0 → factor moot, no reward to multiply. - assert rewards[1] == 0.0 + lam = QVOL_REWARD_WEIGHT + np.testing.assert_allclose(rewards[0], POOL_BTC_TAO * (lam * 0.1 + (1 - lam) * 1.0), atol=1e-6) + np.testing.assert_allclose(rewards[1], POOL_BTC_TAO * (lam * 0.9), atol=1e-6) v.state_store.close() - def test_partial_mismatch_interpolates(self, tmp_path: Path): + def test_partial_mismatch_blends_crown_and_volume(self, tmp_path: Path): """A holds 80% crown / 20% volume, B holds 20% crown / 80% volume. - Uses btc→tao (higher rate wins) so the rate-direction mapping is - self-evident in the test.""" + Reward blends both shares at λ.""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a', 'hk_b']) v = make_validator(tmp_path, hotkeys) conn = v.state_store.require_connection() @@ -2093,41 +2144,32 @@ def test_partial_mismatch_interpolates(self, tmp_path: Path): ('hk_b', 'btc', 'tao', 150.0, 0), ) conn.commit() - # Window is (9700, 10000]. A busy block 9_800..9_860 (60 blocks within - # the window) so B holds crown 20% of window. We can't use a - # SwapCompleted event here because the direction lookup needs an - # active swap entry in the tracker — easier to just record the - # outcome and the busy delta directly. + # A busy 9_800..9_860 (60 of the 300-block window) so B holds crown 20%. v.event_watcher.apply_busy_delta(9_800, 'hk_a', +1) v.event_watcher.apply_busy_delta(9_860, 'hk_a', -1) - self.insert_volume(v, 'hk_a', tao_amount=200_000_000, swap_id=1, from_chain='btc', to_chain='tao') - self.insert_volume(v, 'hk_b', tao_amount=800_000_000, swap_id=2, from_chain='btc', to_chain='tao') + self.insert_volume( + v, 'hk_a', tao_amount=200_000_000, swap_id=1, from_chain='btc', to_chain='tao', clearing_rate=200.0 + ) + self.insert_volume( + v, 'hk_b', tao_amount=800_000_000, swap_id=2, from_chain='btc', to_chain='tao', clearing_rate=200.0 + ) rewards, _ = calculate_miner_rewards(v) - # Crown: A=240/300=0.8, B=60/300=0.2. Volume: A=0.2, B=0.8. - # A participation = 0.2/0.8 = 0.25 → factor 0.625. - # B participation = min(1.0, 0.8/0.2) = 1.0 → factor 1.0. - np.testing.assert_allclose(rewards[0], POOL_BTC_TAO * 0.8 * 0.625, atol=1e-6) - np.testing.assert_allclose(rewards[1], POOL_BTC_TAO * 0.2 * 1.0, atol=1e-6) + lam = QVOL_REWARD_WEIGHT + # Crown: A=0.8, B=0.2. Volume (qvol_share at the same clearing rate): A=0.2, B=0.8. + np.testing.assert_allclose(rewards[0], POOL_BTC_TAO * (lam * 0.2 + (1 - lam) * 0.8), atol=1e-6) + np.testing.assert_allclose(rewards[1], POOL_BTC_TAO * (lam * 0.8 + (1 - lam) * 0.2), atol=1e-6) v.state_store.close() - def test_timed_out_swaps_dont_count_as_volume(self, tmp_path: Path): - """SwapTimedOut hits credibility, not volume.""" + def test_timed_out_swap_adds_no_qvol(self, tmp_path: Path): + """A timed-out swap is not completed, so it never enters quality-weighted + volume — the sole crown holder still earns the full pool (λ collapses to + crown when there's no realized volume).""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) - v = make_validator(tmp_path, hotkeys, baseline_credibility=False) + v = make_validator(tmp_path, hotkeys) # baseline credibility → cred 1.0 self.seed_tao_btc_crown(v, 'hk_a') - self.insert_volume( - v, - 'hk_a', - tao_amount=1_000_000_000, - swap_id=1, - completed=False, - ) + self.insert_volume(v, 'hk_a', tao_amount=1_000_000_000, swap_id=1, completed=False, clearing_rate=300.0) rewards, _ = calculate_miner_rewards(v) - # Volume aggregator returns 0 → idle network short-circuit → factor 1.0. - # sr from (0 completed, 1 timed_out) = 0 → cubed = 0 → reward 0 via - # credibility, confirming the timed-out swap didn't sneak through as - # volume credit. - assert rewards[0] == 0.0 + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC, atol=1e-6) v.state_store.close() def test_volume_split_per_direction(self, tmp_path: Path): @@ -2188,16 +2230,19 @@ def test_legacy_rows_with_zero_tao_amount_tolerated(self, tmp_path: Path): class TestCapacityVolumeInteraction: - """Capacity + volume are independent multipliers — verify they compose.""" + """Capacity discounts only the crown (availability) term; the qvol term is + independent. Verify they compose as ``λ·qvol + (1−λ)·crown·cap``.""" - def test_both_factors_compose_multiplicatively(self, tmp_path: Path): - """Single miner, capacity 0.5, idle on volume → reward = pool * 0.5 * 0.5.""" - hotkeys = pad_hotkeys_to_cover_recycle(['hk_a', 'hk_b']) + def test_capacity_discounts_only_crown_term(self, tmp_path: Path): + """A holds 100% crown at capacity 0.5 and serves 100% of the quality + volume. Capacity halves the crown slice but not the volume slice: + reward = pool · (λ·1.0 + (1−λ)·1.0·0.5).""" + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) v = make_validator( tmp_path, hotkeys, max_swap_amount=500_000_000, - collaterals={'hk_a': 250_000_000}, + collaterals={'hk_a': 250_000_000}, # capacity 0.5 ) conn = v.state_store.require_connection() conn.execute( @@ -2205,19 +2250,19 @@ def test_both_factors_compose_multiplicatively(self, tmp_path: Path): ('hk_a', 'tao', 'btc', 0.00020, 0), ) conn.commit() - # B serves some volume in A's market (tao→btc) so A's vol_share = 0. v.state_store.insert_swap_outcome( swap_id=1, - miner_hotkey='hk_b', + miner_hotkey='hk_a', completed=True, - resolved_block=9_900, # within the default validator's scoring window + resolved_block=9_900, tao_amount=500_000_000, from_chain='tao', to_chain='btc', + clearing_rate=300.0, ) rewards, _ = calculate_miner_rewards(v) - # A: pool 0.5 × crown 1.0 × sr 1.0 × capacity 0.5 × volume_factor 0.5 - np.testing.assert_allclose(rewards[0], 0.5 * 1.0 * 1.0 * 0.5 * 0.5, atol=1e-6) + lam = QVOL_REWARD_WEIGHT + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * (lam * 1.0 + (1 - lam) * 1.0 * 0.5), atol=1e-6) v.state_store.close() def test_full_pool_conservation_with_all_factors(self, tmp_path: Path): @@ -2314,9 +2359,10 @@ def test_full_ramp_at_threshold(self, tmp_path: Path): np.testing.assert_allclose(rewards[0], POOL_BTC_TAO, atol=1e-6) v.state_store.close() - def test_tolerated_timeouts_advance_ramp_but_hurt_raw_rate(self, tmp_path: Path): - """8 completed + 2 timed_out (within the 2-timeout tolerance) → ramp = 1.0, - raw = 0.8 → 0.8³ × 1.0.""" + def test_tolerated_timeouts_keep_full_credibility(self, tmp_path: Path): + """8 completed + 2 timed_out: 10 closed → full ramp, and 2 timeouts are + within tolerance → credibility 1.0, so the sole crown holder earns the + full pool. (The old success_rate³ ratio penalty is gone.)""" hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) v = make_validator(tmp_path, hotkeys, baseline_credibility=False) self.seed_btc_tao_crown(v, 'hk_a') @@ -2329,7 +2375,7 @@ def test_tolerated_timeouts_advance_ramp_but_hurt_raw_rate(self, tmp_path: Path) swap_id=100 + i, miner_hotkey='hk_a', completed=False, resolved_block=200 + i ) rewards, _ = calculate_miner_rewards(v) - np.testing.assert_allclose(rewards[0], POOL_BTC_TAO * 0.8**3, atol=1e-6) + np.testing.assert_allclose(rewards[0], POOL_BTC_TAO, atol=1e-6) v.state_store.close() def test_excess_timeouts_zero_reward(self, tmp_path: Path): @@ -2700,7 +2746,7 @@ def test_competitive_but_present_zero_is_insufficient_collateral(self): reason = diagnose_non_earner( 'hk', {('tao', 'btc'): 279.3}, - sr=0.98, + cred=0.98, ever_active={'hk'}, direction_traces={('tao', 'btc'): self._trace(280.0)}, collaterals={'hk': 0}, @@ -2715,7 +2761,7 @@ def test_competitive_but_unknown_collateral(self): reason = diagnose_non_earner( 'hk', {('tao', 'btc'): 279.3}, - sr=0.98, + cred=0.98, ever_active={'hk'}, direction_traces={('tao', 'btc'): self._trace(280.0)}, collaterals={}, # absent → unknown @@ -2731,7 +2777,7 @@ def test_genuinely_worse_rate_is_direction_aware_outbid(self): reason = diagnose_non_earner( 'hk', {('tao', 'btc'): 281.0}, - sr=0.98, + cred=0.98, ever_active={'hk'}, direction_traces={('tao', 'btc'): self._trace(280.0)}, collaterals={'hk': 500_000_000}, @@ -2746,7 +2792,7 @@ def test_competitive_and_funded_is_unfilled_not_outbid(self): reason = diagnose_non_earner( 'hk', {('tao', 'btc'): 279.3}, - sr=0.98, + cred=0.98, ever_active={'hk'}, direction_traces={('tao', 'btc'): self._trace(280.0)}, collaterals={'hk': 500_000_000}, @@ -2805,3 +2851,343 @@ def test_fresh_seed_scores_one_trailing_window(self): seed = max(0, block - SCORING_WINDOW_BLOCKS) start, end = scoring_window_bounds(current_block=block, last_scored_block=seed) assert (start, end) == (block - SCORING_WINDOW_BLOCKS, block) + + +class TestCrownPredicateParity: + """make_crown_predicates is the single source of crown eligibility for both + the scoring replay and the live snapshot. Lock its semantics to the shared + rate utils so a future edit can't let the live view drift from the ledger.""" + + # 0.1 / 0.5 TAO — the live on-chain swap bounds. + BOUNDS = [(0, 0), (100_000_000, 500_000_000)] + DIRECTIONS = [('btc', 'tao'), ('tao', 'btc')] + RATES = [0.00015, 1.0, 345.0, 50_000_000.0, 1e10, 0.0, -1.0, float('inf')] + + def _reference(self, from_chain, to_chain, min_rao, max_rao, collaterals): + def exec_ref(rate): + return is_executable_rate(rate, from_chain, to_chain, min_rao, max_rao) + + def fund_ref(hotkey, rate): + if hotkey not in collaterals: + return True + min_leg = min_executable_tao_leg(rate, from_chain, to_chain, min_rao, max_rao) + return min_leg == 0 or collaterals[hotkey] >= min_leg + + return exec_ref, fund_ref + + def test_matches_shared_rate_utils_across_matrix(self): + collaterals = {'hk_rich': 10_000_000_000, 'hk_poor': 1, 'hk_zero': 0} + probe_hotkeys = ['hk_rich', 'hk_poor', 'hk_zero', 'hk_absent'] + for from_chain, to_chain in self.DIRECTIONS: + for min_rao, max_rao in self.BOUNDS: + executable_check, can_fund = make_crown_predicates(from_chain, to_chain, min_rao, max_rao, collaterals) + exec_ref, fund_ref = self._reference(from_chain, to_chain, min_rao, max_rao, collaterals) + for rate in self.RATES: + assert executable_check(rate) == exec_ref(rate), ( + f'executable_check drift dir={from_chain}->{to_chain} bounds=({min_rao},{max_rao}) rate={rate}' + ) + for hk in probe_hotkeys: + assert can_fund(hk, rate) == fund_ref(hk, rate), ( + f'can_fund drift dir={from_chain}->{to_chain} ' + f'bounds=({min_rao},{max_rao}) hk={hk} rate={rate}' + ) + + def test_fail_open_on_absent_collateral(self): + # absent != zero — a miner with no recorded baseline must not be dropped. + _, can_fund = make_crown_predicates('btc', 'tao', 100_000_000, 500_000_000, {}) + assert can_fund('hk_unknown', 345.0) is True + + def test_drops_holder_whose_collateral_cannot_fund_min_leg(self): + # 1-rao collateral can't cover any real in-band leg → boundary-squat drop; + # a richly-funded miner at the same rate passes. + collaterals = {'hk_poor': 1, 'hk_rich': 10_000_000_000} + _, can_fund = make_crown_predicates('btc', 'tao', 100_000_000, 500_000_000, collaterals) + rate = 345.0 + min_leg = min_executable_tao_leg(rate, 'btc', 'tao', 100_000_000, 500_000_000) + assert min_leg > 0 # rate is executable, so the gate is live + assert can_fund('hk_poor', rate) is False + assert can_fund('hk_rich', rate) is True + + +class TestQualityReferenceHelper: + """Unit tests for the depth reference + quality-factor pure functions.""" + + def test_empty_observations_bootstrap_none(self): + from allways.validator.scoring import compute_quality_reference + + assert compute_quality_reference([], now_block=1_000) is None + + def test_below_n_min_bootstrap_none(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, 'a'), (11.0, 1, 0, 'b')] + assert compute_quality_reference(obs, now_block=0, n_min=5) is None + + def test_volume_weighted_mean(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, 'a'), (20.0, 99, 0, 'b')] + ref = compute_quality_reference(obs, now_block=0, n_min=2, trim_pct=0.0, per_miner_cap=1.0) + # (1·10 + 99·20) / 100 = 19.9 — big swap dominates. + assert np.isclose(ref, 19.9) + + def test_recency_decay_favors_recent(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, 'a'), (20.0, 1, 1_000, 'b')] + ref = compute_quality_reference( + obs, now_block=1_000, n_min=2, trim_pct=0.0, per_miner_cap=1.0, half_life_blocks=1_000 + ) + # old weight 2^-1 = 0.5, recent 2^0 = 1.0 → (5 + 20) / 1.5. + assert np.isclose(ref, 25.0 / 1.5) + + def test_trim_drops_outlier(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0, 1, 0, f'm{i}') for i in range(10)] + [(1e6, 1, 0, 'outlier')] + ref = compute_quality_reference(obs, now_block=0, n_min=5, trim_pct=0.10, per_miner_cap=1.0) + # 11 rows, drop 1 each tail → the 1e6 outlier is trimmed away. + assert np.isclose(ref, 10.0) + + def test_per_miner_cap_limits_flood(self): + from allways.validator.scoring import compute_quality_reference + + obs = [(100.0, 1, 0, 'flood') for _ in range(20)] + [(10.0, 1, 0, f'h{i}') for i in range(5)] + ref = compute_quality_reference(obs, now_block=0, n_min=5, trim_pct=0.0, per_miner_cap=0.30) + uncapped = (20 * 100 + 5 * 10) / 25 # 80.4 — flood dominates + assert ref < uncapped + # flood capped to 30% of weight → 800 / 12.5 = 64.0, pulled toward honest. + assert np.isclose(ref, 64.0) + + def test_shuffle_invariance(self): + """Determinism: the reference must be byte-identical regardless of input + order (guards the stable-sort + math.fsum consensus requirement).""" + from allways.validator.scoring import compute_quality_reference + + obs = [(10.0 + i, (i % 3) + 1, i * 10, f'm{i % 4}') for i in range(15)] + ref1 = compute_quality_reference(obs, now_block=200, n_min=3) + ref2 = compute_quality_reference(list(reversed(obs)), now_block=200, n_min=3) + ref3 = compute_quality_reference(obs[7:] + obs[:7], now_block=200, n_min=3) + assert ref1 == ref2 == ref3 + + def test_factor_none_reference_is_noop(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(123.0, None, False) == 1.0 + + def test_factor_at_market_is_floor(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(100.0, 100.0, False) == 0.5 + assert quality_factor(100.0, 100.0, True) == 0.5 + + def test_factor_anchor_saturates_both_directions(self): + from allways.validator.scoring import quality_factor + + # btc→tao: 5% higher = full bonus. tao→btc: 5% lower = full bonus. + assert np.isclose(quality_factor(105.0, 100.0, False), 1.0) + assert np.isclose(quality_factor(95.0, 100.0, True), 1.0) + + def test_factor_half_anchor_interpolates(self): + from allways.validator.scoring import quality_factor + + # 2.5% improvement = half the anchor → 0.5 + 0.5·0.5 = 0.75. + assert np.isclose(quality_factor(102.5, 100.0, False), 0.75) + + def test_factor_below_market_floored(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(98.0, 100.0, False) == 0.5 # worse than market, higher-better + assert quality_factor(105.0, 100.0, True) == 0.5 # worse than market, lower-better + + def test_factor_past_anchor_capped(self): + from allways.validator.scoring import quality_factor + + assert quality_factor(200.0, 100.0, False) == 1.0 + + +class TestQualityWeighting: + """End-to-end depth/quality weighting via calculate_miner_rewards.""" + + def seed_crown(self, v, hotkey, rate, from_chain='tao', to_chain='btc'): + conn = v.state_store.require_connection() + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + (hotkey, from_chain, to_chain, rate, 0), + ) + conn.commit() + + def seed_clearing( + self, v, hotkey, rate, n=25, from_chain='tao', to_chain='btc', tao_amount=100_000_000, swap_id_base=1_000 + ): + for i in range(n): + v.state_store.insert_swap_outcome( + swap_id=swap_id_base + i, + miner_hotkey=hotkey, + completed=True, + resolved_block=9_900, # inside the default [9_700, 10_000] window + tao_amount=tao_amount, + from_chain=from_chain, + to_chain=to_chain, + clearing_rate=rate, + ) + + def test_deep_rate_earns_full_pool(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + # tao→btc: lower is deeper. Reference 0.00020, holder posts 6% lower. + self.seed_clearing(v, 'hk_a', rate=0.00020) + self.seed_crown(v, 'hk_a', rate=0.00020 * 0.94) + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC, atol=1e-6) + v.state_store.close() + + def test_at_market_rate_earns_floor(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + self.seed_clearing(v, 'hk_a', rate=0.00020) + self.seed_crown(v, 'hk_a', rate=0.00020) # at reference → quality floor + rewards, _ = calculate_miner_rewards(v) + # hk_a is sole crown holder AND sole server, both at-market: crown term = + # cap·quality = 0.5 (floor), qvol_share = 1.0. reward = λ·1 + (1−λ)·0.5. + lam = QVOL_REWARD_WEIGHT + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * (lam * 1.0 + (1 - lam) * 0.5), atol=1e-6) + np.testing.assert_allclose(rewards.sum(), 1.0, atol=1e-6) + v.state_store.close() + + def test_bootstrap_below_n_min_behaves_like_today(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + # Only 5 observations (< QUALITY_N_MIN) → reference disabled → factor 1.0. + self.seed_clearing(v, 'hk_a', rate=0.00020, n=5) + self.seed_crown(v, 'hk_a', rate=0.00020) # at market, but depth is off + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC, atol=1e-6) + v.state_store.close() + + def test_btc_tao_direction_deep_earns_full(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys) + # btc→tao: higher is deeper. Reference 280, holder posts 6% higher. + self.seed_clearing(v, 'hk_a', rate=280.0, from_chain='btc', to_chain='tao') + self.seed_crown(v, 'hk_a', rate=280.0 * 1.06, from_chain='btc', to_chain='tao') + rewards, _ = calculate_miner_rewards(v) + np.testing.assert_allclose(rewards[0], POOL_BTC_TAO, atol=1e-6) + v.state_store.close() + + def test_idle_holder_at_market_earns_crown_slice_only(self, tmp_path: Path): + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a', 'hk_b']) + v = make_validator(tmp_path, hotkeys) + # hk_a holds crown at market (quality 0.5) but serves no volume; hk_b + # serves all the volume. hk_a gets only the (1−λ) crown slice, halved by + # the at-market quality floor; the λ volume slice goes to hk_b. + self.seed_crown(v, 'hk_a', rate=0.00020) + self.seed_clearing(v, 'hk_b', rate=0.00020, swap_id_base=2_000) + rewards, _ = calculate_miner_rewards(v) + lam = QVOL_REWARD_WEIGHT + np.testing.assert_allclose(rewards[0], POOL_TAO_BTC * (1 - lam) * 0.5, atol=1e-6) + np.testing.assert_allclose(rewards[1], POOL_TAO_BTC * lam, atol=1e-6) + v.state_store.close() + + +class TestClearingRateStorage: + """clearing_rate column + per-direction query for the depth reference.""" + + def test_round_trip(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_swap_outcome( + swap_id=1, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.00021, + ) + rows = store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') + assert len(rows) == 1 + rate, tao, block, hk = rows[0] + assert np.isclose(rate, 0.00021) + assert (tao, block, hk) == (5, 100, 'hk_a') + store.close() + + def test_excludes_zero_rate_and_timed_out(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + # completed but no usable rate (legacy / unresolved) + store.insert_swap_outcome( + swap_id=1, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.0, + ) + # timed out (never excluded by completed=1) + store.insert_swap_outcome( + swap_id=2, + miner_hotkey='hk_a', + completed=False, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.0003, + ) + assert store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') == [] + store.close() + + def test_direction_filter_lowercased(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_swap_outcome( + swap_id=1, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='tao', + to_chain='btc', + clearing_rate=0.0002, + ) + store.insert_swap_outcome( + swap_id=2, + miner_hotkey='hk_a', + completed=True, + resolved_block=100, + tao_amount=5, + from_chain='btc', + to_chain='tao', + clearing_rate=300.0, + ) + # Query with uppercase → normalized to lowercase, matches the tao→btc row only. + rows = store.get_clearing_rates_by_direction_since(0, 'TAO', 'BTC') + assert len(rows) == 1 and np.isclose(rows[0][0], 0.0002) + store.close() + + +class TestEventWatcherPassesClearingRate: + """SwapCompleted resolves the swap's clearing rate into the outcome row.""" + + def test_swap_completed_persists_clearing_rate(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + swap = SimpleNamespace(rate='0.00022', from_chain='tao', to_chain='btc') + watcher.swap_tracker = SimpleNamespace(active={7: swap}, resolve=MagicMock()) + watcher.apply_event(100, 'SwapInitiated', {'swap_id': 7, 'miner': 'hk_a'}) + watcher.apply_event(200, 'SwapCompleted', {'swap_id': 7, 'miner': 'hk_a', 'tao_amount': 5}) + rows = store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') + assert len(rows) == 1 and np.isclose(rows[0][0], 0.00022) + store.close() + + def test_unparseable_rate_falls_back_to_zero(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + swap = SimpleNamespace(rate='', from_chain='tao', to_chain='btc') + watcher.swap_tracker = SimpleNamespace(active={7: swap}, resolve=MagicMock()) + watcher.apply_event(100, 'SwapInitiated', {'swap_id': 7, 'miner': 'hk_a'}) + watcher.apply_event(200, 'SwapCompleted', {'swap_id': 7, 'miner': 'hk_a', 'tao_amount': 5}) + # Row written with clearing_rate 0 → excluded from the reference. + assert store.get_clearing_rates_by_direction_since(0, 'tao', 'btc') == [] + store.close() diff --git a/tests/test_state_store.py b/tests/test_state_store.py index 20384a3..af3ae49 100644 --- a/tests/test_state_store.py +++ b/tests/test_state_store.py @@ -113,6 +113,26 @@ def test_purge_without_current_block_fn_is_noop(self, tmp_path: Path): assert store.get_reservation_pin('miner-1') == PIN_SAMPLE1 store.close() + def test_get_expired_returns_only_expired_rows(self, tmp_path: Path): + store = ValidatorStateStore( + db_path=tmp_path / 'state.db', + current_block_fn=lambda: 1001, + ) + store.upsert_reservation_pin(PIN_SAMPLE1) # reserved_until=1000 → expired at 1001 + store.upsert_reservation_pin(PIN_SAMPLE2) # reserved_until=1005 → still live + + expired = store.get_expired_reservation_pins() + assert expired == [PIN_SAMPLE1] + # Read-only: the row is left for the caller to emit a pin-end before purging. + assert store.get_reservation_pin('miner-1') == PIN_SAMPLE1 + store.close() + + def test_get_expired_without_current_block_fn_is_empty(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.upsert_reservation_pin(PIN_SAMPLE1) + assert store.get_expired_reservation_pins() == [] + store.close() + def test_update_reserved_until_keeps_row_a_purge_would_drop(self, tmp_path: Path): """Regression: after the contract extends a reservation, refreshing the pin's reserved_until must keep it alive past its original TTL — exactly diff --git a/uv.lock b/uv.lock index 35bfa8e..cc106e3 100644 --- a/uv.lock +++ b/uv.lock @@ -168,7 +168,6 @@ version = "1.0.9" source = { editable = "." } dependencies = [ { name = "base58" }, - { name = "bech32" }, { name = "bitcoin-message-tool" }, { name = "bittensor" }, { name = "bittensor-cli" }, @@ -196,7 +195,6 @@ dev = [ [package.metadata] requires-dist = [ { name = "base58" }, - { name = "bech32" }, { name = "bitcoin-message-tool" }, { name = "bittensor", specifier = "==10.3.0" }, { name = "bittensor-cli", specifier = "==9.21.0" },