|
| 1 | +"""Cryptographic primitives for HDP — Ed25519 signing/verification with RFC 8785 canonical JSON. |
| 2 | +
|
| 3 | +Matches the signing scheme in the TypeScript SDK (src/crypto/sign.ts + src/crypto/verify.ts): |
| 4 | + - Root: canonicalize({header, principal, scope}) → Ed25519 → base64url |
| 5 | + - Hop: canonicalize({chain: [...], root_sig: <value>}) → Ed25519 → base64url |
| 6 | +""" |
| 7 | + |
| 8 | +from __future__ import annotations |
| 9 | + |
| 10 | +import base64 |
| 11 | +from typing import Any |
| 12 | + |
| 13 | +import jcs |
| 14 | +from cryptography.exceptions import InvalidSignature |
| 15 | +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey |
| 16 | + |
| 17 | + |
| 18 | +def _b64url(sig_bytes: bytes) -> str: |
| 19 | + """Encode bytes as unpadded base64url (matches Buffer.toString('base64url') in Node).""" |
| 20 | + return base64.urlsafe_b64encode(sig_bytes).rstrip(b"=").decode() |
| 21 | + |
| 22 | + |
| 23 | +def _canonicalize(obj: Any) -> bytes: |
| 24 | + """RFC 8785 canonical JSON bytes.""" |
| 25 | + return jcs.canonicalize(obj) |
| 26 | + |
| 27 | + |
| 28 | +def sign_root(unsigned_token: dict, private_key_bytes: bytes, kid: str) -> dict: |
| 29 | + """Sign the root token over {header, principal, scope} and return a signature dict.""" |
| 30 | + subset = {f: unsigned_token[f] for f in ["header", "principal", "scope"] if f in unsigned_token} |
| 31 | + message = _canonicalize(subset) |
| 32 | + key = Ed25519PrivateKey.from_private_bytes(private_key_bytes) |
| 33 | + sig_bytes = key.sign(message) |
| 34 | + return { |
| 35 | + "alg": "Ed25519", |
| 36 | + "kid": kid, |
| 37 | + "value": _b64url(sig_bytes), |
| 38 | + "signed_fields": ["header", "principal", "scope"], |
| 39 | + } |
| 40 | + |
| 41 | + |
| 42 | +def sign_hop(cumulative_chain: list[dict], root_sig_value: str, private_key_bytes: bytes) -> str: |
| 43 | + """Sign a hop over the cumulative chain + root signature value.""" |
| 44 | + payload = {"chain": cumulative_chain, "root_sig": root_sig_value} |
| 45 | + message = _canonicalize(payload) |
| 46 | + key = Ed25519PrivateKey.from_private_bytes(private_key_bytes) |
| 47 | + sig_bytes = key.sign(message) |
| 48 | + return _b64url(sig_bytes) |
| 49 | + |
| 50 | + |
| 51 | +def _b64url_decode(s: str) -> bytes: |
| 52 | + """Decode unpadded base64url string to bytes.""" |
| 53 | + padding = 4 - len(s) % 4 |
| 54 | + return base64.urlsafe_b64decode(s + "=" * padding) |
| 55 | + |
| 56 | + |
| 57 | +def verify_root(token: dict, public_key: Ed25519PublicKey) -> bool: |
| 58 | + """Verify the root signature over {header, principal, scope}.""" |
| 59 | + try: |
| 60 | + subset = {f: token[f] for f in ["header", "principal", "scope"] if f in token} |
| 61 | + message = _canonicalize(subset) |
| 62 | + sig_bytes = _b64url_decode(token["signature"]["value"]) |
| 63 | + public_key.verify(sig_bytes, message) |
| 64 | + return True |
| 65 | + except (InvalidSignature, KeyError, Exception): |
| 66 | + return False |
| 67 | + |
| 68 | + |
| 69 | +def verify_hop(cumulative_chain: list[dict], root_sig_value: str, hop_signature: str, public_key: Ed25519PublicKey) -> bool: |
| 70 | + """Verify a single hop signature over the cumulative chain + root sig value.""" |
| 71 | + try: |
| 72 | + payload = {"chain": cumulative_chain, "root_sig": root_sig_value} |
| 73 | + message = _canonicalize(payload) |
| 74 | + sig_bytes = _b64url_decode(hop_signature) |
| 75 | + public_key.verify(sig_bytes, message) |
| 76 | + return True |
| 77 | + except (InvalidSignature, Exception): |
| 78 | + return False |
0 commit comments