Skip to content

Commit 085c802

Browse files
committed
fix: Brotli stream corruption via concurrent writes to compressor
- **Introduced a per-tab “single writer” actor** (a dedicated virtual thread) that *owns*: - the http-kit `AsyncChannel` - the tab’s streaming Brotli state (`br-out` + `br-stream`, when compression is enabled) - Changed `send-sse!` from “compress + `http/send!` right now” to **“enqueue message and return immediately”**: - Messages go into a `LinkedBlockingQueue` - The actor thread drains the queue and performs the actual compression + network writes sequentially - Ensured the SSE response headers are sent **exactly once**: - The actor’s *first* send writes the `{:headers ... :body ...}` map (and includes `Content-Encoding: br` when applicable) - Subsequent sends are raw chunks on the same connection - Made teardown safer: - `unregister-sse-channel!` now **signals the actor to close via a sentinel**, so the actor closes the Brotli stream/channel on its own thread (avoiding “close while writing” races) - Re-registering a tab now stops any previous writer first (reconnect safety) - Updated/added tests to match and validate the new behavior: - Tests now account for the async actor (latches/timeouts) - Added coverage that `send-sse!` **doesn’t block on Brotli compression** and that Brotli compression is **never invoked concurrently** for a tab.
1 parent 8a623bf commit 085c802

4 files changed

Lines changed: 395 additions & 71 deletions

File tree

src/hyper/render.clj

