Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/anthias_server/app/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,16 +1292,18 @@ def settings_backup(request: HttpRequest) -> HttpResponseBase:
that build takes minutes on an SBC with a real content library.
Browsers abort a request that has produced no bytes for ~5
minutes, so on devices like the reporter's Pi 3 "Get backup"
spun and then silently failed every time. stream_backup() puts
spun and then silently failed every time. astream_backup() puts
the first tar block on the wire immediately and keeps bytes
flowing for the whole build (and needs no staging space on the
SD card). The iterator is synchronous — Django adapts it under
ASGI via its thread executor, which is fine for an operation
this rare.
SD card).

The iterator must be async: StreamingHttpResponse list()-buffers a
*sync* generator under ASGI (builds the whole archive before the
first byte), which silently brought the timeout back — issue #3073.
"""
filename = backup_helper.backup_archive_name(settings['player_name'])
response = StreamingHttpResponse(
backup_helper.stream_backup(),
backup_helper.astream_backup(),
content_type='application/x-tgz',
)
# content_disposition_header() RFC-8187-escapes the filename —
Expand Down
59 changes: 58 additions & 1 deletion src/anthias_server/lib/backup_helper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import logging
import os
import sys
import tarfile
import threading
from collections.abc import Generator
from collections.abc import AsyncGenerator, Generator
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from os import getenv, makedirs, path, remove
from typing import Any
Expand Down Expand Up @@ -136,6 +138,61 @@ def produce() -> None:
raise produce_error[0]


async def astream_backup() -> AsyncGenerator[bytes, None]:
"""Async front-end to stream_backup() for the ASGI download view.

StreamingHttpResponse only *streams* an asynchronous iterator under
ASGI. Handed a synchronous generator, Django's __aiter__ falls back
to ``await sync_to_async(list)(...)``, which drains the generator
whole — i.e. builds the entire archive (and buffers every chunk in
a RAM list) before the first response byte goes out. That silently
reintroduces the exact 0-bytes-then-timeout failure stream_backup()
was written to fix, and risks OOM on a 1 GB Pi with a multi-GB
library (issue #3073). Driving the sync generator one chunk at a
time off the event loop keeps bytes flowing as the tar is built and
the footprint flat.

A single-worker executor serialises every touch of the sync
generator — both ``next()`` and the cleanup ``close()`` — onto one
thread. They therefore can never overlap: if the client disconnects
mid-``next()``, the queued ``close()`` runs only after that
``next()`` returns, so we avoid ``ValueError: generator already
executing`` and the leaked producer thread that a cross-thread
close would cause. A dedicated executor (rather than Django's
shared sync pool) also keeps the long blocking pipe read from
wedging unrelated sync work.
"""
loop = asyncio.get_running_loop()
gen = stream_backup()
executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix='backup-stream-reader'
)

def next_chunk() -> bytes | None:
# next(gen, None) rather than bare next() so exhaustion returns
# a sentinel instead of raising StopIteration, which can't
# cross the executor boundary cleanly. stream_backup() only ever
# yields non-empty bytes, so None is an unambiguous end marker.
return next(gen, None)

try:
while True:
chunk = await loop.run_in_executor(executor, next_chunk)
if chunk is None:
break
yield chunk
finally:
# On client disconnect Django aclose()s this generator. Closing
# the sync generator throws GeneratorExit into stream_backup at
# its yield, so its own finally joins the producer thread and
# closes the pipe (the producer's next write then hits
# BrokenPipeError and exits) — nothing is left taring.
try:
await loop.run_in_executor(executor, gen.close)
finally:
executor.shutdown(wait=False)


def create_backup(name: str = default_archive_name) -> str:
home = getenv('HOME') or ''
archive_name = backup_archive_name(name)
Expand Down
72 changes: 72 additions & 0 deletions tests/test_backup_helper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import shutil
import tarfile
Expand All @@ -9,8 +10,10 @@
from unittest import mock

import pytest
from django.http import StreamingHttpResponse

from anthias_server.lib.backup_helper import (
astream_backup,
backup_archive_name,
create_backup,
recover,
Expand Down Expand Up @@ -123,6 +126,75 @@ def test_stream_backup_stops_when_consumer_disconnects(
assert not thread.is_alive()


def test_astream_backup_response_streams_under_asgi(
backup_home: str,
) -> None:
# Regression for issue #3073. StreamingHttpResponse only streams an
# *asynchronous* iterator under ASGI; handed a sync generator,
# Django's __aiter__ does `await sync_to_async(list)(...)` — it
# builds the whole archive in RAM before the first byte, which
# reproduced the original 0-bytes-then-timeout failure. The download
# view must wrap the producer in astream_backup() so Django takes
# its real streaming path (is_async == True) and round-trips back
# through recover() unchanged.
marker = path.join(backup_home, '.anthias', 'anthias.conf')
with open(marker, 'w') as f:
f.write('[viewer]\n')

response = StreamingHttpResponse(
astream_backup(), content_type='application/x-tgz'
)
# The crux of the fix: a sync generator would leave this False and
# send Django down the list()-buffering branch.
assert response.is_async is True

async def drain() -> list[bytes]:
# aiter(response) is exactly what Django's ASGI handler consumes.
return [part async for part in aiter(response)]

chunks = asyncio.run(drain())
assert chunks

os.makedirs(path.join(backup_home, static_dir), exist_ok=True)
file_path = path.join(backup_home, static_dir, 'astreamed.tar.gz')
with open(file_path, 'wb') as out_file:
out_file.write(b''.join(chunks))

with tarfile.open(file_path, 'r:gz') as tar:
names = tar.getnames()
assert '.anthias/anthias.conf' in names

os.remove(marker)
recover(file_path)
assert path.isfile(marker)


def test_astream_backup_stops_producer_when_consumer_disconnects(
backup_home: str,
) -> None:
# A client that disconnects mid-download makes Django aclose() the
# async generator. Cleanup must stop the producer thread (and not
# raise) — a cross-thread close racing an in-flight next() would
# leave it taring forever (PR #3074 review).
marker = path.join(backup_home, '.anthias', 'anthias.conf')
with open(marker, 'w') as f:
f.write('[viewer]\n')

async def take_one_then_disconnect() -> None:
agen = astream_backup()
first = await agen.__anext__()
assert first
await agen.aclose() # GeneratorExit cleanup path

asyncio.run(take_one_then_disconnect())

main_thread = threading.main_thread()
for thread in threading.enumerate():
if thread.name == 'backup-stream' and thread is not main_thread:
thread.join(timeout=5)
assert not thread.is_alive()


@pytest.fixture
def legacy_home() -> Iterator[str]:
"""Backups produced by pre-rename releases used `.screenly` and
Expand Down