From 33cc2a2916193be7584c9e806956fd1bf6a993b5 Mon Sep 17 00:00:00 2001 From: David Nashash Date: Tue, 9 Jun 2026 22:52:04 +0300 Subject: [PATCH] Fix Xtream cancellation handling in CI tests --- .../remote/xtream/OkHttpXtreamApiService.kt | 25 +++++++----- .../xtream/OkHttpXtreamApiServiceTest.kt | 38 +++++++++++-------- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/data/src/main/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiService.kt b/data/src/main/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiService.kt index 0c3fc29f..2ec166a6 100644 --- a/data/src/main/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiService.kt +++ b/data/src/main/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiService.kt @@ -29,8 +29,10 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.json.decodeFromStream import kotlinx.serialization.SerializationException import okhttp3.Call +import okhttp3.Callback import okhttp3.OkHttpClient import okhttp3.Request +import okhttp3.Response import okhttp3.ResponseBody import java.net.URI import java.nio.charset.Charset @@ -376,18 +378,21 @@ class OkHttpXtreamApiService( continuation.invokeOnCancellation { call.cancel() } - try { - val response = call.execute() - if (continuation.isActive) { - continuation.resume(response) - } else { - response.close() + call.enqueue(object : Callback { + override fun onFailure(call: Call, e: IOException) { + if (continuation.isActive) { + continuation.resumeWithException(e) + } } - } catch (e: Exception) { - if (continuation.isActive) { - continuation.resumeWithException(e) + + override fun onResponse(call: Call, response: Response) { + if (continuation.isActive) { + continuation.resume(response) + } else { + response.close() + } } - } + }) } private inline fun decodeBodyBounded( diff --git a/data/src/test/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiServiceTest.kt b/data/src/test/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiServiceTest.kt index e06d3738..fd4dd62c 100644 --- a/data/src/test/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiServiceTest.kt +++ b/data/src/test/java/com/streamvault/data/remote/xtream/OkHttpXtreamApiServiceTest.kt @@ -6,10 +6,15 @@ import com.streamvault.data.remote.http.HttpRequestProfile import com.streamvault.data.remote.dto.XtreamSeriesInfoResponse import java.io.IOException import java.net.SocketTimeoutException +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.withTimeout +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.coroutines.test.runTest import kotlinx.serialization.json.Json @@ -139,33 +144,36 @@ class OkHttpXtreamApiServiceTest { @Test fun `streamLiveStreamRows cancels the underlying call when coroutine times out`() = runTest { - val cancellationObserved = CompletableDeferred() + val requestStarted = CountDownLatch(1) + val cancellationObserved = CountDownLatch(1) val service = OkHttpXtreamApiService( client = OkHttpClient.Builder() .addInterceptor(Interceptor { chain -> + requestStarted.countDown() while (!chain.call().isCanceled()) { Thread.sleep(10) } - cancellationObserved.complete(Unit) + cancellationObserved.countDown() throw IOException("Canceled") }) .build(), json = json ) - val failure = runCatching { - withTimeout(100) { - service.streamLiveStreamRows( - "https://example.test/player_api.php?action=get_live_streams&category_id=7" - ) { } - } - }.exceptionOrNull() + val requestScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + val requestJob = requestScope.launch { + service.streamLiveStreamRows( + "https://example.test/player_api.php?action=get_live_streams&category_id=7" + ) { } + } + + assertThat(requestStarted.await(5, TimeUnit.SECONDS)).isTrue() + requestJob.cancelAndJoin() + requestScope.cancel() - assertThat(failure).isNotNull() + assertThat(requestJob.isCancelled).isTrue() withContext(Dispatchers.Default.limitedParallelism(1)) { - withTimeout(500) { - cancellationObserved.await() - } + assertThat(cancellationObserved.await(5, TimeUnit.SECONDS)).isTrue() } }