Lines changed: 232 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,32 +12,243 @@
1212
[hyper.state :as state]
1313
[hyper.utils :as utils]
1414
[org.httpkit.server :as http]
15-
[taoensso.telemere :as t]))
15+
[taoensso.telemere :as t])
16+
(:import (java.util.concurrent BlockingQueue LinkedBlockingQueue)))
17+
18+
;; ---------------------------------------------------------------------------
19+
;; SSE writer actor (single-writer per tab)
20+
;; ---------------------------------------------------------------------------
21+
22+
(def ^:private sse-headers
23+
{"Content-Type" "text/event-stream"})
24+
25+
(def ^:private sse-headers-br
26+
{"Content-Type" "text/event-stream"
27+
"Content-Encoding" "br"})
28+
29+
(def ^:private sse-close-sentinel ::close)
30+
31+
(def ^:private default-sse-max-batch-messages 32)
32+
(def ^:private default-sse-max-batch-chars (* 64 1024)) ;; 64KiB
33+
34+
(defn- sse-batch-limits
35+
"Resolve batching limits for a new SSE writer actor.
36+
37+
Optional app-state* overrides:
38+
- :sse-max-batch-messages
39+
- :sse-max-batch-chars"
40+
[app-state*]
41+
{:max-messages (long (or (get @app-state* :sse-max-batch-messages)
42+
default-sse-max-batch-messages))
43+
:max-chars (long (or (get @app-state* :sse-max-batch-chars)
44+
default-sse-max-batch-chars))})
45+
46+
(defn- try-http-send!
47+
"Wrapper around http-kit send! that logs and returns boolean success."
48+
[tab-id channel data close?]
49+
(try
50+
(boolean (http/send! channel data close?))
51+
(catch Throwable e
52+
(t/error! e {:id :hyper.error/send-sse
53+
:data {:hyper/tab-id tab-id}})
54+
false)))
55+
56+
(defn- send-sse-initial-response!
57+
"Send the initial SSE response map (headers + body) for a channel.
58+
Must happen exactly once per SSE connection."
59+
[tab-id writer payload]
60+
(let [channel (:channel writer)
61+
headers (if (:br-stream writer) sse-headers-br sse-headers)]
62+
(try-http-send! tab-id channel {:headers headers
63+
:body payload}
64+
false)))
65+
66+
(defn- send-sse-chunk!
67+
"Send a subsequent SSE data chunk on an already-initialized channel."
68+
[tab-id writer payload]
69+
(try-http-send! tab-id (:channel writer) payload false))
70+
71+
(defn- close-sse-writer!
72+
[tab-id writer]
73+
;; Close resources on the actor thread to avoid brotli races.
74+
(br/close-stream (:br-stream writer))
75+
(when-let [channel (:channel writer)]
76+
(when (instance? org.httpkit.server.AsyncChannel channel)
77+
(t/catch->error! :hyper.error/close-sse-channel
78+
(http/close channel))))
79+
(t/log! {:level :debug
80+
:id :hyper.event/sse-writer-close
81+
:data {:hyper/tab-id tab-id}
82+
:msg "SSE writer closed"})
83+
nil)
84+
85+
(defn- drop-queued-messages!
86+
"Drain and drop any remaining queued items (used after close)."
87+
[^BlockingQueue queue]
88+
(loop [m (.poll queue)]
89+
(when m
90+
(recur (.poll queue))))
91+
nil)
92+
93+
(defn- coalesce-sse-messages
94+
"Coalesce already-queued SSE messages into one string payload.
95+
96+
- Never blocks (uses `.poll`).
97+
- Preserves ordering by returning a :pending message if a polled message
98+
would exceed the batch limits.
99+
- If a close sentinel is observed while polling, returns :close-after? true
100+
so the actor closes *after* sending this batch.
101+
102+
Returns {:batch string :pending (or string nil) :close-after? boolean}."
103+
[first-msg ^BlockingQueue queue {:keys [max-messages max-chars]}]
104+
(let [first-str (str first-msg)
105+
^StringBuilder sb (StringBuilder.)]
106+
(.append sb first-str)
107+
(loop [msg-count 1
108+
char-count (count first-str)]
109+
(cond
110+
(>= msg-count max-messages)
111+
{:batch (.toString sb) :pending nil :close-after? false}
112+
113+
(>= char-count max-chars)
114+
{:batch (.toString sb) :pending nil :close-after? false}
115+
116+
:else
117+
(let [next (.poll queue)]
118+
(cond
119+
(nil? next)
120+
{:batch (.toString sb) :pending nil :close-after? false}
121+
122+
(= next sse-close-sentinel)
123+
{:batch (.toString sb) :pending nil :close-after? true}
124+
125+
:else
126+
(let [next-str (str next)
127+
next-len (count next-str)]
128+
(if (and (pos? max-chars)
129+
(> (+ char-count next-len) max-chars))
130+
{:batch (.toString sb) :pending next-str :close-after? false}
131+
(do
132+
(.append sb next-str)
133+
(recur (inc msg-count) (+ char-count next-len)))))))))))
134+
135+
(defn- sse-actor-loop!
136+
"Virtual-thread actor loop. Owns the channel + (optional) streaming brotli
137+
state, enforcing single-writer semantics by construction."
138+
[tab-id writer]
139+
(let [^BlockingQueue queue (:queue writer)
140+
limits (:batch-limits writer)]
141+
(loop [started? false
142+
pending nil]
143+
(let [next-state
144+
(try
145+
(let [msg (or pending (.take queue))]
146+
(cond
147+
(= msg sse-close-sentinel)
148+
nil
149+
150+
:else
151+
(let [{batch :batch
152+
pending-next :pending
153+
close-after? :close-after?}
154+
(coalesce-sse-messages msg queue limits)
155+
payload (if-let [br-stream (:br-stream writer)]
156+
(br/compress-stream (:br-out writer) br-stream batch)
157+
batch)
158+
sent? (if started?
159+
(send-sse-chunk! tab-id writer payload)
160+
(send-sse-initial-response! tab-id writer payload))]
161+
(when (and sent? (not close-after?))
162+
{:started? true
163+
:pending pending-next}))))
164+
(catch InterruptedException _
165+
nil)
166+
(catch Throwable e
167+
(t/error! e {:id :hyper.error/sse-writer
168+
:data {:hyper/tab-id tab-id}})
169+
nil))]
170+
(if next-state
171+
(recur (:started? next-state) (:pending next-state))
172+
(do
173+
(close-sse-writer! tab-id writer)
174+
(drop-queued-messages! queue)
175+
nil))))))
176+
177+
(defn- start-sse-writer-thread!
178+
"Start the SSE writer actor as a single virtual thread."
179+
[tab-id writer]
180+
(-> (Thread/ofVirtual)
181+
(.name (str "hyper-sse-" tab-id))
182+
(.start ^Runnable #(sse-actor-loop! tab-id writer))))
183+
184+
(defn- new-sse-writer
185+
"Create and start a per-tab SSE writer actor.
186+
187+
- Producers enqueue quickly (send-sse!).
188+
- The actor owns http-kit send!/close and the streaming brotli state."
189+
[app-state* tab-id channel compress?]
190+
(let [queue (LinkedBlockingQueue.)
191+
out (when compress? (br/byte-array-out-stream))
192+
br-stream (when out (br/compress-out-stream out :window-size 18))
193+
writer (cond-> {:channel channel
194+
:queue queue
195+
:batch-limits (sse-batch-limits app-state*)}
196+
compress? (assoc :br-out out
197+
:br-stream br-stream))
198+
thread (start-sse-writer-thread! tab-id writer)]
199+
(assoc writer :thread thread)))
200+
201+
(defn- stop-sse-writer!
202+
"Signal an SSE writer actor to close via a sentinel."
203+
[writer]
204+
(when writer
205+
(.offer ^BlockingQueue (:queue writer) sse-close-sentinel))
206+
nil)
16207

17208
(defn register-sse-channel!
18209
"Register an SSE channel for a tab, optionally with a streaming brotli
19210
compressor. When compress? is true, creates a compressor pair
20211
(ByteArrayOutputStream + BrotliOutputStream) kept for the lifetime
21212
of the SSE connection so the LZ77 window is shared across fragments."
22213
[app-state* tab-id channel compress?]
23-
(let [tab-updates (cond-> {:sse-channel channel}
24-
compress? (merge (let [out (br/byte-array-out-stream)]
25-
{:br-out out
26-
:br-stream (br/compress-out-stream out :window-size 18)})))]
27-
(swap! app-state* update-in [:tabs tab-id] merge tab-updates))
214+
;; Reconnect safety: close any previous writer actor for this tab-id.
215+
(when-let [old-writer (get-in @app-state* [:tabs tab-id :sse-writer])]
216+
(stop-sse-writer! old-writer))
217+
218+
(let [writer (new-sse-writer app-state* tab-id channel compress?)]
219+
(swap! app-state* update-in [:tabs tab-id] merge
220+
{:sse-channel channel
221+
:sse-writer writer}))
28222
nil)
29223

30224
(defn unregister-sse-channel!
31-
"Unregister an SSE channel and close the brotli stream for a tab."
225+
"Unregister an SSE channel for a tab.
226+
227+
Enqueues a close sentinel so the *actor thread* performs:
228+
- brotli stream close (if present)
229+
- channel close
230+
231+
This avoids closing the brotli stream concurrently with an in-flight write."
32232
[app-state* tab-id]
33233
(let [tab-data (get-in @app-state* [:tabs tab-id])
234+
writer (:sse-writer tab-data)
34235
channel (:sse-channel tab-data)]
35-
(br/close-stream (:br-stream tab-data))
36-
(when (and channel (instance? org.httpkit.server.AsyncChannel channel))
236+
(stop-sse-writer! writer)
237+
238+
;; Best-effort: if we somehow have a channel but no writer, close it.
239+
(when (and (not writer)
240+
channel
241+
(instance? org.httpkit.server.AsyncChannel channel))
37242
(t/catch->error! :hyper.error/close-sse-channel
38243
(http/close channel))))
244+
39245
(swap! app-state* update-in [:tabs tab-id]
40-
assoc :sse-channel nil :br-out nil :br-stream nil)
246+
assoc
247+
:sse-channel nil
248+
:sse-writer nil
249+
;; Legacy keys from pre-writer-actor versions
250+
:br-out nil
251+
:br-stream nil)
41252
nil)
42253

43254
(defn get-sse-channel
@@ -131,23 +342,18 @@
131342
"data: elements <script data-effect=\"el.remove()\">" js "</script>\n\n")))
132343

133344
(defn send-sse!
134-
"Send an SSE message to a tab's channel.
135-
If the tab has a streaming brotli compressor (client supports br),
136-
compresses through it and sends raw bytes. The Content-Encoding: br
137-
header is set once on the initial response — subsequent sends on the
138-
async channel are just data frames in the same compressed stream."
345+
"Enqueue an SSE message for a tab.
346+
347+
This function is intentionally non-blocking:
348+
- it does *not* perform brotli compression inline
349+
- it does *not* call http-kit send! inline
350+
351+
A per-tab SSE writer actor owns the channel + (optional) streaming brotli
352+
compressor, guaranteeing single-writer semantics."
139353
[app-state* tab-id message]
140-
(let [tab-data (get-in @app-state* [:tabs tab-id])
141-
channel (:sse-channel tab-data)
142-
br-out (:br-out tab-data)
143-
br-stream (:br-stream tab-data)]
144-
(when channel
145-
(or (t/catch->error! :hyper.error/send-sse
146-
(if (and br-out br-stream)
147-
(let [compressed (br/compress-stream br-out br-stream message)]
148-
(http/send! channel compressed false))
149-
(http/send! channel message false)))
150-
false))))
354+
(if-let [writer (get-in @app-state* [:tabs tab-id :sse-writer])]
355+
(boolean (.offer ^BlockingQueue (:queue writer) message))
356+
false))
151357

