Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions tavern/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
tavernmcp "realm.pub/tavern/internal/mcp"
"realm.pub/tavern/internal/portals"
"realm.pub/tavern/internal/portals/mux"
"realm.pub/tavern/internal/portals/ssh"
"realm.pub/tavern/internal/portals/pty"
"realm.pub/tavern/internal/portals/ssh"
"realm.pub/tavern/internal/redirectors"
"realm.pub/tavern/internal/scheduler"
"realm.pub/tavern/internal/secrets"
Expand Down Expand Up @@ -313,13 +313,8 @@ func NewServer(ctx context.Context, options ...func(*Config)) (*Server, error) {
// Configure Request Logging
httpLogger := log.New(os.Stderr, "[HTTP] ", log.Flags())

// Configure Shell Muxes
wsShellMux, grpcShellMux := cfg.NewShellMuxes(ctx)
go func() {
if err := wsShellMux.Start(ctx); err != nil {
slog.ErrorContext(ctx, "websocket shell mux stopped", "err", err)
}
}()
// Configure Shell Mux
grpcShellMux := cfg.NewGRPCShellMux(ctx)
go func() {
if err := grpcShellMux.Start(ctx); err != nil {
slog.ErrorContext(ctx, "grpc shell mux stopped", "err", err)
Expand Down Expand Up @@ -493,14 +488,8 @@ func NewServer(ctx context.Context, options ...func(*Config)) (*Server, error) {
Handler: cdn.NewUploadHandler(client),
},
"/shell/ws": tavernhttp.Endpoint{
Handler: stream.NewShellHandler(client, wsShellMux),
},
"/shellv2/ws": tavernhttp.Endpoint{
Handler: tavernshell.NewHandler(client, portalMux),
},
"/shell/ping": tavernhttp.Endpoint{
Handler: stream.NewPingHandler(client, wsShellMux),
},
"/portals/ssh/ws": tavernhttp.Endpoint{
Handler: ssh.NewHandler(client, portalMux),
},
Expand Down
24 changes: 5 additions & 19 deletions tavern/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,24 +257,22 @@ func (cfg *Config) NewPortalMux(ctx context.Context) *mux.Mux {
return mux.New(mux.WithPubSubClient(client), mux.WithSubscriberBufferSize(subBufferSize))
}

// NewShellMuxes configures two stream.Mux instances for shell i/o.
// The wsMux will be used by websockets to subscribe to shell output and publish new input.
// NewGRPCShellMux configures a stream.Mux instance for shell i/o.
// The grpcMux will be used by gRPC to subscribe to shell input and publish new output.
func (cfg *Config) NewShellMuxes(ctx context.Context) (wsMux *stream.Mux, grpcMux *stream.Mux) {
func (cfg *Config) NewGRPCShellMux(ctx context.Context) (grpcMux *stream.Mux) {
var (
projectID = EnvGCPProjectID.String()
gcpTopicPrefix = fmt.Sprintf("gcppubsub://projects/%s/topics/", projectID)
topicShellInput = EnvPubSubTopicShellInput.String()
topicShellOutput = EnvPubSubTopicShellOutput.String()
subShellInput = EnvPubSubSubscriptionShellInput.String()
subShellOutput = EnvPubSubSubscriptionShellOutput.String()
)

// For GCP, messages for a "Subscription" are load-balanced across all of the "Subscribers" to that same "Subscription"
// This means we must make a new "Subscription" in GCP for each instance of tavern to ensure they all receive the
// appropriate input/output from shells. For more information, see the information here:
// https://cloud.google.com/pubsub/docs/pubsub-basics#choose_a_publish_and_subscribe_pattern
if strings.HasPrefix(subShellInput, "gcppubsub://") && strings.HasPrefix(subShellOutput, "gcppubsub://") {
if strings.HasPrefix(subShellInput, "gcppubsub://") {
if projectID == "" {
log.Fatalf("[FATAL] must set value for %q when using gcppubsub:// in configuration", EnvGCPProjectID.Key)
}
Expand Down Expand Up @@ -316,13 +314,10 @@ func (cfg *Config) NewShellMuxes(ctx context.Context) (wsMux *stream.Mux, grpcMu
}

shellInputTopicID := strings.TrimPrefix(topicShellInput, gcpTopicPrefix)
shellOutputTopicID := strings.TrimPrefix(topicShellOutput, gcpTopicPrefix)

// Overwrite env var specification with newly created GCP PubSub Subscriptions
subShellInput = fmt.Sprintf("gcppubsub://projects/%s/subscriptions/%s", projectID, createGCPSubscription(ctx, shellInputTopicID))
slog.DebugContext(ctx, "created GCP PubSub subscription for shell input", "subscription_name", subShellInput)
subShellOutput = fmt.Sprintf("gcppubsub://projects/%s/subscriptions/%s", projectID, createGCPSubscription(ctx, shellOutputTopicID))
slog.DebugContext(ctx, "created GCP PubSub subscription for shell output", "subscription_name", subShellOutput)

// Start a goroutine to publish noop messages on an interval.
// This reduces cold-start latency for GCP PubSub which can improve shell user experience.
Expand All @@ -341,24 +336,15 @@ func (cfg *Config) NewShellMuxes(ctx context.Context) (wsMux *stream.Mux, grpcMu
log.Fatalf("[FATAL] Failed to connect to pubsub topic (%q): %v", topicShellOutput, err)
}

slog.DebugContext(ctx, "opening GCP PubSub subscription for shell output", "subscription_name", subShellOutput)
subOutput, err := pubsub.OpenSubscription(ctx, subShellOutput)
if err != nil {
log.Fatalf("[FATAL] Failed to connect to pubsub subscription (%q): %v", subShellOutput, err)
}

pubInput, err := pubsub.OpenTopic(ctx, topicShellInput)
if err != nil {
log.Fatalf("[FATAL] Failed to connect to pubsub topic (%q): %v", topicShellInput, err)
}
// Make sure the topic is created before we try to subscribe to it in memory
_, _ = pubsub.OpenTopic(ctx, topicShellInput)

slog.DebugContext(ctx, "opening GCP PubSub subscription for shell input", "subscription_name", subShellInput)
subInput, err := pubsub.OpenSubscription(ctx, subShellInput)
if err != nil {
log.Fatalf("[FATAL] Failed to connect to pubsub subscription (%q): %v", subShellInput, err)
}

wsMux = stream.NewMux(pubInput, subOutput)
grpcMux = stream.NewMux(pubOutput, subInput)
return
}
Expand Down
Loading
Loading