Skip to content

Commit 1a3a0bf

Browse files
committed
fix: harden ARQ cancellation and document upload retries
- enable ARQ job aborts via WorkerSettings - add explicit Telegram document read/write timeouts - stop network-failed document sends from retrying forever
1 parent 3003f93 commit 1a3a0bf

File tree

4 files changed

+29
-12
lines changed

4 files changed

+29
-12
lines changed

dumpyarabot/arq_config.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ def get_functions(cls):
5656
# Retry configuration
5757
max_tries = 3
5858

59-
# Health check
60-
health_check_interval = 30
61-
62-
# Logging
63-
log_results = True
59+
# Health check
60+
health_check_interval = 30
61+
allow_abort_jobs = True
62+
63+
# Logging
64+
log_results = True
6465

6566
# Result TTL configuration (seconds)
6667
result_ttl = {

dumpyarabot/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ class Settings(BaseSettings):
2121

2222
# Telegram formatting configuration
2323
DEFAULT_PARSE_MODE: str = "Markdown"
24+
TELEGRAM_DOCUMENT_READ_TIMEOUT: float = 120.0
25+
TELEGRAM_DOCUMENT_WRITE_TIMEOUT: float = 120.0
2426

2527
# Optional custom base URL for Telegram Bot API (e.g. nginx reverse proxy)
2628
# Default: https://api.telegram.org/bot

dumpyarabot/message_queue.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,8 @@ async def _process_message(self, message: QueuedMessage) -> bool:
451451
),
452452
caption=message.caption,
453453
parse_mode=message.parse_mode,
454+
read_timeout=settings.TELEGRAM_DOCUMENT_READ_TIMEOUT,
455+
write_timeout=settings.TELEGRAM_DOCUMENT_WRITE_TIMEOUT,
454456
)
455457
console.print(f"[green]Successfully processed {message.type.value} message[/green]")
456458
return True
@@ -510,13 +512,24 @@ async def _process_message(self, message: QueuedMessage) -> bool:
510512
await self._requeue_message(message)
511513
return True # Don't increment retry count for rate limits
512514

513-
except NetworkError as e:
514-
console.print(f"[yellow]Network error processing message: {e}[/yellow]")
515-
# Simple retry with exponential backoff for network issues
516-
retry_delay = min(30 * (2 ** message.retry_count), 300) # 30s, 60s, 120s, 240s, 300s max
517-
message.scheduled_for = datetime.now(timezone.utc) + timedelta(seconds=retry_delay)
518-
await self._requeue_message(message)
519-
return True # Don't increment retry count for network issues
515+
except NetworkError as e:
516+
console.print(f"[yellow]Network error processing message: {e}[/yellow]")
517+
message.retry_count += 1
518+
if message.retry_count <= message.max_retries:
519+
retry_delay = min(30 * (2 ** (message.retry_count - 1)), 300)
520+
console.print(
521+
f"[yellow]Retrying message {message.message_id} after network error "
522+
f"(attempt {message.retry_count}/{message.max_retries}) in {retry_delay}s[/yellow]"
523+
)
524+
message.scheduled_for = datetime.now(timezone.utc) + timedelta(seconds=retry_delay)
525+
await self._requeue_message(message)
526+
else:
527+
console.print(
528+
f"[red]Message {message.message_id} exceeded max retries after network errors, "
529+
f"moving to dead letter queue[/red]"
530+
)
531+
await self._move_to_dead_letter_queue(message)
532+
return True
520533

521534
except TelegramError as e:
522535
console.print(f"[red]Telegram API error processing message: {e}[/red]")

run_arq_worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ async def start_worker(self):
4848
keep_result=WorkerSettings.keep_result,
4949
max_tries=WorkerSettings.max_tries,
5050
health_check_interval=WorkerSettings.health_check_interval,
51+
allow_abort_jobs=WorkerSettings.allow_abort_jobs,
5152
queue_name=WorkerSettings.queue_name
5253
)
5354

0 commit comments

Comments
 (0)