152358
(defn render-error-fragment
153359
"Render an error message as a fragment."

src/hyper/server.clj

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -79,37 +79,22 @@
7979
{:on-open (fn [channel]
8080
(state/get-or-create-tab! app-state* session-id tab-id)
8181
(render/register-sse-channel! app-state* tab-id channel compress?)
82+
83+
;; Enqueue the initial connected event *before* any watchers can
84+
;; enqueue render output. The SSE writer actor owns sending headers
85+
;; (and brotli compression, if enabled).
86+
(let [connected-msg (str "event: connected\n"
87+
"data: {\"tab-id\":\"" tab-id "\"}\n\n")]
88+
(render/send-sse! app-state* tab-id connected-msg))
89+
8290
(render/setup-watchers! app-state* session-id tab-id request-var)
8391
;; Auto-watch the routes Var so title/route changes
8492
;; trigger re-renders for all connected tabs
8593
(when-let [routes-source (get @app-state* :routes-source)]
8694
(when (var? routes-source)
8795
(render/watch-source! app-state* session-id tab-id request-var routes-source)))
8896
;; Set up route-level watches (:watches + Var :get handlers)
89-
(render/setup-route-watches! app-state* session-id tab-id request-var)
90-
(let [connected-msg (str "event: connected\n"
91-
"data: {\"tab-id\":\"" tab-id "\"}\n\n")]
92-
(if compress?
93-
;; Brotli: send headers with the first chunk compressed
94-
;; through the tab's streaming compressor so all bytes
95-
;; on this connection form one contiguous brotli stream.
96-
(let [tab-data (get-in @app-state* [:tabs tab-id])
97-
compressed (br/compress-stream
98-
(:br-out tab-data)
99-
(:br-stream tab-data)
100-
connected-msg)]
101-
(http-kit/send!
102-
channel
103-
{:headers {"Content-Type" "text/event-stream"
104-
"Content-Encoding" "br"}
105-
:body compressed}
106-
false))
107-
;; No compression: plain text
108-
(http-kit/send!
109-
channel
110-
{:headers {"Content-Type" "text/event-stream"}
111-
:body connected-msg}
112-
false))))
97+
(render/setup-route-watches! app-state* session-id tab-id request-var))
11398

11499
:on-close (fn [_channel _status]
115100
(t/log! {:level :info

0 commit comments

Comments
 (0)