Streamable HTTP resumability + redelivery + SSE polling via server-side disconnect #1077
Conversation
| /// For other transports, this property will be <see langword="null"/>. | ||
| /// </para> | ||
| /// </remarks> | ||
| public Action? CloseStandaloneSseStream { get; set; } |
Are the changes to the JsonRpcMessageContext necessary? I think I see the purpose. You cannot always rely on the request ID from the POST body to identify the final message in the SSE stream, but I wonder if there isn't a cleaner abstraction. Maybe disposing the RelatedTransport? That wouldn't cover the GET stream, but I don't know why you need to be able to close the GET stream via a random JsonRpcMessage at all.
Yeah, I agree that it's probably not necessary to support closing the unsolicited message stream.
I've updated how this works. RequestContext now has an EnablePollingAsync(TimeSpan retryInterval, ...) method that calls through to StreamableHttpPostTransport.EnablePollingAsync(...). If the developer knows that an operation is going to take a long time, they can call EnablePollingAsync() which closes the connection and forces the client to poll for updates at a stream-specific interval.
If the connection gets closed via network error and EnablePollingAsync() hasn't been called, then after the client reconnects, the connection will remain open until the stream has completed (the server generates a final response).
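As a rough illustration of the flow described above, a handler for a slow operation might opt into polling before starting its work. This is only a sketch: `IPollingCapableContext` is a hypothetical stand-in for the relevant part of `RequestContext<TParams>`, the `Task` return type of `EnablePollingAsync` is an assumption, and the 5-second interval is arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the part of RequestContext<TParams> discussed above.
// The Task return type is an assumption; only the parameter list comes from the PR.
public interface IPollingCapableContext
{
    Task EnablePollingAsync(TimeSpan retryInterval, CancellationToken cancellationToken = default);
}

public static class LongRunningHandler
{
    public static async Task<string> RunAsync(
        IPollingCapableContext context,
        Func<CancellationToken, Task<string>> doSlowWorkAsync,
        CancellationToken cancellationToken = default)
    {
        // Ask the transport to close the current connection; the client is then
        // expected to poll at roughly this interval instead of holding a stream open.
        await context.EnablePollingAsync(TimeSpan.FromSeconds(5), cancellationToken).ConfigureAwait(false);

        // The result is still returned normally; the client picks it up on a later poll.
        return await doSlowWorkAsync(cancellationToken).ConfigureAwait(false);
    }
}
```

If `EnablePollingAsync` is never called, nothing changes for the handler: a reconnecting client simply keeps its resumed stream open until the final response arrives.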
| } | ||
|
|
||
| // If this is a POST stream, we're done - the replay was the complete response | ||
| if (streamId != GetStreamId) |
Shouldn't there be similar changes to HandlePostRequestAsync? Also, we need to do more to avoid stream ID collisions. Even though there's one "get" per session, IEventStore is supplied via HttpServerTransportOptions.EventStore which makes it quite difficult to configure a per-session store.
I don't think people want to be forced to configure the store per-session anyway. I think we need to include the session ID or maybe a random GUID for "Stateless" requests if we support stateless resumption at all.
I think it could make sense to support stateless, but we need to have tests for stateless and non-stateless tests. I recommend looking at the MapMcpTests and MapMcpStreamableHttpTests. By adding test cases to MapMcpStreamableHttpTests, you should get both stateless and non-stateless test coverage.
> Also, we need to do more to avoid stream ID collisions. Even though there's one "get" per session, IEventStore is supplied via HttpServerTransportOptions.EventStore which makes it quite difficult to configure a per-session store.
This is now fixed. The new ISseEventStreamStore abstraction requires both a SessionId and a StreamId to be supplied in order to create a stored SSE event stream. The event IDs returned from the stream are then guaranteed to be globally unique, so long as the SessionId is globally unique and the StreamId is unique within its own session.
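To make the uniqueness argument concrete, here is one way a store implementation could derive event IDs from the session ID, the stream ID, and a per-stream sequence number. `SseEventId` is a hypothetical helper written for illustration, not a type from this PR, and the `session:stream:sequence` layout is an assumption.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of the SDK: builds event IDs that are globally unique
// whenever the session ID is globally unique and the stream ID is unique in its session.
public static class SseEventId
{
    public static string Format(string sessionId, string streamId, long sequence) =>
        $"{Encode(sessionId)}:{Encode(streamId)}:{sequence}";

    public static bool TryParse(string eventId, out string sessionId, out string streamId, out long sequence)
    {
        sessionId = streamId = string.Empty;
        sequence = 0;

        string[] parts = eventId.Split(':');
        if (parts.Length != 3 || !long.TryParse(parts[2], out sequence))
        {
            return false;
        }

        try
        {
            sessionId = Decode(parts[0]);
            streamId = Decode(parts[1]);
            return true;
        }
        catch (FormatException)
        {
            // One of the segments was not valid Base64.
            return false;
        }
    }

    // Base64 output never contains ':', so the separator stays unambiguous
    // even if the raw session or stream ID contains one.
    private static string Encode(string value) => Convert.ToBase64String(Encoding.UTF8.GetBytes(value));

    private static string Decode(string value) => Encoding.UTF8.GetString(Convert.FromBase64String(value));
}
```

Because the ID encodes the session and stream it belongs to, a `Last-Event-ID` header alone is enough for the server to locate the right stream when a client reconnects.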
> I think it could make sense to support stateless, but we need to have tests for stateless and non-stateless tests. I recommend looking at the MapMcpTests and MapMcpStreamableHttpTests. By adding test cases to MapMcpStreamableHttpTests, you should get both stateless and non-stateless test coverage.
I've experimented a bit with supporting stateless, but I think we should do that in a follow-up. The main blockers at the moment are:
- The server won't know the client's protocol version after the initial handshake. How do we determine if the client supports priming events?
- In stateless mode, we currently dispose the session when the POST request completes. This ends the message processing loop and prevents the server from sending more messages to the client. We'd need to instead keep the session alive until the final response message is sent.
| /// Priming events are only supported in protocol version >= 2025-11-25. | ||
| /// Older clients may crash when receiving SSE events with empty data. | ||
| /// </remarks> | ||
| internal static bool SupportsResumability(string? protocolVersion) |
Nit: This should be renamed. It's not about whether the protocol supports resumability, but it's specifically about the "priming" event.
Renamed to SupportsPrimingEvent()
| /// In-memory event store for testing resumability. | ||
| /// This is a simple implementation intended for testing, not for production use. | ||
| /// </summary> | ||
| public class InMemoryEventStore : IEventStore |
What do you think about the idea of including a default implementation of IEventStore based on IDistributedCache? It'd be opt-in ofc. I could definitely see it being a follow up. Figuring out how to get the usability of that right might influence the design of the abstraction.
Good idea. I've prototyped an IDistributedCache-based implementation to validate the ISseEventStreamStore abstraction, but that implementation is not included in this PR. If we think it's blocking, we can wait to merge this PR until I have that polished up.
| /// Gets or sets the retry interval to suggest to clients in SSE retry field. | ||
| /// </summary> | ||
| /// <value> | ||
| /// The retry interval. The default is <see langword="null"/>, meaning no retry field is sent. |
The spec states "it SHOULD send an SSE event with a standard retry field". Does that mean our default is going against the spec's recommendation?
Good catch. I've now added a non-null default value of 1 second.
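For reference, a priming event that carries an event ID, a `retry` hint, and an empty `data` field could be serialized as below. This is a sketch of the standard SSE wire format only; `PrimingEvent` is a hypothetical helper, and the exact bytes the SDK writes may differ.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helper, not part of the SDK: formats an SSE event that carries
// only an ID and a retry hint, with an empty data field.
public static class PrimingEvent
{
    public static string Format(string eventId, TimeSpan retryInterval)
    {
        // SSE field values are line-based, so an ID must not contain line breaks or NUL.
        if (eventId.Contains('\n') || eventId.Contains('\r') || eventId.Contains('\0'))
        {
            throw new ArgumentException("Event IDs must not contain CR, LF, or NUL.", nameof(eventId));
        }

        // The retry field is an integer number of milliseconds, written as ASCII digits.
        long retryMs = (long)retryInterval.TotalMilliseconds;

        var sb = new StringBuilder();
        sb.Append("id: ").Append(eventId).Append('\n');
        sb.Append("retry: ").Append(retryMs.ToString(CultureInfo.InvariantCulture)).Append('\n');
        sb.Append("data: \n");
        sb.Append('\n'); // A blank line terminates the event.
        return sb.ToString();
    }
}
```

With the 1-second default mentioned above, `PrimingEvent.Format("abc", TimeSpan.FromSeconds(1))` yields the lines `id: abc`, `retry: 1000`, and `data: `, followed by a blank line.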
src/ModelContextProtocol.Core/Client/StreamableHttpClientSessionTransport.cs
| else | ||
| { | ||
| // We have an event ID, so reconnection should work - reset attempts | ||
| attempt = 0; |
What prevents us from ending up in an infinite loop?
If the server never sends the response/error message and continues returning successful HTTP responses, then this could theoretically loop indefinitely. But I don't see that as different than, for example, the server leaving the response stream open indefinitely and keeping the client waiting for the final message.
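The behavior under discussion can be sketched as a loop in which the attempt counter only grows while no event ID is known, so the caller's cancellation token is the only bound once the stream becomes resumable. This is a simplified model, not the transport's actual code: `StreamAttempt`, `ResumeLoop`, and the `readStreamAsync` delegate are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical result of reading one SSE response until it ends or the connection drops.
public sealed record StreamAttempt(bool ReceivedFinalResponse, string? LastEventId);

public static class ResumeLoop
{
    // Returns true once the final response arrives, or false after too many
    // consecutive attempts without any event ID to resume from.
    public static async Task<bool> RunAsync(
        Func<string?, CancellationToken, Task<StreamAttempt>> readStreamAsync,
        int maxAttemptsWithoutEventId,
        TimeSpan retryInterval,
        CancellationToken cancellationToken = default)
    {
        string? lastEventId = null;
        int attempt = 0;

        while (attempt < maxAttemptsWithoutEventId)
        {
            StreamAttempt result = await readStreamAsync(lastEventId, cancellationToken).ConfigureAwait(false);
            if (result.ReceivedFinalResponse)
            {
                return true;
            }

            // Keep the most recent ID seen so far; a drop before any new event keeps the old one.
            lastEventId = result.LastEventId ?? lastEventId;

            if (lastEventId is null)
            {
                // Nothing to resume from, so count this as a failed attempt.
                attempt++;
            }
            else
            {
                // We have an event ID, so reconnection should work: reset the counter.
                attempt = 0;
            }

            // Task.Delay throws if the token is cancelled, which is what ultimately bounds the loop.
            await Task.Delay(retryInterval, cancellationToken).ConfigureAwait(false);
        }

        return false;
    }
}
```

In this model, a server that keeps returning successful responses without a final message keeps the client polling until the token is cancelled, which matches the trade-off described in the reply above.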
| /// Implementations should be thread-safe, as events may be stored and replayed concurrently. | ||
| /// </para> | ||
| /// </remarks> | ||
| public interface IEventStore |
This is a fairly general name. Could we make it ISseEventStore or something similarly more specific?
Good idea. Renamed to ISseEventStreamStore.
| /// Stores an event for later retrieval. | ||
| /// </summary> | ||
| /// <param name="streamId"> | ||
| /// The ID of the stream the event belongs to. This is typically the JSON-RPC request ID | ||
| /// for POST SSE responses, or a special identifier for the standalone GET SSE stream. | ||
| /// </param> | ||
| /// <param name="message"> | ||
| /// The JSON-RPC message to store, or <see langword="null"/> for priming events. | ||
| /// Priming events establish the event ID without carrying a message payload. | ||
| /// </param> |
What is the definition of "event" here? It says this is for storing an event, but then accepts the JSON-RPC message to store. Is the intimation that the event contains that message?
I've since updated the abstraction to instead accept an SseItem<JsonRpcMessage?>, so hopefully this clears up any ambiguity.
| ValueTask<string?> ReplayEventsAfterAsync( | ||
| string lastEventId, | ||
| Func<JsonRpcMessage, string, CancellationToken, ValueTask> sendCallback, | ||
| CancellationToken cancellationToken = default); |
I'm curious why you chose to expose it like this rather than, say, exposing a GetEventsAfter method that returned an enumerable.
Thanks - changed this to return an IAsyncEnumerable.
mikekistler
left a comment
I didn't get through all of this but want to share the comments I've made so far.
| /// </param> | ||
| /// <param name="cancellationToken">A token to cancel the operation.</param> | ||
| /// <returns>The generated event ID for the stored event.</returns> | ||
| ValueTask<string> StoreEventAsync(string sessionId, string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default); |
If sessionId could be null, should it be declared as `string?`?
The idea is that if the caller doesn't have a session ID to provide, they should generate a unique one on-the-fly (e.g., a GUID) even if only for the sake of avoiding stream ID conflicts.
| /// </param> | ||
| /// <param name="cancellationToken">A token to cancel the operation.</param> | ||
| /// <returns>The generated event ID for the stored event.</returns> | ||
| ValueTask<string> StoreEventAsync(string sessionId, string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default); |
I think we should allow the EventStore to selectively store events. For example, we should allow the EventStore to choose to not store Progress notifications. In these cases, it isn't necessary to assign an id to the event -- I'm pretty sure the SSE spec allows this. So could we make the return value a string? to permit the EventStore to choose to not assign an id to the event (which it may not have stored).
The updated abstraction just returns an SseItem<JsonRpcMessage?>, so an implementation could opt to just return the original item with a null EventId if they don't want to store it.
| /// Priming events establish the event ID without carrying a message payload. | ||
| /// </param> | ||
| /// <param name="cancellationToken">A token to cancel the operation.</param> | ||
| /// <returns>The generated event ID for the stored event.</returns> |
I think there are some requirements on the event ID, like it must be unique within the session, if the server uses session management. It might be good to add these requirements in the doc here.
Thanks. The new XML docs make this clearer.
| /// <summary> | ||
| /// Gets the session ID that the events belong to. | ||
| /// </summary> | ||
| public required string SessionId { get; init; } |
Here too, should SessionId be a `string?`?
See #1077 (comment)
| /// This class simplifies event storage by binding the session ID, stream ID, and retry interval | ||
| /// so that callers only need to provide the message when storing events. | ||
| /// </remarks> | ||
| internal sealed class SseStreamEventStore |
It would be really helpful to expose a base class for the event store that users could easily extend with custom behavior to suit their needs, such as an event filter, id generator, and retryInterval delegate.
Great suggestion. I'm planning to eventually provide an implementation based on IDistributedCache. We could make it unsealed to allow for these scenarios.
348fac0 to 493062a
| { | ||
| // The GET and DELETE endpoints are not mapped in Stateless mode since there's no way to send unsolicited messages | ||
| // for the GET to handle, and there is no server-side state for the DELETE to clean up. | ||
| streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync) |
This change prepares for the future possibility of allowing stateless servers to resume POST streams.
| private static async Task HandleResumePostResponseStreamAsync(HttpContext context, ISseEventStreamReader eventStreamReader) | ||
| { | ||
| InitializeSseResponse(context); | ||
| await eventStreamReader.CopyToAsync(context.Response.Body, context.RequestAborted); | ||
| } | ||
|
|
Note that when resuming a POST stream, we read directly from the ISseEventStreamReader until the event stream completes, rather than handing control back to the StreamableHttpPostTransport. This both keeps the implementation simple and is compatible with stateless (the original StreamableHttpPostTransport may be on a different server).
This differs from how the StreamableHttpServerTransport handles resuming the unsolicited message stream. After reading all messages currently available in the ISseEventStreamStore, the StreamableHttpServerTransport transitions to writing directly to the response stream. This is fine because only stateful sessions support the unsolicited message stream. It complicates the implementation slightly, but it avoids having to repeatedly poll for new messages in the ISseEventStreamStore.
| await eventStreamReader.CopyToAsync(sseResponseStream, cancellationToken); | ||
| } | ||
|
|
||
| var eventStreamWriter = await GetOrCreateEventStreamAsync(cancellationToken).ConfigureAwait(false); | ||
| if (eventStreamWriter is not null) | ||
| { | ||
| await _sseWriter.SendPrimingEventAsync(RetryInterval, eventStreamWriter, cancellationToken).ConfigureAwait(false); | ||
| } | ||
|
|
||
| // We do not need to reference _disposeCts like in HandlePostRequest, because the session ending completes the _sseWriter gracefully. | ||
| await _sseWriter.WriteAllAsync(sseResponseStream, cancellationToken).ConfigureAwait(false); | ||
| return _sseWriter.WriteAllAsync(sseResponseStream, cancellationToken); |
Just want to note that there might be a possibility for missing events here, depending on the ISseEventStreamStore implementation. For example:
- `SendMessageAsync()` gets called and writes an event via `ISseEventStreamWriter.WriteEventAsync()`
- The call to `WriteEventAsync()` completes while the event is still being written to the distributed cache.
- `HandleGetRequestAsync()` gets called and `eventStreamReader.CopyToAsync()` completes before the just-written event has fully hit the cache.
Not sure if this would happen in practice, as the reader and writer are both guaranteed to be in the same process and the ISseEventStreamWriter.WriteEventAsync() method would have to complete before the reader is able to consume the written event. To guarantee this isn't an issue, we could store the ID of the most recent event in memory and keep reading from the store until we've consumed the event with that ID.
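The mitigation suggested above could look roughly like the following: remember the ID of the most recently written event, and keep reading from the store until that ID has been observed. This is a sketch under assumptions: `ReplayCatchUp` and the `readAfterAsync` delegate are hypothetical, the delegate is assumed to yield the IDs of events stored after a given ID, and the sketch assumes the store eventually makes every written event visible.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ReplayCatchUp
{
    // readAfterAsync is a hypothetical store read: it yields the IDs of all events
    // currently visible in the store that come after the given event ID.
    public static async Task<string?> CatchUpAsync(
        Func<string?, CancellationToken, IAsyncEnumerable<string>> readAfterAsync,
        string? lastEventId,
        string? latestWrittenEventId,
        TimeSpan pollDelay,
        CancellationToken cancellationToken = default)
    {
        // Nothing was written, or the client has already seen the latest event.
        bool caughtUp = latestWrittenEventId is null || latestWrittenEventId == lastEventId;

        while (!caughtUp)
        {
            await foreach (string eventId in readAfterAsync(lastEventId, cancellationToken))
            {
                // A real implementation would forward the event to the response here.
                lastEventId = eventId;
                if (eventId == latestWrittenEventId)
                {
                    caughtUp = true;
                }
            }

            if (!caughtUp)
            {
                // The latest write is not visible yet; wait briefly and read again.
                await Task.Delay(pollDelay, cancellationToken).ConfigureAwait(false);
            }
        }

        return lastEventId;
    }
}
```

The returned ID marks the point from which the transport could safely switch to writing new events directly to the response.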
Overview
This PR implements the following features:
- `Last-Event-ID` header
- `ISseEventStreamStore` abstraction to allow for storage and replay of SSE events
- `Last-Event-ID` header

Each of these features has been split into its own commit for ease of review.
Description
There are two disconnection scenarios covered by this PR:
- If a network error occurs, and an `ISseEventStreamStore` is configured, then the client may attempt to reconnect by making a GET request with a `Last-Event-ID` header. The server will then replay stored events before continuing to use the new GET response to stream remaining events. If further disconnections happen, the client may continue to make new GET requests to resume the stream. This applies for both client-initiated requests (POST) and the unsolicited message stream (GET).
- However, the server can also initiate a disconnection and force the client to poll for updates. This is useful for avoiding long-running streams. This can be done via the new `RequestContext<TParams>.EnablePollingAsync(TimeSpan retryInterval, CancellationToken cancellationToken = default)` API. When the client reconnects via GET with a `Last-Event-ID`, the response will only contain events currently available in the `ISseEventStreamStore` before completing. The client must continue initiating new GET requests at the specified `retryInterval` until the final response is received.

Fixes #510
Fixes #1020