proxy: add per-model ackTimeout keep-alive for slow upstream responses #735
Ales999 wants to merge 5 commits into
Add an optional ackTimeout field to model config that sends a keep-alive
heartbeat to the client when the upstream inference server takes longer
than the configured timeout to respond. This prevents client-side
timeouts (e.g. Kilo Code task timeouts) during long generation or startup
sessions.
Key changes:
- Add AckTimeout int field to ModelConfig with YAML tag "ackTimeout"
- Validate ackTimeout >= 0 in LoadConfigFromReader
- Background goroutine in Process.ProxyRequest creates a per-model timer
- Uses atomic.CompareAndSwapUint32 to ensure only one heartbeat is sent
per model instance, preventing duplicate ACKs across concurrent requests
- Heartbeat is an SSE comment (: heart-beat) which standard clients ignore
but keeps the connection alive and prevents proxy/browser timeouts
- Timer goroutine respects r.Context().Done() for graceful cancellation
- Add ackTimeout to JSON schema (minimum: 0, default: 0 = disabled)
Example config:
models:
  my-slow-model:
    cmd: llama-server --model /path/to/model.gguf
    ackTimeout: 110 # send heartbeat after 110s of upstream silence
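The config rule described above (an integer `ackTimeout` that must be `>= 0`, with `0` meaning disabled) can be sketched roughly as follows. This is only an illustration: the trimmed `ModelConfig` struct and the `validateAckTimeout` helper are hypothetical stand-ins, not the PR's actual code in `LoadConfigFromReader`.

```go
package main

import (
	"errors"
	"fmt"
)

// ModelConfig is a trimmed-down, hypothetical stand-in for the real struct;
// only the fields relevant to this PR are shown.
type ModelConfig struct {
	Cmd        string `yaml:"cmd"`
	AckTimeout int    `yaml:"ackTimeout"` // seconds; 0 disables the heartbeat
}

var errNegativeAckTimeout = errors.New("ackTimeout must be >= 0")

// validateAckTimeout mirrors the "ackTimeout >= 0" rule from the description.
// The function name and error text are assumptions made for this sketch.
func validateAckTimeout(modelID string, mc ModelConfig) error {
	if mc.AckTimeout < 0 {
		return fmt.Errorf("model %s: %w (got %d)", modelID, errNegativeAckTimeout, mc.AckTimeout)
	}
	return nil
}

func main() {
	for _, v := range []int{0, 110, -5} {
		err := validateAckTimeout("my-slow-model", ModelConfig{AckTimeout: v})
		fmt.Printf("ackTimeout=%d -> err=%v\n", v, err)
	}
}
```

Rejecting negative values at load time keeps the request path free of range checks; the handler then only needs to test for `> 0`.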
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@proxy/process.go`:
- Around line 80-82: The ackSentFlag on the Process struct is shared across all
requests and never reset, causing the first timed-out request to permanently
disable future heartbeats; remove ackSentFlag from the Process struct and
replace it with a per-request flag (e.g., a local uint32 in the request-handling
function or attached to the request context) that you atomically set/read where
currently using ackSentFlag, ensuring it is initialized to 0 for each new
request and not reused across concurrent requests; update all usages (places
referencing ackSentFlag alongside ackTimeout and the methods that send the
heartbeat/ack) to use the new per-request variable so concurrent slow requests
each manage their own ack state.
- Around line 640-663: The goroutine currently writes headers/status/body to the
original ResponseWriter (w) before the upstream response exists, which commits a
200 and can corrupt non-streaming responses; change it to never modify or write
to w before the reverseProxy response is started: in the ackTimeout goroutine
(the function that references p.ackTimeout, p.ackSentFlag, proxyLogger,
requestBeginTime, r, w) remove the calls to w.Header().Set, w.WriteHeader,
fmt.Fprintf and any Flush; instead only atomically set p.ackSentFlag and log the
heartbeat (keep the Debugf), and if you need true heartbeat bytes implement it
in a streaming-specific code path (e.g., only write when a response-wrapping
ResponseWriter indicates headers have already been written or use a dedicated
streaming/hijack mechanism) so no concurrent/early writes occur before
reverseProxy.ServeHTTP runs.
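The first finding above (a shared `ackSentFlag` that is never reset) comes down to where the flag lives. A minimal sketch of per-request state, assuming a hypothetical `requestAck` type that is created once per request rather than stored on `Process`:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// requestAck holds heartbeat state for ONE request. It is a hypothetical
// type for illustration; the PR stores the flag on the Process struct.
type requestAck struct {
	sent uint32 // 0 = heartbeat not sent yet, 1 = sent
}

// markSent reports true only for the first caller on this request.
func (a *requestAck) markSent() bool {
	return atomic.CompareAndSwapUint32(&a.sent, 0, 1)
}

// simulate runs several requests; for each one, multiple goroutines race to
// send the heartbeat. It returns how many heartbeats were sent in total.
func simulate(requests, attemptsPerRequest int) int {
	var total int32
	var wg sync.WaitGroup
	for i := 0; i < requests; i++ {
		ack := &requestAck{} // fresh state for every request
		for j := 0; j < attemptsPerRequest; j++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				if ack.markSent() {
					atomic.AddInt32(&total, 1)
				}
			}()
		}
	}
	wg.Wait()
	return int(total)
}

func main() {
	// Three requests, four racing attempts each: one heartbeat per request.
	fmt.Println(simulate(3, 4)) // prints 3
}
```

With the flag on a shared struct, the same simulation would yield a single heartbeat in total, which is the behavior the review flags.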
📒 Files selected for processing (4)
- config-schema.json
- proxy/config/config.go
- proxy/config/model_config.go
- proxy/process.go
Yes, I'll work on it a little more. With the heartbeat in place, the task continues after 110 seconds and Kilo Code no longer restarts it after 10 seconds.
Implements periodic keep-alive heartbeats when an upstream response is slow, preventing client timeouts during extended processing or generation sessions. This logic uses `ackTimeout` to determine the heartbeat frequency, sending HTTP 200 responses to the client as necessary. Refactors the background goroutine from a single timer-based mechanism to a ticker-based approach for continuous heartbeats (mostlygeek#726)
♻️ Duplicate comments (1)
proxy/process.go (1)
639-670: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift
Critical: Concurrent writes to ResponseWriter cause data races and corrupt responses.
This implementation still has the issues flagged in the previous review:
- Data race: The goroutine writes headers/body to `w` while `reverseProxy.ServeHTTP(w, r)` (line 677) runs concurrently; both write to the same `ResponseWriter` without synchronization.
- Premature response commitment: Writing `WriteHeader(http.StatusOK)` before the upstream responds forces HTTP 200 regardless of actual upstream status. Non-streaming or error responses are corrupted.
- No termination signal: The goroutine has no way to know when the reverse proxy starts sending the actual response. It only exits on context cancellation, not when data begins flowing.
- Repeated header operations: `Header().Set()` and `WriteHeader()` are called every tick, which is ineffective after the first write and indicates the design needs rethinking.
To fix this properly, consider a coordinated approach where:
- A custom `ResponseWriter` wrapper intercepts the first upstream write
- The wrapper signals the heartbeat goroutine to stop via a channel/context before any upstream data is written
- All writes go through the wrapper to ensure mutual exclusion
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/process.go` around lines 639 - 670, The heartbeat goroutine is concurrently writing to the original ResponseWriter (w) causing races and premature committed responses; fix by replacing direct writes with a synchronized wrapper: implement a custom responseWriter wrapper type (e.g., proxyResponseWriter) that embeds http.ResponseWriter and overrides Header(), WriteHeader(), and Write() to acquire a mutex and detect the first write; have the wrapper expose a channel or context-cancel function (e.g., firstWriteCh or stopHeartbeat()) that the wrapper closes/signals when the first upstream Write or WriteHeader occurs; change the heartbeat goroutine (spawned when p.ackTimeout > 0) to accept the wrapper and stop sending heartbeats when it receives that signal instead of writing to the raw w, and ensure reverseProxy.ServeHTTP is called with the wrapper so all writes are funneled through the mutex-protected methods to avoid concurrent writes and prevent premature WriteHeader calls.
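The wrapper idea in this comment can be sketched as below. This is a simplified illustration, not the PR's `proxyResponseWriter`: the type name `syncWriter` and the `upstreamStarted` helper are assumptions, `Header()` is passed through unsynchronized, and the heartbeat write path is left out.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// syncWriter is a hypothetical wrapper: it serializes writes with a mutex
// and closes firstWriteCh on the first WriteHeader or Write.
type syncWriter struct {
	mu           sync.Mutex
	w            http.ResponseWriter
	started      bool
	firstWriteCh chan struct{}
}

func newSyncWriter(w http.ResponseWriter) *syncWriter {
	return &syncWriter{w: w, firstWriteCh: make(chan struct{})}
}

// markStarted must be called with mu held; it closes the channel only once.
func (s *syncWriter) markStarted() {
	if !s.started {
		s.started = true
		close(s.firstWriteCh)
	}
}

// Header is passed through unsynchronized in this sketch.
func (s *syncWriter) Header() http.Header { return s.w.Header() }

func (s *syncWriter) WriteHeader(code int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.markStarted()
	s.w.WriteHeader(code)
}

func (s *syncWriter) Write(b []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.markStarted()
	return s.w.Write(b)
}

// upstreamStarted is a non-blocking check a heartbeat loop could use.
func (s *syncWriter) upstreamStarted() bool {
	select {
	case <-s.firstWriteCh:
		return true
	default:
		return false
	}
}

// demoSyncWriter plays the role of the upstream writing an error response.
func demoSyncWriter() (before, after bool, status int, body string) {
	rec := httptest.NewRecorder()
	sw := newSyncWriter(rec)
	before = sw.upstreamStarted()
	sw.WriteHeader(http.StatusBadGateway)
	_, _ = sw.Write([]byte("upstream error"))
	after = sw.upstreamStarted()
	return before, after, rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(demoSyncWriter())
}
```

A heartbeat goroutine would select on `firstWriteCh` alongside the request context, so it stops as soon as the upstream produces any output.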
📒 Files selected for processing (1)
proxy/process.go
The ackTimeout heartbeat goroutine was writing directly to http.ResponseWriter while reverseProxy.ServeHTTP did the same, causing data races and potential premature response commits.
- Add proxyResponseWriter type wrapping http.ResponseWriter with mutex-based synchronization for all write operations (Header, WriteHeader, Write, Flush)
- Use firstWriteCh channel to signal heartbeat goroutine when upstream starts writing, allowing it to stop sending heartbeats automatically
- Route all response writes through the synchronized wrapper: heartbeat, reverseProxy.ServeHTTP, testHandler, and statusResponseWriter
Fixes the race condition described in the PR review of proxy/process.go.
Actionable comments posted: 3
♻️ Duplicate comments (1)
proxy/process.go (1)
647-674: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift
Heartbeat path can pre-commit an incorrect response before upstream responds.
At lines 668–673, the goroutine writes SSE headers + `200` + body before `reverseProxy.ServeHTTP(...)` runs. That can overwrite intended upstream status/headers and corrupt non-SSE payloads.
#!/bin/bash
# Verify heartbeat writes happen before reverse proxy serving in the same handler path.
rg -n -C4 'if p\.ackTimeout > 0|WriteHeader\(http\.StatusOK\)|fmt\.Fprintf\(prw, ": heart-beat' proxy/process.go
rg -n -C3 'p\.testHandler\.ServeHTTP\(prw, r\)|p\.reverseProxy\.ServeHTTP\(srw, r\)|p\.reverseProxy\.ServeHTTP\(prw, r\)' proxy/process.go
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@proxy/process.go`:
- Around line 643-675: The Go file fails gofmt checks due to misformatted
goroutine/for/select block around the ackTimeout heartbeat logic; run gofmt -w
on this file to fix formatting (ensure the goroutine that uses p.ackTimeout,
tickInterval, the for range over ticker.C, the select handling prw.firstWriteCh
and r.Context().Done(), and the prw.Header()/prw.WriteHeader()/fmt.Fprintf(prw,
...) / prw.Flush() lines are properly indented and braces aligned), then re-run
gofmt -l . to confirm no files are reported.
- Around line 648-651: The heartbeat ticker is currently set to half of
p.ackTimeout (tickInterval := p.ackTimeout / 2), causing the first heartbeat to
fire too early; change the ticker interval logic in the goroutine that creates
ticker to use p.ackTimeout (or otherwise ensure the first tick occurs after
p.ackTimeout) so that the first and subsequent heartbeats honor the configured
ackTimeout; update references to tickInterval, ticker, and the anonymous
goroutine accordingly to reflect the corrected interval.
- Around line 793-797: The Header() method on proxyResponseWriter returns the
live http.Header after releasing p.mu, causing races with concurrent
WriteHeader() and other goroutines; change Header() to return a deep copy/clone
of p.writer.Header() (copy each header slice) instead of the live map so callers
mutate the copy, or alternatively provide and update usage to a locked setter
(e.g., a SetHeader/AddHeader method on proxyResponseWriter that takes
p.mu.Lock/p.mu.Unlock) and replace direct Header().Set usages; ensure changes
reference proxyResponseWriter.Header, p.writer, p.mu, and WriteHeader so all
header mutations are either performed on the cloned map or through the new
locked accessor to eliminate the race.
📒 Files selected for processing (1)
proxy/process.go
Actionable comments posted: 2
♻️ Duplicate comments (2)
proxy/process.go (2)
648-651: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Heartbeat ticker fires at `ackTimeout/2`, not at `ackTimeout`.
`tickInterval := p.ackTimeout / 2` means the first tick, and every subsequent tick, fires at half the configured value. A user who sets `ackTimeout: 110` expects the keep-alive after 110 s but receives it after 55 s.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/process.go` around lines 648 - 651, The heartbeat ticker currently uses tickInterval := p.ackTimeout / 2 which fires at half the configured ackTimeout; change the ticker interval to use the actual ack timeout value (e.g., set tickInterval := p.ackTimeout) so the Ticker created by ticker := time.NewTicker(tickInterval) fires at the expected p.ackTimeout; update any related comments or tests referencing the old behavior and ensure p.ackTimeout is a time.Duration when used here.
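A rough sketch of a ticker loop that honors the full configured interval and stops when the upstream starts or the request is cancelled. The function names (`runHeartbeat`, `intervalFromSeconds`, `firstBeatDelayMillis`) and the `stop` channel are assumptions for illustration; the PR's goroutine is structured differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// intervalFromSeconds converts the integer config value into a Duration.
func intervalFromSeconds(sec int) time.Duration {
	return time.Duration(sec) * time.Second
}

// runHeartbeat calls beat once per full interval until stop is closed or ctx
// is done, and returns the number of beats. The interval must be > 0 because
// time.NewTicker panics otherwise, so callers should gate on that first.
func runHeartbeat(ctx context.Context, interval time.Duration, stop <-chan struct{}, beat func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	beats := 0
	for {
		select {
		case <-stop:
			return beats
		case <-ctx.Done():
			return beats
		case <-ticker.C:
			beat()
			beats++
		}
	}
}

// firstBeatDelayMillis measures how long the first heartbeat takes to fire.
func firstBeatDelayMillis(intervalMillis int) int {
	interval := time.Duration(intervalMillis) * time.Millisecond
	stop := make(chan struct{})
	start := time.Now()
	var first time.Duration
	runHeartbeat(context.Background(), interval, stop, func() {
		if first == 0 {
			first = time.Since(start)
			close(stop) // pretend the upstream started right after the first beat
		}
	})
	return int(first.Milliseconds())
}

func main() {
	fmt.Println("configured interval:", intervalFromSeconds(110))
	fmt.Println("first beat not early:", firstBeatDelayMillis(50) >= 50)
}
```

Because the ticker is created with the full interval, the first tick cannot arrive before the configured time has elapsed.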
793-797: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
`Header()` returns the live map without lock protection on subsequent mutations.
`Header()` acquires `RLock` only to read `p.writer`, then returns the live `http.Header` map before releasing the lock. The `.Set()` calls at lines 668–670 (heartbeat goroutine) and 905–907 (`newStatusResponseWriter`) execute concurrently with `WriteHeader()` (which holds an exclusive `Lock`), creating an unguarded data race on the underlying map.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/process.go` around lines 793 - 797, Header() currently returns the live http.Header without protecting subsequent mutations, causing data races; change Header() to return a deep copy (clone) of p.writer.Header() while holding the lock, and add/convert callers to use new locked setters (e.g., proxyResponseWriter.SetHeader(key, value) and DelHeader(key)) that take p.mu.Lock/p.mu.Unlock and call p.writer.Header().Set/Del, updating the heartbeat goroutine (the .Set calls around lines 668–670) and newStatusResponseWriter header mutations (around lines 905–907) to use these setter methods instead of calling Header().Set concurrently; ensure WriteHeader() continues to use the exclusive Lock as-is.
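One way the locked-setter suggestion could look is sketched below, with hypothetical names (`lockedWriter`, `SetHeader`, `HeaderSnapshot`). A caveat worth keeping in mind: callers that mutate the map returned by `Header()`, which is how standard handlers set response headers, would stop affecting the response if `Header()` itself returned a clone. For that reason this sketch exposes the copy under a separate method instead of overriding `Header()`.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// lockedWriter is a hypothetical wrapper whose header mutations all go
// through the same mutex that guards WriteHeader.
type lockedWriter struct {
	mu sync.Mutex
	w  http.ResponseWriter
}

// SetHeader mutates the live header map while holding the lock.
func (l *lockedWriter) SetHeader(key, value string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.w.Header().Set(key, value)
}

// HeaderSnapshot returns a deep copy; changing it does not touch the response.
func (l *lockedWriter) HeaderSnapshot() http.Header {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.w.Header().Clone()
}

// WriteHeader takes the same lock, so it cannot interleave with SetHeader.
func (l *lockedWriter) WriteHeader(code int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.w.WriteHeader(code)
}

// demoLockedHeaders sets a header from several goroutines, then checks that
// editing a snapshot does not leak into the live response headers.
func demoLockedHeaders() (contentType string, snapshotLeaked bool) {
	rec := httptest.NewRecorder()
	lw := &lockedWriter{w: rec}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lw.SetHeader("Content-Type", "text/event-stream")
		}()
	}
	wg.Wait()
	snap := lw.HeaderSnapshot()
	snap.Set("X-Leak", "1")
	return rec.Header().Get("Content-Type"), rec.Header().Get("X-Leak") != ""
}

func main() {
	fmt.Println(demoLockedHeaders())
}
```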
🧹 Nitpick comments (1)
proxy/process.go (1)
570-596: 💤 Low value
`proxyResponseWriter` creation and `srw` wiring look correct.
Worth noting: when `srw != nil`, `newStatusResponseWriter` immediately calls `WriteHeader(200)` on `prw`, which closes `prw.firstWriteCh`. The heartbeat goroutine (started later at line 649) will therefore exit on its very first tick, making `ackTimeout` a no-op in the loading-state path. This is the correct behavior, since the client is already receiving updates, but it means the goroutine is unconditionally started and immediately exits in that path. Consider gating the goroutine start on `srw == nil` to avoid the unnecessary spawn.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@proxy/process.go` around lines 570 - 596, The heartbeat goroutine is always started even when srw != nil (i.e., newStatusResponseWriter already called prw.WriteHeader and client is streaming), causing an unnecessary immediate-exit goroutine; change the logic that starts the heartbeat goroutine to only spawn it when srw == nil (keep existing behavior for proxyResponseWriter/prw and when no statusResponseWriter is used), so locate the code that starts the heartbeat goroutine (the goroutine started after creating prw/newProxyResponseWriter and srw/newStatusResponseWriter) and wrap its creation in a conditional that checks srw == nil before starting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@proxy/process.go`:
- Around line 799-807: The WriteHeader method on proxyResponseWriter calls
p.writer.WriteHeader(code) unconditionally, causing superfluous WriteHeader
calls after firstWritten is set by the heartbeat; update WriteHeader so that the
call to p.writer.WriteHeader(code) is performed only inside the existing if
!p.firstWritten guard (after setting firstWritten=true and closing firstWriteCh)
and skipped on subsequent calls, ensuring only the first WriteHeader delegates
to p.writer.WriteHeader and avoiding redundant writes.
- Around line 647-676: The heartbeat goroutine is firing for non-streaming
requests and corrupting responses; gate it on the request's isStreaming flag
instead of running unconditionally. Before launching the goroutine (the block
that checks p.ackTimeout > 0 and uses prw.firstWriteCh, prw.Header,
prw.WriteHeader, fmt.Fprintf, prw.Flush), check the resolved isStreaming value
from the request context and only start the ticker if isStreaming is true;
additionally, inside the goroutine double-check isStreaming (or short-circuit)
before writing headers/body to ensure no SSE headers or ": heart-beat" lines are
sent for non-streaming handlers.
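Both inline comments above can be illustrated together: delegate `WriteHeader` only once, and start the heartbeat only for streaming requests. This is a sketch under assumptions: `onceWriter` and `shouldHeartbeat` are hypothetical names, and `isStreaming` is taken as a plain boolean because the source does not show how it is resolved.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// shouldHeartbeat gates the heartbeat goroutine: the feature must be enabled
// and the request must be a streaming one.
func shouldHeartbeat(ackTimeoutSeconds int, isStreaming bool) bool {
	return ackTimeoutSeconds > 0 && isStreaming
}

// onceWriter is a hypothetical wrapper that forwards only the first
// WriteHeader call to the underlying writer.
type onceWriter struct {
	mu          sync.Mutex
	w           http.ResponseWriter
	wroteHeader bool
	delegated   int // number of calls actually forwarded
}

func (o *onceWriter) WriteHeader(code int) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.wroteHeader {
		// A status is already committed (for example by a heartbeat), so
		// later calls are dropped instead of being forwarded.
		return
	}
	o.wroteHeader = true
	o.delegated++
	o.w.WriteHeader(code)
}

// demoOnceWriter commits 200 first, then tries a second status.
func demoOnceWriter() (status, delegated int) {
	rec := httptest.NewRecorder()
	ow := &onceWriter{w: rec}
	ow.WriteHeader(http.StatusOK)
	ow.WriteHeader(http.StatusBadGateway)
	return rec.Code, ow.delegated
}

func main() {
	fmt.Println(shouldHeartbeat(110, true), shouldHeartbeat(110, false), shouldHeartbeat(0, true))
	fmt.Println(demoOnceWriter())
}
```

Note that once a heartbeat has committed a 200, a later upstream error status can no longer reach the client, which is one reason to restrict heartbeats to streaming requests.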
📒 Files selected for processing (1)
proxy/process.go
…y handling
Refactor the keep-alive heartbeat feature (issue mostlygeek#726):
- Rename AckTimeout config field to HeartbeatInterval across all files
- Enforce minimum 10 second interval for heartbeat
- Replace proxyResponseWriter with lighter responseSyncer using atomic.Bool
- Add FlushUnlocked to prevent deadlock during concurrent flush operations
- Activate heartbeat only for streaming requests (isStreaming check moved earlier)
- Use closeCh signal instead of firstWriteCh for upstream started detection
- Wrap statusResponseWriter in responseSyncer when SendLoadingState is enabled
Summary
Add per-model `ackTimeout` configuration to send a keep-alive heartbeat when the upstream inference server takes longer than expected to respond. This prevents client-side timeouts during long generation or model startup sessions.
- `AckTimeout` field to `ModelConfig` (YAML: `ackTimeout`)
- `Process.ProxyRequest` sends SSE heartbeat after timeout elapses
- `ackTimeout >= 0`, default `0` (disabled)