diff --git a/src/anthias_server/app/views.py b/src/anthias_server/app/views.py index a72ad911f..41ee1c7d1 100644 --- a/src/anthias_server/app/views.py +++ b/src/anthias_server/app/views.py @@ -1292,16 +1292,18 @@ def settings_backup(request: HttpRequest) -> HttpResponseBase: that build takes minutes on an SBC with a real content library. Browsers abort a request that has produced no bytes for ~5 minutes, so on devices like the reporter's Pi 3 "Get backup" - spun and then silently failed every time. stream_backup() puts + spun and then silently failed every time. astream_backup() puts the first tar block on the wire immediately and keeps bytes flowing for the whole build (and needs no staging space on the - SD card). The iterator is synchronous — Django adapts it under - ASGI via its thread executor, which is fine for an operation - this rare. + SD card). + + The iterator must be async: StreamingHttpResponse list()-buffers a + *sync* generator under ASGI (builds the whole archive before the + first byte), which silently brought the timeout back — issue #3073. 
""" filename = backup_helper.backup_archive_name(settings['player_name']) response = StreamingHttpResponse( - backup_helper.stream_backup(), + backup_helper.astream_backup(), content_type='application/x-tgz', ) # content_disposition_header() RFC-8187-escapes the filename — diff --git a/src/anthias_server/lib/backup_helper.py b/src/anthias_server/lib/backup_helper.py index 496223124..6fd8b3816 100644 --- a/src/anthias_server/lib/backup_helper.py +++ b/src/anthias_server/lib/backup_helper.py @@ -1,9 +1,11 @@ +import asyncio import logging import os import sys import tarfile import threading -from collections.abc import Generator +from collections.abc import AsyncGenerator, Generator +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from os import getenv, makedirs, path, remove from typing import Any @@ -136,6 +138,61 @@ def produce() -> None: raise produce_error[0] +async def astream_backup() -> AsyncGenerator[bytes, None]: + """Async front-end to stream_backup() for the ASGI download view. + + StreamingHttpResponse only *streams* an asynchronous iterator under + ASGI. Handed a synchronous generator, Django's __aiter__ falls back + to ``await sync_to_async(list)(...)``, which drains the generator + whole — i.e. builds the entire archive (and buffers every chunk in + a RAM list) before the first response byte goes out. That silently + reintroduces the exact 0-bytes-then-timeout failure stream_backup() + was written to fix, and risks OOM on a 1 GB Pi with a multi-GB + library (issue #3073). Driving the sync generator one chunk at a + time off the event loop keeps bytes flowing as the tar is built and + the footprint flat. + + A single-worker executor serialises every touch of the sync + generator — both ``next()`` and the cleanup ``close()`` — onto one + thread. 
They therefore can never overlap: if the client disconnects + mid-``next()``, the queued ``close()`` runs only after that + ``next()`` returns, so we avoid ``ValueError: generator already + executing`` and the leaked producer thread that a cross-thread + close would cause. A dedicated executor (rather than Django's + shared sync pool) also keeps the long blocking pipe read from + wedging unrelated sync work. + """ + loop = asyncio.get_running_loop() + gen = stream_backup() + executor = ThreadPoolExecutor( + max_workers=1, thread_name_prefix='backup-stream-reader' + ) + + def next_chunk() -> bytes | None: + # next(gen, None) rather than bare next() so exhaustion returns + # a sentinel instead of raising StopIteration, which can't + # cross the executor boundary cleanly. stream_backup() only ever + # yields non-empty bytes, so None is an unambiguous end marker. + return next(gen, None) + + try: + while True: + chunk = await loop.run_in_executor(executor, next_chunk) + if chunk is None: + break + yield chunk + finally: + # On client disconnect Django aclose()s this generator. Closing + # the sync generator throws GeneratorExit into stream_backup at + # its yield, so its own finally joins the producer thread and + # closes the pipe (the producer's next write then hits + # BrokenPipeError and exits) — nothing is left tarring. 
+ try: + await loop.run_in_executor(executor, gen.close) + finally: + executor.shutdown(wait=False) + + def create_backup(name: str = default_archive_name) -> str: home = getenv('HOME') or '' archive_name = backup_archive_name(name) diff --git a/tests/test_backup_helper.py b/tests/test_backup_helper.py index 3f7f5ae75..160778482 100644 --- a/tests/test_backup_helper.py +++ b/tests/test_backup_helper.py @@ -1,3 +1,4 @@ +import asyncio import os import shutil import tarfile @@ -9,8 +10,10 @@ from unittest import mock import pytest +from django.http import StreamingHttpResponse from anthias_server.lib.backup_helper import ( + astream_backup, backup_archive_name, create_backup, recover, @@ -123,6 +126,75 @@ def test_stream_backup_stops_when_consumer_disconnects( assert not thread.is_alive() +def test_astream_backup_response_streams_under_asgi( + backup_home: str, +) -> None: + # Regression for issue #3073. StreamingHttpResponse only streams an + # *asynchronous* iterator under ASGI; handed a sync generator, + # Django's __aiter__ does `await sync_to_async(list)(...)` — it + # builds the whole archive in RAM before the first byte, which + # reproduced the original 0-bytes-then-timeout failure. The download + # view must wrap the producer in astream_backup() so Django takes + # its real streaming path (is_async == True) and round-trips back + # through recover() unchanged. + marker = path.join(backup_home, '.anthias', 'anthias.conf') + with open(marker, 'w') as f: + f.write('[viewer]\n') + + response = StreamingHttpResponse( + astream_backup(), content_type='application/x-tgz' + ) + # The crux of the fix: a sync generator would leave this False and + # send Django down the list()-buffering branch. + assert response.is_async is True + + async def drain() -> list[bytes]: + # aiter(response) is exactly what Django's ASGI handler consumes. 
+ return [part async for part in aiter(response)] + + chunks = asyncio.run(drain()) + assert chunks + + os.makedirs(path.join(backup_home, static_dir), exist_ok=True) + file_path = path.join(backup_home, static_dir, 'astreamed.tar.gz') + with open(file_path, 'wb') as out_file: + out_file.write(b''.join(chunks)) + + with tarfile.open(file_path, 'r:gz') as tar: + names = tar.getnames() + assert '.anthias/anthias.conf' in names + + os.remove(marker) + recover(file_path) + assert path.isfile(marker) + + +def test_astream_backup_stops_producer_when_consumer_disconnects( + backup_home: str, +) -> None: + # A client that disconnects mid-download makes Django aclose() the + # async generator. Cleanup must stop the producer thread (and not + # raise) — a cross-thread close racing an in-flight next() would + # leave it tarring forever (PR #3074 review). + marker = path.join(backup_home, '.anthias', 'anthias.conf') + with open(marker, 'w') as f: + f.write('[viewer]\n') + + async def take_one_then_disconnect() -> None: + agen = astream_backup() + first = await agen.__anext__() + assert first + await agen.aclose() # GeneratorExit cleanup path + + asyncio.run(take_one_then_disconnect()) + + main_thread = threading.main_thread() + for thread in threading.enumerate(): + if thread.name == 'backup-stream' and thread is not main_thread: + thread.join(timeout=5) + assert not thread.is_alive() + + @pytest.fixture def legacy_home() -> Iterator[str]: """Backups produced by pre-rename releases used `.screenly` and