7 changes: 7 additions & 0 deletions .claude/settings.json
@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(python -c \"import sys; sys.path.insert\\(0,'src'\\); import relay.domain_fronter; import proxy.proxy_server; print\\('Full import OK'\\)\")"
]
}
}
7 changes: 7 additions & 0 deletions .claude/settings.local.json
@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"Bash(python -c \"import sys; sys.path.insert\\(0,'src'\\); from relay.domain_fronter import DomainFronter; print\\('OK'\\)\")"
]
}
}
1 change: 1 addition & 0 deletions .gitignore
@@ -37,3 +37,4 @@ domainfront_certs_*/

# Local scripts (excluded from git pushes)
scripts/
ideas.md
23 changes: 23 additions & 0 deletions README.md
@@ -216,6 +216,29 @@ Notes:
- `mode: "selective"` = only domains in `hosts` go through the exit node.
- `psk` must exactly match your deployed exit node secret.

**Failover (multiple exit nodes):** Add a `urls` list to configure fallback exit nodes. The proxy monitors all of them with periodic health checks and automatically switches to the next available URL when one goes down:

```json
"exit_node": {
"enabled": true,
"provider": "cloudflare",
"url": "https://PRIMARY-WORKER.YOUR-SUBDOMAIN.workers.dev",
"urls": [
"https://PRIMARY-WORKER.YOUR-SUBDOMAIN.workers.dev",
"https://FALLBACK-WORKER.YOUR-SUBDOMAIN.workers.dev"
],
"psk": "CHANGE_ME_TO_A_STRONG_SECRET",
"mode": "full",
"health_check_interval": 30,
"health_check_failures_before_failover": 3
}
```

- `urls` — ordered list of exit node URLs; primary first, fallbacks after.
- `health_check_interval` — seconds between health checks (default: `30`; values below `10` are raised to `10`).
- `health_check_failures_before_failover` — consecutive failures before a URL is marked dead and the next one is tried (default: `3`).
- When the primary URL recovers, the proxy automatically switches back to it. (A quick reachability check for the whole pool is sketched below.)
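
Before relying on failover, it is worth confirming that every URL in the pool actually answers. The sketch below is illustrative only and not part of the proxy; it mirrors the proxy's health-check semantics, under which any HTTP response (even an error status) counts as alive. The URLs are the same placeholders as above.

```python
# Sanity-check each exit node URL before enabling failover.
# Illustrative sketch: any HTTP response (even 4xx/5xx) counts as
# alive, matching the proxy's own health-check behavior.
import urllib.error
import urllib.request

URLS = [
    "https://PRIMARY-WORKER.YOUR-SUBDOMAIN.workers.dev",
    "https://FALLBACK-WORKER.YOUR-SUBDOMAIN.workers.dev",
]

for url in URLS:
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            print(f"alive {url} (HTTP {resp.status})")
    except urllib.error.HTTPError as exc:
        print(f"alive {url} (HTTP {exc.code})")  # server answered, so it is alive
    except OSError as exc:
        print(f"DEAD  {url} ({exc})")
```

Note that reachability is all this proves: a worker can answer HTTP and still reject traffic if its `psk` does not match.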

Production recommendation:
- Keep `verify_ssl: true`
- Keep `listen_host: 127.0.0.1` unless LAN sharing is explicitly needed
5 changes: 4 additions & 1 deletion config.example.json
@@ -89,11 +89,14 @@
"enabled": false,
"provider": "cloudflare",
"url": "",
"urls": [],
"psk": "",
"mode": "full",
"hosts": [
"example.com",
"example.org"
]
],
"health_check_interval": 30,
"health_check_failures_before_failover": 3
}
}
2 changes: 1 addition & 1 deletion src/core/constants.py
@@ -8,7 +8,7 @@
from __future__ import annotations

# ── Version ───────────────────────────────────────────────────────────────
__version__ = "1.1.0"
__version__ = "1.2.0"


# ── Size caps ─────────────────────────────────────────────────────────────
199 changes: 197 additions & 2 deletions src/relay/domain_fronter.py
@@ -228,13 +228,44 @@ def __init__(self, config: dict):
for h in (en_cfg.get("hosts") or [])
if h
)

# Health check & auto-failover state.
# _exit_node_urls holds the full ordered list (primary first, then fallbacks).
# _exit_node_url always points to the *currently active* URL and is updated
# on failover so the rest of the relay code needs no changes.
self._exit_node_urls: list[str] = self._build_exit_node_url_list(
self._exit_node_url, en_cfg,
)
# If url was empty but urls[] has entries, promote the first entry.
if not self._exit_node_url and self._exit_node_urls:
self._exit_node_url = self._exit_node_urls[0]
self._exit_node_failures: dict[str, int] = {} # url → consecutive failures
self._exit_node_dead_until: dict[str, float] = {} # url → cooldown timestamp
self._health_check_interval: float = max(
10.0, float(en_cfg.get("health_check_interval", 30)),
)
self._health_check_max_failures: int = max(
1, int(en_cfg.get("health_check_failures_before_failover", 3)),
)
self._exit_node_health_task: asyncio.Task | None = None

if self._exit_node_enabled and self._exit_node_url:
log.info(
"Exit node enabled [mode=%s, provider=%s]: %s",
self._exit_node_mode,
self._exit_node_provider,
self._exit_node_url,
)
if len(self._exit_node_urls) > 1:
log.info(
"Exit node failover pool: %d URLs (health-check every %.0fs, "
"failover after %d failures)",
len(self._exit_node_urls),
self._health_check_interval,
self._health_check_max_failures,
)
for i, u in enumerate(self._exit_node_urls):
log.info(" [%d] %s", i + 1, u)
elif self._exit_node_enabled:
log.warning(
"Exit node is enabled but no URL is configured for provider '%s'",
@@ -886,6 +917,12 @@ async def _warm_pool(self):
self._stats_task = self._spawn(self._stats_logger())
if self._execution_task is None:
self._execution_task = self._spawn(self._execution_logger())
# Exit node health checker — runs for any configured exit node so that
# a single dead URL can also recover automatically via the loop.
if (self._exit_node_enabled
and len(self._exit_node_urls) >= 1
and self._exit_node_health_task is None):
self._exit_node_health_task = self._spawn(self._exit_node_health_loop())
# Start H2 connection (runs alongside H1 pool)
if self._h2:
self._spawn(self._h2_connect_and_warm())
@@ -932,6 +969,7 @@ async def close(self):
self._stats_task = None
self._execution_task = None
self._keepalive_task = None
self._exit_node_health_task = None

await self._flush_pool()

@@ -1172,6 +1210,159 @@ def _pick_from(mapping: dict[str, object], *keys: str) -> str:
# Backward compatibility for older config format.
return _pick_from(en_cfg, "relay_url")

@staticmethod
def _build_exit_node_url_list(primary: str, en_cfg: dict) -> list[str]:
"""Return ordered list of exit node URLs: primary first, then fallbacks."""
urls: list[str] = []
seen: set[str] = set()
if primary:
urls.append(primary)
seen.add(primary)
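# Normalize and dedupe fallback entries so they compare cleanly against the active URL.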
for u in (en_cfg.get("urls") or []):
u = str(u).strip().rstrip("/")
if u and u not in seen:
urls.append(u)
seen.add(u)
return urls

def _is_exit_node_dead(self, url: str) -> bool:
"""True if url is in cooldown after repeated failures."""
deadline = self._exit_node_dead_until.get(url, 0.0)
if deadline <= time.time():
if deadline:
self._exit_node_dead_until.pop(url, None)
self._exit_node_failures.pop(url, None)
return False
return True

def _record_exit_node_success(self, url: str) -> None:
was_failed = bool(self._exit_node_failures.get(url, 0)
or self._exit_node_dead_until.get(url, 0.0))
self._exit_node_failures.pop(url, None)
self._exit_node_dead_until.pop(url, None)

if was_failed:
log.info("Exit node recovered: %s", url.split("//", 1)[-1][:70])

# Case 1: all nodes were down (_exit_node_url cleared) — restore with
# whichever URL responded first.
if not self._exit_node_url:
self._exit_node_url = url
log.info("Exit node restored after all-down: %s", url.split("//", 1)[-1][:60])
return

# Case 2: primary recovered while we were on a fallback — switch back.
if (self._exit_node_urls
and url == self._exit_node_urls[0]
and self._exit_node_url != self._exit_node_urls[0]):
self._exit_node_url = self._exit_node_urls[0]
log.info(
"Switched back to primary exit node: %s",
url.split("//", 1)[-1][:60],
)

def _record_exit_node_failure(self, url: str) -> None:
# Skip if already in cooldown — avoid inflated counts and log spam.
if self._exit_node_dead_until.get(url, 0.0) > time.time():
return
count = self._exit_node_failures.get(url, 0) + 1
self._exit_node_failures[url] = count
if count >= self._health_check_max_failures:
cooldown = self._health_check_interval * 2
self._exit_node_dead_until[url] = time.time() + cooldown
log.warning(
"Exit node marked dead for %.0fs after %d consecutive failures: %s",
cooldown, count, url[:70],
)
self._try_exit_node_failover(url)
else:
log.debug(
"Exit node failure %d/%d: %s",
count, self._health_check_max_failures, url[:70],
)

def _try_exit_node_failover(self, failed_url: str) -> None:
"""Switch _exit_node_url to the next alive URL in the pool."""
if len(self._exit_node_urls) <= 1:
# No fallback — clear active URL so _exit_node_matches returns False
# until the health loop restores it after recovery.
self._exit_node_url = ""
log.error(
"Exit node %s is down and no fallback URLs are configured. "
"Traffic falls back to Apps Script until recovery.",
failed_url[:70],
)
return
for url in self._exit_node_urls:
if url != failed_url and not self._is_exit_node_dead(url):
self._exit_node_url = url
log.warning(
"Exit node failover: %s → %s",
failed_url.split("//", 1)[-1][:50],
url.split("//", 1)[-1][:50],
)
return
# All URLs are dead — clear the active URL so traffic bypasses the exit node.
self._exit_node_url = ""
log.error(
"All %d exit node URLs are down. Traffic falls back to Apps Script "
"until the health check restores a URL.",
len(self._exit_node_urls),
)

async def _ping_exit_node(self, url: str) -> bool:
"""Send a lightweight GET to url. Returns True if any HTTP response arrives."""
try:
parsed = urlparse(url)
host = parsed.hostname or ""
port = parsed.port or (443 if parsed.scheme == "https" else 80)
use_ssl = parsed.scheme == "https"
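# Per-phase timeout (connect, send, read): a third of the interval, capped at 10s.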
timeout = min(self._health_check_interval / 3, 10.0)
ctx = self._ssl_ctx() if use_ssl else None
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port, ssl=ctx),
timeout=timeout,
)
try:
writer.write(
f"GET / HTTP/1.1\r\nHost: {host}\r\n"
f"Connection: close\r\nUser-Agent: MasterHttpRelay/healthcheck\r\n\r\n"
.encode()
)
await asyncio.wait_for(writer.drain(), timeout=timeout)
line = await asyncio.wait_for(reader.readline(), timeout=timeout)
return line.startswith(b"HTTP/")
finally:
writer.close()
try:
await asyncio.wait_for(writer.wait_closed(), timeout=2.0)
except Exception:
pass
except Exception as exc:
log.debug("Exit node ping failed (%s): %s", url.split("//", 1)[-1][:50], exc)
return False

async def _exit_node_health_loop(self) -> None:
"""Background task: periodically ping all exit node URLs and failover as needed."""
# Initial delay so the first check doesn't race with startup.
await asyncio.sleep(self._health_check_interval)
while True:
try:
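# The pool itself never changes after init; failover only swaps _exit_node_url. list() is a defensive copy.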
for url in list(self._exit_node_urls):
alive = await self._ping_exit_node(url)
if alive:
self._record_exit_node_success(url)
else:
self._record_exit_node_failure(url)
except asyncio.CancelledError:
break
except Exception as exc:
log.debug("Exit node health loop error: %s", exc)
try:
await asyncio.sleep(self._health_check_interval)
except asyncio.CancelledError:
break

def _exit_node_matches(self, url: str) -> bool:
"""Return True if this URL should be routed through the exit node."""
if not self._exit_node_enabled or not self._exit_node_url:
@@ -1203,6 +1394,7 @@ async def _relay_via_exit_node(self, payload: dict) -> bytes:
body of the outer Apps Script relay call, so Apps Script POSTs it to
the exit node URL on our behalf.
"""
active_url = self._exit_node_url
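# active_url is pinned for the whole request: a failover elsewhere may swap
# _exit_node_url mid-flight, and success below must be credited to the URL
# that actually served.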
# Build inner payload: what the exit node will execute
inner = dict(payload)
inner["k"] = self._exit_node_psk
@@ -1212,7 +1404,7 @@
# Apps Script does: UrlFetchApp.fetch(exit_node_url, { method: "POST", payload: inner_json })
outer = self._build_payload(
"POST",
self._exit_node_url,
active_url,
{"Content-Type": "application/json"},
inner_json,
)
@@ -1221,7 +1413,7 @@

log.debug(
"Exit node chain: Apps Script → %s → %s",
self._exit_node_url.split("//", 1)[-1][:50],
active_url.split("//", 1)[-1][:50],
payload.get("u", "")[:60],
)

@@ -1237,6 +1429,7 @@ async def _relay_via_exit_node(self, payload: dict) -> bytes:
_, _, apps_script_body = split_raw_response(raw)
result = parse_relay_response(apps_script_body, self._max_response_body_bytes)
log.debug("Exit node relay OK: %s", payload.get("u", "")[:80])
self._record_exit_node_success(active_url)
return result

# ── Apps Script relay (apps_script mode) ──────────────────────
@@ -1277,13 +1470,15 @@ async def relay(self, method: str, url: str,
if self._exit_node_matches(url):
t0 = time.perf_counter()
errored = False
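# Snapshot the active URL so a failure during the await is attributed to the
# URL that was actually tried, even if failover has since swapped it.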
_active_en_url = self._exit_node_url
try:
return await asyncio.wait_for(
self._relay_via_exit_node(payload),
timeout=self._relay_timeout + self._tls_connect_timeout,
)
except Exception as exc:
errored = True
self._record_exit_node_failure(_active_en_url)
log.warning(
"Exit node failed for %s (%s: %s), falling back to Apps Script",
url[:60], type(exc).__name__, exc,