diff --git a/internal/cmd/context.go b/internal/cmd/context.go index e5fc182a1c..822f377e44 100644 --- a/internal/cmd/context.go +++ b/internal/cmd/context.go @@ -71,6 +71,7 @@ type Context struct { DAGRunLeaseStore exec.DAGRunLeaseStore ActiveDistributedRunStore exec.ActiveDistributedRunStore + DAGStore exec.DAGStore Proc exec.ProcHandle LicenseManager *license.Manager ContextStore *clicontext.Store @@ -100,6 +101,7 @@ func (c *Context) WithContext(ctx context.Context) *Context { WorkerHeartbeatStore: c.WorkerHeartbeatStore, DAGRunLeaseStore: c.DAGRunLeaseStore, ActiveDistributedRunStore: c.ActiveDistributedRunStore, + DAGStore: c.DAGStore, Proc: c.Proc, LicenseManager: c.LicenseManager, ContextStore: c.ContextStore, @@ -344,6 +346,10 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) { sm := file.NewServiceRegistry(cfg) dispatchTaskStore := store.NewDispatchTaskStore(file.NewCollection(distributedDir)) workerHeartbeatStore := store.NewWorkerHeartbeatStore(file.NewCollection(filepath.Join(distributedDir, "workers"))) + dagStore, err := cmdprocess.NewDAGStore(cfg, cmdprocess.DAGStoreConfig{}) + if err != nil { + return nil, fmt.Errorf("failed to create DAG store: %w", err) + } // Initialize license manager for server commands var licMgr *license.Manager @@ -410,6 +416,7 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) { WorkerHeartbeatStore: workerHeartbeatStore, DAGRunLeaseStore: dagRunLeaseStore, ActiveDistributedRunStore: activeDistributedRunStore, + DAGStore: dagStore, LicenseManager: licMgr, ContextStore: contextStore, CLIContext: selectedContext, diff --git a/internal/cmd/coord.go b/internal/cmd/coord.go index d4de91ed59..de0e3b7c95 100644 --- a/internal/cmd/coord.go +++ b/internal/cmd/coord.go @@ -106,6 +106,7 @@ func runCoordinator(ctx *Context, _ []string) error { coordCtx.WorkerHeartbeatStore, coordCtx.DAGRunLeaseStore, coordCtx.ActiveDistributedRunStore, + coordCtx.DAGStore, ) if err != nil { return fmt.Errorf("failed to initialize coordinator: %w", err) @@ -145,6 +146,7 @@ func newCoordinator( workerHeartbeatStore exec.WorkerHeartbeatStore, dagRunLeaseStore exec.DAGRunLeaseStore, activeDistributedRunStore exec.ActiveDistributedRunStore, + dagStore exec.DAGStore, ) (*coordinator.Service, *coordinator.Handler, error) { // Generate instance ID hostname, err := os.Hostname() @@ -234,6 +236,7 @@ func newCoordinator( WorkerHeartbeatStore: workerHeartbeatStore, DAGRunLeaseStore: dagRunLeaseStore, ActiveDistributedRunStore: activeDistributedRunStore, + DAGStore: dagStore, EventService: ctx.EventService, EventSourceInstance: ctx.EventSourceInstance, }) diff --git a/internal/cmd/startall.go b/internal/cmd/startall.go index 3f62bf8578..338012c54e 100644 --- a/internal/cmd/startall.go +++ b/internal/cmd/startall.go @@ -155,6 +155,7 @@ func runStartAll(ctx *Context, _ []string) error { ctx.WorkerHeartbeatStore, ctx.DAGRunLeaseStore, ctx.ActiveDistributedRunStore, + ctx.DAGStore, ) if err != nil { return fmt.Errorf("failed to initialize coordinator: %w", err) diff --git a/internal/runtime/agent/agent.go b/internal/runtime/agent/agent.go index 6acf856eff..8eab62fbc7 100644 --- a/internal/runtime/agent/agent.go +++ b/internal/runtime/agent/agent.go @@ -240,6 +240,9 @@ type Agent struct { // secretMasker redacts resolved secret values from status/history snapshots. secretMasker *masking.Masker + // remoteDAGLoader loads a DAG from a remote source when local store misses. + remoteDAGLoader RemoteDAGLoader + // Evaluated configs - these are expanded at runtime and stored separately // to avoid mutating the original DAG struct. evaluatedSMTP *core.SMTPConfig @@ -274,6 +277,10 @@ type ArtifactFinalizer = runtime.ArtifactFinalizer // SubWorkflowRunnerFactory creates a runner for child workflows. type SubWorkflowRunnerFactory func(ctx context.Context) (runtimeexec.SubWorkflowRunner, error) +// RemoteDAGLoader loads a DAG definition from a remote source. +// Returns nil, nil when the remote source does not have the DAG. +type RemoteDAGLoader func(ctx context.Context, name string) (*core.DAG, error) + // Options is the configuration for the Agent. type Options struct { // Dry is a dry-run mode. It does not execute the actual command. @@ -363,6 +370,9 @@ type Options struct { DAGRunArtifactDir string // ArtifactFinalizer persists artifacts before the final terminal status is written. ArtifactFinalizer ArtifactFinalizer + // RemoteDAGLoader loads a DAG from a remote source when the local DAG store misses. + // When nil, no remote fallback is attempted. + RemoteDAGLoader RemoteDAGLoader // SocketServerFactory creates the local status/control transport. // When nil, the default Unix socket transport is used. SocketServerFactory SocketServerFactory @@ -427,6 +437,7 @@ func New( dagRunLogDir: opts.DAGRunLogDir, dagRunArtifactDir: opts.DAGRunArtifactDir, socketServerFactory: opts.SocketServerFactory, + remoteDAGLoader: opts.RemoteDAGLoader, } if a.socketServerFactory == nil { a.socketServerFactory = defaultSocketServerFactory @@ -582,7 +593,7 @@ func (a *Agent) Run(ctx context.Context) error { } // Create a new environment for the dag-run. - dbClient := newDBClient(a.dagRunStore, a.dagStore) + dbClient := newDBClient(a.dagRunStore, a.dagStore, a.remoteDAGLoader) subWorkflowRunner, err := a.createSubWorkflowRunner(ctx) if err != nil { @@ -1933,7 +1944,7 @@ func (a *Agent) dryRun(ctx context.Context) error { } }() - db := newDBClient(a.dagRunStore, a.dagStore) + db := newDBClient(a.dagRunStore, a.dagStore, a.remoteDAGLoader) contextOpts := []runtime.ContextOption{ runtime.WithDatabase(db), runtime.WithRootDAGRun(a.rootDAGRun), diff --git a/internal/runtime/agent/dbclient.go b/internal/runtime/agent/dbclient.go index e31e370da6..0eeb6191a4 100644 --- a/internal/runtime/agent/dbclient.go +++ b/internal/runtime/agent/dbclient.go @@ -5,9 +5,12 @@ package agent import ( "context" + "errors" "fmt" "strings" + "github.com/dagucloud/dagu/internal/cmn/logger" + "github.com/dagucloud/dagu/internal/cmn/logger/tag" "github.com/dagucloud/dagu/internal/core" "github.com/dagucloud/dagu/internal/core/exec" "github.com/dagucloud/dagu/internal/runtime" @@ -16,17 +19,65 @@ import ( var _ runtime.Database = &dbClient{} type dbClient struct { - ds exec.DAGStore - drs exec.DAGRunStore + ds exec.DAGStore + drs exec.DAGRunStore + remoteDAGLoader RemoteDAGLoader } -func newDBClient(drs exec.DAGRunStore, ds exec.DAGStore) *dbClient { - return &dbClient{drs: drs, ds: ds} +func newDBClient(drs exec.DAGRunStore, ds exec.DAGStore, remoteDAGLoader RemoteDAGLoader) *dbClient { + return &dbClient{drs: drs, ds: ds, remoteDAGLoader: remoteDAGLoader} } // GetDAG implements core.DBClient. func (o *dbClient) GetDAG(ctx context.Context, name string) (*core.DAG, error) { - return o.ds.GetDetails(ctx, name) + // Guard against nil DAG store + if o.ds == nil { + logger.Info(ctx, "No local DAG store, trying remote fallback", tag.DAG(name)) + if o.remoteDAGLoader == nil { + return nil, fmt.Errorf("no local DAG store and no remote loader configured for DAG %s", name) + } + remoteDAG, remoteErr := o.remoteDAGLoader(ctx, name) + if remoteErr != nil { + logger.Warn(ctx, "Remote DAG fallback failed", tag.DAG(name), tag.Error(remoteErr)) + return nil, fmt.Errorf("remote DAG load failed for %s: %w", name, remoteErr) + } + if remoteDAG == nil { + return nil, fmt.Errorf("DAG %s not found locally or remotely", name) + } + logger.Info(ctx, "DAG loaded from remote fallback", tag.DAG(name)) + return remoteDAG, nil + } + + dag, err := o.ds.GetDetails(ctx, name) + if err == nil { + return dag, nil + } + // Only fallback to remote for not-found errors; propagate other errors directly + if !errors.Is(err, exec.ErrDAGNotFound) { + return nil, err + } + // Try remote fallback if configured + if o.remoteDAGLoader == nil { + return nil, err + } + logger.Info(ctx, "DAG not found locally, trying remote fallback", + tag.DAG(name), + ) + remoteDAG, remoteErr := o.remoteDAGLoader(ctx, name) + if remoteErr != nil { + logger.Warn(ctx, "Remote DAG fallback failed", + tag.DAG(name), + tag.Error(remoteErr), + ) + return nil, err // Return the original local error + } + if remoteDAG == nil { + return nil, err // Return the original local error + } + logger.Info(ctx, "DAG loaded from remote fallback", + tag.DAG(name), + ) + return remoteDAG, nil } func (o *dbClient) GetSubDAGRunStatus(ctx context.Context, dagRunID string, rootDAGRun exec.DAGRunRef) (*runtime.RunStatus, error) { diff --git a/internal/runtime/agent/dbclient_test.go b/internal/runtime/agent/dbclient_test.go index 436e423531..bcac059d5b 100644 --- a/internal/runtime/agent/dbclient_test.go +++ b/internal/runtime/agent/dbclient_test.go @@ -48,7 +48,7 @@ func TestDBClient_GetSubDAGRunStatus(t *testing.T) { }, }, nil) // Create dbClient - dbClient := newDBClient(mockDAGRunStore, mockDAGStore) + dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil) // Test GetSubDAGRunStatus st, err := dbClient.GetSubDAGRunStatus(ctx, subRunID, rootRef) @@ -77,7 +77,7 @@ func TestDBClient_GetSubDAGRunStatus(t *testing.T) { mockDAGRunStore.On("FindSubAttempt", ctx, rootRef, subRunID).Return(nil, errors.New("not found")) - dbClient := newDBClient(mockDAGRunStore, mockDAGStore) + dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil) status, err := dbClient.GetSubDAGRunStatus(ctx, subRunID, rootRef) assert.Error(t, err) @@ -106,7 +106,7 @@ func TestDBClient_IsSubDAGRunCompleted(t *testing.T) { Status: core.Succeeded, }, nil) - dbClient := newDBClient(mockDAGRunStore, mockDAGStore) + dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil) completed, err := dbClient.IsSubDAGRunCompleted(ctx, subRunID, rootRef) require.NoError(t, err) @@ -133,7 +133,7 @@ func TestDBClient_IsSubDAGRunCompleted(t *testing.T) { Status: core.Failed, }, nil) - dbClient := newDBClient(mockDAGRunStore, mockDAGStore) + dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil) completed, err := dbClient.IsSubDAGRunCompleted(ctx, subRunID, rootRef) require.NoError(t, err) @@ -153,7 +153,7 @@ func TestDBClient_IsSubDAGRunCompleted(t *testing.T) { mockDAGRunStore.On("FindSubAttempt", ctx, rootRef, subRunID).Return(nil, errors.New("not found")) - dbClient := newDBClient(mockDAGRunStore, mockDAGStore) + dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil) completed, err := dbClient.IsSubDAGRunCompleted(ctx, subRunID, rootRef) assert.Error(t, err) @@ -361,3 +361,134 @@ func (m *mockDAGRunStore) RenameDAGRuns(ctx context.Context, oldName, newName st args := m.Called(ctx, oldName, newName) return args.Error(0) } + +func TestDBClient_GetDAG(t *testing.T) { + testDAG := &core.DAG{Name: "test-dag"} + + // Helper to create a mock DAG store with pre-set GetDetails expectations. + setupMockDS := func(name string, dag *core.DAG, err error) *mockDAGStore { + m := new(mockDAGStore) + m.On("GetDetails", mock.Anything, name, mock.Anything).Return(dag, err) + return m + } + + tests := []struct { + name string + ds exec.DAGStore // nil means no local store + remoteLoader RemoteDAGLoader // nil means no remote loader + expectDAG *core.DAG + expectError bool + expectErrContains string + }{ + { + name: "local hit returns dag", + ds: setupMockDS("test-dag", testDAG, nil), + remoteLoader: nil, + expectDAG: testDAG, + expectError: false, + }, + { + name: "local not-found + remote hit", + ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound), + remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) { + return testDAG, nil + }, + expectDAG: testDAG, + expectError: false, + }, + { + name: "local not-found + remote returns nil", + ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound), + remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) { + return nil, nil + }, + expectError: true, + expectErrContains: "DAG is not found", + }, + { + name: "local not-found + remote returns error", + ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound), + remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) { + return nil, errors.New("remote unavailable") + }, + expectError: true, + expectErrContains: "DAG is not found", + }, + { + name: "local not-found + no remote loader", + ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound), + remoteLoader: nil, + expectError: true, + expectErrContains: "DAG is not found", + }, + { + name: "local non-not-found error propagates immediately", + ds: setupMockDS("test-dag", nil, errors.New("permission denied")), + remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) { + return testDAG, nil // should NOT be called + }, + expectError: true, + expectErrContains: "permission denied", + }, + { + name: "nil ds + remote hit", + ds: nil, + remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) { + return testDAG, nil + }, + expectDAG: testDAG, + expectError: false, + }, + { + name: "nil ds + remote returns nil dag", + ds: nil, + remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) { + return nil, nil + }, + expectError: true, + expectErrContains: "not found locally or remotely", + }, + { + name: "nil ds + remote returns error", + ds: nil, + remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) { + return nil, errors.New("remote unavailable") + }, + expectError: true, + expectErrContains: "remote DAG load failed", + }, + { + name: "nil ds + no remote loader", + ds: nil, + remoteLoader: nil, + expectError: true, + expectErrContains: "no local DAG store and no remote loader", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + mockDRS := new(mockDAGRunStore) + client := newDBClient(mockDRS, tt.ds, tt.remoteLoader) + + dag, err := client.GetDAG(ctx, "test-dag") + + if tt.expectError { + require.Error(t, err) + if tt.expectErrContains != "" { + assert.Contains(t, err.Error(), tt.expectErrContains) + } + assert.Nil(t, dag) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expectDAG, dag) + } + + // Assert mock expectations for the DAG store (when a mock is used). + if mockDS, ok := tt.ds.(*mockDAGStore); ok { + mockDS.AssertExpectations(t) + } + }) + } +} diff --git a/internal/service/chatbridge/notifications.go b/internal/service/chatbridge/notifications.go index 604f865e65..455706a225 100644 --- a/internal/service/chatbridge/notifications.go +++ b/internal/service/chatbridge/notifications.go @@ -358,6 +358,31 @@ func (b *NotificationBatcher) DiscardDestinations(destinations []string) { } } +// flushBucketsLocked synchronously moves buffered buckets of the given class +// to the ready queue, stopping any pending timers. It is safe to call when the +// batcher has not been stopped. +func (b *NotificationBatcher) flushBucketsLocked(class NotificationClass) { + b.mu.Lock() + type bucketRef struct { + key string + id uint64 + } + refs := make([]bucketRef, 0) + for key, bucket := range b.buckets { + if bucket != nil && bucket.class == class { + if bucket.timer != nil { + bucket.timer.Stop() + } + refs = append(refs, bucketRef{key: key, id: bucket.id}) + } + } + b.mu.Unlock() + + for _, ref := range refs { + b.readyBucket(ref.key, ref.id) + } +} + func (b *NotificationBatcher) readyBucket(bucketKey string, bucketID uint64) { b.mu.Lock() if b.stopped { diff --git a/internal/service/chatbridge/notifications_test.go b/internal/service/chatbridge/notifications_test.go index ce09281660..b4218afca2 100644 --- a/internal/service/chatbridge/notifications_test.go +++ b/internal/service/chatbridge/notifications_test.go @@ -226,11 +226,12 @@ func TestNotificationBatcher_DiscardDestinationsRemovesReadyAndBufferedBatches(t Status: core.Failed, }))) - require.Eventually(t, func() bool { - batcher.mu.Lock() - defer batcher.mu.Unlock() - return len(batcher.ready) == 2 - }, time.Second, 10*time.Millisecond) + // Synchronously flush success-class buckets to ready + batcher.flushBucketsLocked(NotificationClassSuccessDigest) + + batcher.mu.Lock() + require.Len(t, batcher.ready, 2) + batcher.mu.Unlock() batcher.DiscardDestinations([]string{"ready-remove", "buffered-remove"}) diff --git a/internal/service/coordinator/client.go b/internal/service/coordinator/client.go index f75a28d1ee..81c11a9d29 100644 --- a/internal/service/coordinator/client.go +++ b/internal/service/coordinator/client.go @@ -81,6 +81,10 @@ type Client interface { // GetDAGRunStatus is inherited from execution.Dispatcher + // GetDAG retrieves a DAG definition (raw YAML) from the coordinator's DAG store. + // Used as a fallback when a worker's local DAG store misses a definition. + GetDAG(ctx context.Context, name string) (string, error) + // Metrics returns the metrics for the coordinator client Metrics() Metrics } @@ -1002,6 +1006,38 @@ func (cli *clientImpl) GetDAGRunStatus(ctx context.Context, dagName, dagRunID st return result, nil } +// GetDAG retrieves a DAG definition (raw YAML spec) from the coordinator's DAG store. +func (cli *clientImpl) GetDAG(ctx context.Context, name string) (string, error) { + members, err := cli.getCoordinatorMembers(ctx) + if err != nil { + return "", err + } + + req := &coordinatorv1.GetDAGRequest{ + Name: name, + } + + var resp *coordinatorv1.GetDAGResponse + err = cli.attemptCall(ctx, members, func(ctx context.Context, member exec.HostInfo, client *client) error { + var callErr error + resp, callErr = client.client.GetDAG(ctx, req) + if callErr != nil { + return fmt.Errorf("get DAG definition failed: %w", callErr) + } + if resp == nil { + return fmt.Errorf("coordinator %s returned empty DAG definition response", member.ID) + } + if resp.Error != "" { + return fmt.Errorf("coordinator %s get DAG failed: %s", member.ID, resp.Error) + } + return nil + }) + if err != nil { + return "", err + } + return resp.Spec, nil +} + // RequestCancel requests cancellation of a DAG run through the coordinator func (cli *clientImpl) RequestCancel(ctx context.Context, dagName, dagRunID string, rootRef *exec.DAGRunRef) error { members, err := cli.getCoordinatorMembers(ctx) diff --git a/internal/service/coordinator/handler.go b/internal/service/coordinator/handler.go index 486acca323..10b55105e5 100644 --- a/internal/service/coordinator/handler.go +++ b/internal/service/coordinator/handler.go @@ -104,6 +104,7 @@ type Handler struct { workerHeartbeatStore exec.WorkerHeartbeatStore // Shared worker presence dagRunLeaseStore exec.DAGRunLeaseStore // Shared distributed run leases activeDistributedRunStore exec.ActiveDistributedRunStore // Shared active distributed attempt index + dagStore exec.DAGStore // DAG definitions for the GetDAG RPC // Open attempts cache for status persistence attemptsMu sync.RWMutex @@ -165,6 +166,10 @@ type HandlerConfig struct { // active distributed attempt index used by zombie detection. ActiveDistributedRunStore exec.ActiveDistributedRunStore + // DAGStore serves DAG definitions for the GetDAG RPC. + // Optional - when nil, GetDAG returns Unimplemented. + DAGStore exec.DAGStore + // StaleHeartbeatThreshold is the duration after which a worker's heartbeat // is considered stale. Defaults to 30 seconds if not set. StaleHeartbeatThreshold time.Duration @@ -212,6 +217,7 @@ func NewHandler(cfg HandlerConfig) *Handler { workerHeartbeatStore: cfg.WorkerHeartbeatStore, dagRunLeaseStore: cfg.DAGRunLeaseStore, activeDistributedRunStore: cfg.ActiveDistributedRunStore, + dagStore: cfg.DAGStore, staleHeartbeatThreshold: cfg.StaleHeartbeatThreshold, staleLeaseThreshold: cfg.StaleLeaseThreshold, eventService: cfg.EventService, @@ -1887,6 +1893,20 @@ func (h *Handler) GetDAGRunStatus(ctx context.Context, req *coordinatorv1.GetDAG }, nil } +// GetDAG retrieves the raw specification of a DAG by name. +// This is used by workers to obtain DAG definitions that may not be available locally +// (e.g., in a shared-nothing worker architecture). +func (h *Handler) GetDAG(ctx context.Context, req *coordinatorv1.GetDAGRequest) (*coordinatorv1.GetDAGResponse, error) { + if h.dagStore == nil { + return nil, status.Error(codes.Unimplemented, "DAG store not configured: GetDAG is not available") + } + spec, err := h.dagStore.GetSpec(ctx, req.Name) + if err != nil { + return &coordinatorv1.GetDAGResponse{Error: err.Error()}, nil + } + return &coordinatorv1.GetDAGResponse{Spec: spec}, nil +} + // StartZombieDetector starts a background goroutine that periodically checks for zombie runs. // It detects workers that have stopped sending heartbeats and marks their running tasks as failed. // The interval parameter controls how often the detector runs (recommended: 45 seconds). diff --git a/internal/service/frontend/api/v1/workers_internal_test.go b/internal/service/frontend/api/v1/workers_internal_test.go index 5260d017f1..b2895f43be 100644 --- a/internal/service/frontend/api/v1/workers_internal_test.go +++ b/internal/service/frontend/api/v1/workers_internal_test.go @@ -89,6 +89,10 @@ func (s *stubCoordinatorClient) Metrics() coordinator.Metrics { return coordinator.Metrics{} } +func (s *stubCoordinatorClient) GetDAG(_ context.Context, _ string) (string, error) { + return "", errors.New("not implemented") +} + func TestAPIGetWorkers_ReturnsPartialResultsWithErrors(t *testing.T) { t.Parallel() diff --git a/internal/service/worker/coordreport/status_pusher_test.go b/internal/service/worker/coordreport/status_pusher_test.go index a6585010b7..edd70e16e4 100644 --- a/internal/service/worker/coordreport/status_pusher_test.go +++ b/internal/service/worker/coordreport/status_pusher_test.go @@ -98,6 +98,10 @@ func (m *mockCoordinatorClient) GetDAGRunStatus(_ context.Context, _, _ string, panic("GetDAGRunStatus not implemented in mock") } +func (m *mockCoordinatorClient) GetDAG(_ context.Context, _ string) (string, error) { + panic("GetDAG not implemented in mock") +} + func (m *mockCoordinatorClient) RequestCancel(_ context.Context, _, _ string, _ *exec.DAGRunRef) error { panic("RequestCancel not implemented in mock") } diff --git a/internal/service/worker/poller_test.go b/internal/service/worker/poller_test.go index 79b5912ed4..896b8c2843 100644 --- a/internal/service/worker/poller_test.go +++ b/internal/service/worker/poller_test.go @@ -508,6 +508,10 @@ func (m *mockCoordinatorCli) GetDAGRunStatus(_ context.Context, _, _ string, _ * return &exec.DAGRunStatusResult{Found: false}, nil } +func (m *mockCoordinatorCli) GetDAG(_ context.Context, _ string) (string, error) { + return "", nil +} + func (m *mockCoordinatorCli) RequestCancel(_ context.Context, _, _ string, _ *exec.DAGRunRef) error { return nil } diff --git a/internal/service/worker/remote_handler.go b/internal/service/worker/remote_handler.go index d0dfd0d084..539fd34968 100644 --- a/internal/service/worker/remote_handler.go +++ b/internal/service/worker/remote_handler.go @@ -634,6 +634,23 @@ func (h *remoteTaskHandler) executeDAGRun( DAGRunArtifactDir: h.config.Paths.ArtifactDir, }) + // Create a remote DAG loader that fetches DAG definitions from the coordinator + // as a fallback when the local DAG store misses. + remoteDAGLoader := rtagent.RemoteDAGLoader(func(ctx context.Context, name string) (*core.DAG, error) { + dagYAML, err := h.coordinatorClient.GetDAG(ctx, name) + if err != nil { + return nil, err + } + if dagYAML == "" { + return nil, nil + } + dag, loadErr := spec.LoadYAML(ctx, []byte(dagYAML), spec.WithName(name)) + if loadErr != nil { + return nil, fmt.Errorf("failed to parse DAG from remote: %w", loadErr) + } + return dag, nil + }) + // Build agent options opts := rtagent.Options{ ParentDAGRun: parent, @@ -650,6 +667,7 @@ func (h *remoteTaskHandler) executeDAGRun( ProfileName: profileName, ServiceRegistry: h.serviceRegistry, SubWorkflowRunnerFactory: subWorkflowRunnerFactory, + RemoteDAGLoader: remoteDAGLoader, RootDAGRun: root, PeerConfig: h.peerConfig, DefaultExecMode: h.config.DefaultExecMode, diff --git a/internal/service/worker/remote_handler_test.go b/internal/service/worker/remote_handler_test.go index 347f00d013..4a8cb6e6c2 100644 --- a/internal/service/worker/remote_handler_test.go +++ b/internal/service/worker/remote_handler_test.go @@ -310,6 +310,7 @@ type mockRemoteCoordinatorClient struct { StreamArtifactsFunc func(ctx context.Context) (coordinatorv1.CoordinatorService_StreamArtifactsClient, error) StreamArtifactsToFunc func(ctx context.Context, owner exec.HostInfo) (coordinatorv1.CoordinatorService_StreamArtifactsClient, error) GetDAGRunStatusFunc func(ctx context.Context, dagName, dagRunID string, rootRef *exec.DAGRunRef) (*exec.DAGRunStatusResult, error) + GetDAGFunc func(ctx context.Context, name string) (string, error) DispatchFunc func(ctx context.Context, task *exec.DispatchTask) error PollFunc func(ctx context.Context, policy backoff.RetryPolicy, req *coordinatorv1.PollRequest) (*coordinatorv1.Task, error) HeartbeatFunc func(ctx context.Context, req *coordinatorv1.HeartbeatRequest) (*coordinatorv1.HeartbeatResponse, error) @@ -402,6 +403,13 @@ func (m *mockRemoteCoordinatorClient) GetDAGRunStatus(ctx context.Context, dagNa return &exec.DAGRunStatusResult{Found: false}, nil } +func (m *mockRemoteCoordinatorClient) GetDAG(ctx context.Context, name string) (string, error) { + if m.GetDAGFunc != nil { + return m.GetDAGFunc(ctx, name) + } + return "", nil +} + func (m *mockRemoteCoordinatorClient) Dispatch(ctx context.Context, task *exec.DispatchTask) error { if m.DispatchFunc != nil { return m.DispatchFunc(ctx, task) diff --git a/proto/coordinator/v1/coordinator.pb.go b/proto/coordinator/v1/coordinator.pb.go index ebb3d4ea6f..2665d16944 100644 --- a/proto/coordinator/v1/coordinator.pb.go +++ b/proto/coordinator/v1/coordinator.pb.go @@ -4663,6 +4663,138 @@ func (b0 ListStateResponse_builder) Build() *ListStateResponse { return m0 } +// GetDAG retrieves a DAG definition (raw YAML spec) from the coordinator's DAG store. +// Used as a fallback when a worker's local DAG store misses a definition during +// sub-DAG execution. This eliminates race conditions in git-synced setups where +// a new DAG YAML may not yet be present on the worker. +type GetDAGRequest struct { + state protoimpl.MessageState `protogen:"hybrid.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // DAG name to look up + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetDAGRequest) Reset() { + *x = GetDAGRequest{} + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[45] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetDAGRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetDAGRequest) ProtoMessage() {} + +func (x *GetDAGRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[45] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +func (x *GetDAGRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *GetDAGRequest) SetName(v string) { + x.Name = v +} + +type GetDAGRequest_builder struct { + _ [0]func() // Prevents comparability and use of unkeyed literals for the builder. + + Name string +} + +func (b0 GetDAGRequest_builder) Build() *GetDAGRequest { + m0 := &GetDAGRequest{} + b, x := &b0, m0 + _, _ = b, x + x.Name = b.Name + return m0 +} + +type GetDAGResponse struct { + state protoimpl.MessageState `protogen:"hybrid.v1"` + Spec string `protobuf:"bytes,1,opt,name=spec,proto3" json:"spec,omitempty"` // Raw YAML content of the DAG definition (empty if not found) + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` // Error description when the DAG cannot be retrieved + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetDAGResponse) Reset() { + *x = GetDAGResponse{} + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[46] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetDAGResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetDAGResponse) ProtoMessage() {} + +func (x *GetDAGResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[46] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +func (x *GetDAGResponse) GetSpec() string { + if x != nil { + return x.Spec + } + return "" +} + +func (x *GetDAGResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *GetDAGResponse) SetSpec(v string) { + x.Spec = v +} + +func (x *GetDAGResponse) SetError(v string) { + x.Error = v +} + +type GetDAGResponse_builder struct { + _ [0]func() // Prevents comparability and use of unkeyed literals for the builder. + + Spec string + Error string +} + +func (b0 GetDAGResponse_builder) Build() *GetDAGResponse { + m0 := &GetDAGResponse{} + b, x := &b0, m0 + _, _ = b, x + x.Spec = b.Spec + x.Error = b.Error + return m0 +} + var File_proto_coordinator_v1_coordinator_proto protoreflect.FileDescriptor const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + @@ -4922,7 +5054,12 @@ const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + "key_prefix\x18\x03 \x01(\tR\tkeyPrefix\x12\x14\n" + "\x05limit\x18\x04 \x01(\x05R\x05limit\"I\n" + "\x11ListStateResponse\x124\n" + - "\aentries\x18\x01 \x03(\v2\x1a.coordinator.v1.StateEntryR\aentries*P\n" + + "\aentries\x18\x01 \x03(\v2\x1a.coordinator.v1.StateEntryR\aentries\"#\n" + + "\rGetDAGRequest\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\":\n" + + "\x0eGetDAGResponse\x12\x12\n" + + "\x04spec\x18\x01 \x01(\tR\x04spec\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error*P\n" + "\tOperation\x12\x19\n" + "\x15OPERATION_UNSPECIFIED\x10\x00\x12\x13\n" + "\x0fOPERATION_START\x10\x01\x12\x13\n" + @@ -4936,7 +5073,7 @@ const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + "\x1bLOG_STREAM_TYPE_UNSPECIFIED\x10\x00\x12\x1a\n" + "\x16LOG_STREAM_TYPE_STDOUT\x10\x01\x12\x1a\n" + "\x16LOG_STREAM_TYPE_STDERR\x10\x02\x12\x1d\n" + - "\x19LOG_STREAM_TYPE_SCHEDULER\x10\x032\xd3\f\n" + + "\x19LOG_STREAM_TYPE_SCHEDULER\x10\x032\x9c\r\n" + "\x12CoordinatorService\x12A\n" + "\x04Poll\x12\x1b.coordinator.v1.PollRequest\x1a\x1c.coordinator.v1.PollResponse\x12M\n" + "\bDispatch\x12\x1f.coordinator.v1.DispatchRequest\x1a .coordinator.v1.DispatchResponse\x12S\n" + @@ -4957,10 +5094,11 @@ const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + "\bGetState\x12\x1f.coordinator.v1.GetStateRequest\x1a .coordinator.v1.GetStateResponse\x12M\n" + "\bPutState\x12\x1f.coordinator.v1.PutStateRequest\x1a .coordinator.v1.PutStateResponse\x12V\n" + "\vDeleteState\x12\".coordinator.v1.DeleteStateRequest\x1a#.coordinator.v1.DeleteStateResponse\x12P\n" + - "\tListState\x12 .coordinator.v1.ListStateRequest\x1a!.coordinator.v1.ListStateResponseB>ZZ coordinator.v1.PollRequest.LabelsEntry + 50, // 0: coordinator.v1.PollRequest.labels:type_name -> coordinator.v1.PollRequest.LabelsEntry 7, // 1: coordinator.v1.PollResponse.task:type_name -> coordinator.v1.Task 7, // 2: coordinator.v1.DispatchRequest.task:type_name -> coordinator.v1.Task 0, // 3: coordinator.v1.Task.operation:type_name -> coordinator.v1.Operation - 49, // 4: coordinator.v1.Task.worker_selector:type_name -> coordinator.v1.Task.WorkerSelectorEntry + 51, // 4: coordinator.v1.Task.worker_selector:type_name -> coordinator.v1.Task.WorkerSelectorEntry 22, // 5: coordinator.v1.Task.previous_status:type_name -> coordinator.v1.DAGRunStatusProto 10, // 6: coordinator.v1.GetWorkersResponse.workers:type_name -> coordinator.v1.WorkerInfo - 50, // 7: coordinator.v1.WorkerInfo.labels:type_name -> coordinator.v1.WorkerInfo.LabelsEntry + 52, // 7: coordinator.v1.WorkerInfo.labels:type_name -> coordinator.v1.WorkerInfo.LabelsEntry 19, // 8: coordinator.v1.WorkerInfo.running_tasks:type_name -> coordinator.v1.RunningTask 1, // 9: coordinator.v1.WorkerInfo.health_status:type_name -> coordinator.v1.WorkerHealthStatus - 51, // 10: coordinator.v1.HeartbeatRequest.labels:type_name -> coordinator.v1.HeartbeatRequest.LabelsEntry + 53, // 10: coordinator.v1.HeartbeatRequest.labels:type_name -> coordinator.v1.HeartbeatRequest.LabelsEntry 18, // 11: coordinator.v1.HeartbeatRequest.stats:type_name -> coordinator.v1.WorkerStats 17, // 12: coordinator.v1.HeartbeatResponse.cancelled_runs:type_name -> coordinator.v1.CancelledRun 19, // 13: coordinator.v1.RunHeartbeatRequest.running_tasks:type_name -> coordinator.v1.RunningTask @@ -5063,26 +5203,28 @@ var file_proto_coordinator_v1_coordinator_proto_depIdxs = []int32{ 42, // 44: coordinator.v1.CoordinatorService.PutState:input_type -> coordinator.v1.PutStateRequest 44, // 45: coordinator.v1.CoordinatorService.DeleteState:input_type -> coordinator.v1.DeleteStateRequest 46, // 46: coordinator.v1.CoordinatorService.ListState:input_type -> coordinator.v1.ListStateRequest - 4, // 47: coordinator.v1.CoordinatorService.Poll:output_type -> coordinator.v1.PollResponse - 6, // 48: coordinator.v1.CoordinatorService.Dispatch:output_type -> coordinator.v1.DispatchResponse - 9, // 49: coordinator.v1.CoordinatorService.GetWorkers:output_type -> coordinator.v1.GetWorkersResponse - 12, // 50: coordinator.v1.CoordinatorService.Heartbeat:output_type -> coordinator.v1.HeartbeatResponse - 14, // 51: coordinator.v1.CoordinatorService.AckTaskClaim:output_type -> coordinator.v1.AckTaskClaimResponse - 16, // 52: coordinator.v1.CoordinatorService.RunHeartbeat:output_type -> coordinator.v1.RunHeartbeatResponse - 21, // 53: coordinator.v1.CoordinatorService.ReportStatus:output_type -> coordinator.v1.ReportStatusResponse - 24, // 54: coordinator.v1.CoordinatorService.StreamLogs:output_type -> coordinator.v1.StreamLogsResponse - 26, // 55: coordinator.v1.CoordinatorService.StreamArtifacts:output_type -> coordinator.v1.StreamArtifactsResponse - 29, // 56: coordinator.v1.CoordinatorService.PutWorkspaceBundle:output_type -> coordinator.v1.PutWorkspaceBundleResponse - 31, // 57: coordinator.v1.CoordinatorService.HasWorkspaceBundle:output_type -> coordinator.v1.HasWorkspaceBundleResponse - 28, // 58: coordinator.v1.CoordinatorService.GetWorkspaceBundle:output_type -> coordinator.v1.WorkspaceBundleChunk - 34, // 59: coordinator.v1.CoordinatorService.GetDAGRunStatus:output_type -> coordinator.v1.GetDAGRunStatusResponse - 36, // 60: coordinator.v1.CoordinatorService.RequestCancel:output_type -> coordinator.v1.RequestCancelResponse - 41, // 61: coordinator.v1.CoordinatorService.GetState:output_type -> coordinator.v1.GetStateResponse - 43, // 62: coordinator.v1.CoordinatorService.PutState:output_type -> coordinator.v1.PutStateResponse - 45, // 63: coordinator.v1.CoordinatorService.DeleteState:output_type -> coordinator.v1.DeleteStateResponse - 47, // 64: coordinator.v1.CoordinatorService.ListState:output_type -> coordinator.v1.ListStateResponse - 47, // [47:65] is the sub-list for method output_type - 29, // [29:47] is the sub-list for method input_type + 48, // 47: coordinator.v1.CoordinatorService.GetDAG:input_type -> coordinator.v1.GetDAGRequest + 4, // 48: coordinator.v1.CoordinatorService.Poll:output_type -> coordinator.v1.PollResponse + 6, // 49: coordinator.v1.CoordinatorService.Dispatch:output_type -> coordinator.v1.DispatchResponse + 9, // 50: coordinator.v1.CoordinatorService.GetWorkers:output_type -> coordinator.v1.GetWorkersResponse + 12, // 51: coordinator.v1.CoordinatorService.Heartbeat:output_type -> coordinator.v1.HeartbeatResponse + 14, // 52: coordinator.v1.CoordinatorService.AckTaskClaim:output_type -> coordinator.v1.AckTaskClaimResponse + 16, // 53: coordinator.v1.CoordinatorService.RunHeartbeat:output_type -> coordinator.v1.RunHeartbeatResponse + 21, // 54: coordinator.v1.CoordinatorService.ReportStatus:output_type -> coordinator.v1.ReportStatusResponse + 24, // 55: coordinator.v1.CoordinatorService.StreamLogs:output_type -> coordinator.v1.StreamLogsResponse + 26, // 56: coordinator.v1.CoordinatorService.StreamArtifacts:output_type -> coordinator.v1.StreamArtifactsResponse + 29, // 57: coordinator.v1.CoordinatorService.PutWorkspaceBundle:output_type -> coordinator.v1.PutWorkspaceBundleResponse + 31, // 58: coordinator.v1.CoordinatorService.HasWorkspaceBundle:output_type -> coordinator.v1.HasWorkspaceBundleResponse + 28, // 59: coordinator.v1.CoordinatorService.GetWorkspaceBundle:output_type -> coordinator.v1.WorkspaceBundleChunk + 34, // 60: coordinator.v1.CoordinatorService.GetDAGRunStatus:output_type -> coordinator.v1.GetDAGRunStatusResponse + 36, // 61: coordinator.v1.CoordinatorService.RequestCancel:output_type -> coordinator.v1.RequestCancelResponse + 41, // 62: coordinator.v1.CoordinatorService.GetState:output_type -> coordinator.v1.GetStateResponse + 43, // 63: coordinator.v1.CoordinatorService.PutState:output_type -> coordinator.v1.PutStateResponse + 45, // 64: coordinator.v1.CoordinatorService.DeleteState:output_type -> coordinator.v1.DeleteStateResponse + 47, // 65: coordinator.v1.CoordinatorService.ListState:output_type -> coordinator.v1.ListStateResponse + 49, // 66: coordinator.v1.CoordinatorService.GetDAG:output_type -> coordinator.v1.GetDAGResponse + 48, // [48:67] is the sub-list for method output_type + 29, // [29:48] is the sub-list for method input_type 29, // [29:29] is the sub-list for extension type_name 29, // [29:29] is the sub-list for extension extendee 0, // [0:29] is the sub-list for field type_name @@ -5099,7 +5241,7 @@ func file_proto_coordinator_v1_coordinator_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_coordinator_v1_coordinator_proto_rawDesc), len(file_proto_coordinator_v1_coordinator_proto_rawDesc)), NumEnums: 3, - NumMessages: 49, + NumMessages: 51, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/coordinator/v1/coordinator.proto b/proto/coordinator/v1/coordinator.proto index 72be071236..e6bd4e5b36 100644 --- a/proto/coordinator/v1/coordinator.proto +++ b/proto/coordinator/v1/coordinator.proto @@ -66,6 +66,11 @@ service CoordinatorService { // ListState lists persistent DAG state entries by scope, namespace, and key prefix. rpc ListState (ListStateRequest) returns (ListStateResponse); + + // GetDAG retrieves a DAG definition (raw YAML spec) from the coordinator's DAG store. + // Used as a fallback when a worker's local DAG store misses a definition during + // sub-DAG execution. + rpc GetDAG (GetDAGRequest) returns (GetDAGResponse); } // Request message for polling a task. @@ -451,3 +456,16 @@ message ListStateRequest { message ListStateResponse { repeated StateEntry entries = 1; } + +// GetDAG retrieves a DAG definition (raw YAML spec) from the coordinator's DAG store. +// Used as a fallback when a worker's local DAG store misses a definition during +// sub-DAG execution. This eliminates race conditions in git-synced setups where +// a new DAG YAML may not yet be present on the worker. +message GetDAGRequest { + string name = 1; // DAG name to look up +} + +message GetDAGResponse { + string spec = 1; // Raw YAML content of the DAG definition (empty if not found) + string error = 2; // Error description when the DAG cannot be retrieved +} diff --git a/proto/coordinator/v1/coordinator_grpc.pb.go b/proto/coordinator/v1/coordinator_grpc.pb.go index 5d89f3bb7b..2e35aae034 100644 --- a/proto/coordinator/v1/coordinator_grpc.pb.go +++ b/proto/coordinator/v1/coordinator_grpc.pb.go @@ -40,6 +40,7 @@ const ( CoordinatorService_PutState_FullMethodName = "/coordinator.v1.CoordinatorService/PutState" CoordinatorService_DeleteState_FullMethodName = "/coordinator.v1.CoordinatorService/DeleteState" CoordinatorService_ListState_FullMethodName = "/coordinator.v1.CoordinatorService/ListState" + CoordinatorService_GetDAG_FullMethodName = "/coordinator.v1.CoordinatorService/GetDAG" ) // CoordinatorServiceClient is the client API for CoordinatorService service. @@ -91,6 +92,10 @@ type CoordinatorServiceClient interface { DeleteState(ctx context.Context, in *DeleteStateRequest, opts ...grpc.CallOption) (*DeleteStateResponse, error) // ListState lists persistent DAG state entries by scope, namespace, and key prefix. ListState(ctx context.Context, in *ListStateRequest, opts ...grpc.CallOption) (*ListStateResponse, error) + // GetDAG retrieves a DAG definition (raw YAML spec) from the coordinator's DAG store. + // Used as a fallback when a worker's local DAG store misses a definition during + // sub-DAG execution. + GetDAG(ctx context.Context, in *GetDAGRequest, opts ...grpc.CallOption) (*GetDAGResponse, error) } type coordinatorServiceClient struct { @@ -299,6 +304,16 @@ func (c *coordinatorServiceClient) ListState(ctx context.Context, in *ListStateR return out, nil } +func (c *coordinatorServiceClient) GetDAG(ctx context.Context, in *GetDAGRequest, opts ...grpc.CallOption) (*GetDAGResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetDAGResponse) + err := c.cc.Invoke(ctx, CoordinatorService_GetDAG_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // CoordinatorServiceServer is the server API for CoordinatorService service. // All implementations must embed UnimplementedCoordinatorServiceServer // for forward compatibility. @@ -348,6 +363,10 @@ type CoordinatorServiceServer interface { DeleteState(context.Context, *DeleteStateRequest) (*DeleteStateResponse, error) // ListState lists persistent DAG state entries by scope, namespace, and key prefix. ListState(context.Context, *ListStateRequest) (*ListStateResponse, error) + // GetDAG retrieves a DAG definition (raw YAML spec) from the coordinator's DAG store. + // Used as a fallback when a worker's local DAG store misses a definition during + // sub-DAG execution. + GetDAG(context.Context, *GetDAGRequest) (*GetDAGResponse, error) mustEmbedUnimplementedCoordinatorServiceServer() } @@ -412,6 +431,9 @@ func (UnimplementedCoordinatorServiceServer) DeleteState(context.Context, *Delet func (UnimplementedCoordinatorServiceServer) ListState(context.Context, *ListStateRequest) (*ListStateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ListState not implemented") } +func (UnimplementedCoordinatorServiceServer) GetDAG(context.Context, *GetDAGRequest) (*GetDAGResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetDAG not implemented") +} func (UnimplementedCoordinatorServiceServer) mustEmbedUnimplementedCoordinatorServiceServer() {} func (UnimplementedCoordinatorServiceServer) testEmbeddedByValue() {} @@ -717,6 +739,24 @@ func _CoordinatorService_ListState_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _CoordinatorService_GetDAG_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetDAGRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CoordinatorServiceServer).GetDAG(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: CoordinatorService_GetDAG_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CoordinatorServiceServer).GetDAG(ctx, req.(*GetDAGRequest)) + } + return interceptor(ctx, in, info, handler) +} + // CoordinatorService_ServiceDesc is the grpc.ServiceDesc for CoordinatorService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -780,6 +820,10 @@ var CoordinatorService_ServiceDesc = grpc.ServiceDesc{ MethodName: "ListState", Handler: _CoordinatorService_ListState_Handler, }, + { + MethodName: "GetDAG", + Handler: _CoordinatorService_GetDAG_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/proto/coordinator/v1/coordinator_protoopaque.pb.go b/proto/coordinator/v1/coordinator_protoopaque.pb.go index 8571b41080..6ec3a862c3 100644 --- a/proto/coordinator/v1/coordinator_protoopaque.pb.go +++ b/proto/coordinator/v1/coordinator_protoopaque.pb.go @@ -4650,6 +4650,138 @@ func (b0 ListStateResponse_builder) Build() *ListStateResponse { return m0 } +// GetDAG retrieves a DAG definition (raw YAML spec) from the coordinator's DAG store. +// Used as a fallback when a worker's local DAG store misses a definition during +// sub-DAG execution. This eliminates race conditions in git-synced setups where +// a new DAG YAML may not yet be present on the worker. +type GetDAGRequest struct { + state protoimpl.MessageState `protogen:"opaque.v1"` + xxx_hidden_Name string `protobuf:"bytes,1,opt,name=name,proto3"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetDAGRequest) Reset() { + *x = GetDAGRequest{} + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[45] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetDAGRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetDAGRequest) ProtoMessage() {} + +func (x *GetDAGRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[45] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +func (x *GetDAGRequest) GetName() string { + if x != nil { + return x.xxx_hidden_Name + } + return "" +} + +func (x *GetDAGRequest) SetName(v string) { + x.xxx_hidden_Name = v +} + +type GetDAGRequest_builder struct { + _ [0]func() // Prevents comparability and use of unkeyed literals for the builder. + + Name string +} + +func (b0 GetDAGRequest_builder) Build() *GetDAGRequest { + m0 := &GetDAGRequest{} + b, x := &b0, m0 + _, _ = b, x + x.xxx_hidden_Name = b.Name + return m0 +} + +type GetDAGResponse struct { + state protoimpl.MessageState `protogen:"opaque.v1"` + xxx_hidden_Spec string `protobuf:"bytes,1,opt,name=spec,proto3"` + xxx_hidden_Error string `protobuf:"bytes,2,opt,name=error,proto3"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetDAGResponse) Reset() { + *x = GetDAGResponse{} + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[46] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetDAGResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetDAGResponse) ProtoMessage() {} + +func (x *GetDAGResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_coordinator_v1_coordinator_proto_msgTypes[46] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +func (x *GetDAGResponse) GetSpec() string { + if x != nil { + return x.xxx_hidden_Spec + } + return "" +} + +func (x *GetDAGResponse) GetError() string { + if x != nil { + return x.xxx_hidden_Error + } + return "" +} + +func (x *GetDAGResponse) SetSpec(v string) { + x.xxx_hidden_Spec = v +} + +func (x *GetDAGResponse) SetError(v string) { + x.xxx_hidden_Error = v +} + +type GetDAGResponse_builder struct { + _ [0]func() // Prevents comparability and use of unkeyed literals for the builder. + + Spec string + Error string +} + +func (b0 GetDAGResponse_builder) Build() *GetDAGResponse { + m0 := &GetDAGResponse{} + b, x := &b0, m0 + _, _ = b, x + x.xxx_hidden_Spec = b.Spec + x.xxx_hidden_Error = b.Error + return m0 +} + var File_proto_coordinator_v1_coordinator_proto protoreflect.FileDescriptor const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + @@ -4909,7 +5041,12 @@ const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + "key_prefix\x18\x03 \x01(\tR\tkeyPrefix\x12\x14\n" + "\x05limit\x18\x04 \x01(\x05R\x05limit\"I\n" + "\x11ListStateResponse\x124\n" + - "\aentries\x18\x01 \x03(\v2\x1a.coordinator.v1.StateEntryR\aentries*P\n" + + "\aentries\x18\x01 \x03(\v2\x1a.coordinator.v1.StateEntryR\aentries\"#\n" + + "\rGetDAGRequest\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\":\n" + + "\x0eGetDAGResponse\x12\x12\n" + + "\x04spec\x18\x01 \x01(\tR\x04spec\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error*P\n" + "\tOperation\x12\x19\n" + "\x15OPERATION_UNSPECIFIED\x10\x00\x12\x13\n" + "\x0fOPERATION_START\x10\x01\x12\x13\n" + @@ -4923,7 +5060,7 @@ const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + "\x1bLOG_STREAM_TYPE_UNSPECIFIED\x10\x00\x12\x1a\n" + "\x16LOG_STREAM_TYPE_STDOUT\x10\x01\x12\x1a\n" + "\x16LOG_STREAM_TYPE_STDERR\x10\x02\x12\x1d\n" + - "\x19LOG_STREAM_TYPE_SCHEDULER\x10\x032\xd3\f\n" + + "\x19LOG_STREAM_TYPE_SCHEDULER\x10\x032\x9c\r\n" + "\x12CoordinatorService\x12A\n" + "\x04Poll\x12\x1b.coordinator.v1.PollRequest\x1a\x1c.coordinator.v1.PollResponse\x12M\n" + "\bDispatch\x12\x1f.coordinator.v1.DispatchRequest\x1a .coordinator.v1.DispatchResponse\x12S\n" + @@ -4944,10 +5081,11 @@ const file_proto_coordinator_v1_coordinator_proto_rawDesc = "" + "\bGetState\x12\x1f.coordinator.v1.GetStateRequest\x1a .coordinator.v1.GetStateResponse\x12M\n" + "\bPutState\x12\x1f.coordinator.v1.PutStateRequest\x1a .coordinator.v1.PutStateResponse\x12V\n" + "\vDeleteState\x12\".coordinator.v1.DeleteStateRequest\x1a#.coordinator.v1.DeleteStateResponse\x12P\n" + - "\tListState\x12 .coordinator.v1.ListStateRequest\x1a!.coordinator.v1.ListStateResponseB>ZZ coordinator.v1.PollRequest.LabelsEntry + 50, // 0: coordinator.v1.PollRequest.labels:type_name -> coordinator.v1.PollRequest.LabelsEntry 7, // 1: coordinator.v1.PollResponse.task:type_name -> coordinator.v1.Task 7, // 2: coordinator.v1.DispatchRequest.task:type_name -> coordinator.v1.Task 0, // 3: coordinator.v1.Task.operation:type_name -> coordinator.v1.Operation - 49, // 4: coordinator.v1.Task.worker_selector:type_name -> coordinator.v1.Task.WorkerSelectorEntry + 51, // 4: coordinator.v1.Task.worker_selector:type_name -> coordinator.v1.Task.WorkerSelectorEntry 22, // 5: coordinator.v1.Task.previous_status:type_name -> coordinator.v1.DAGRunStatusProto 10, // 6: coordinator.v1.GetWorkersResponse.workers:type_name -> coordinator.v1.WorkerInfo - 50, // 7: coordinator.v1.WorkerInfo.labels:type_name -> coordinator.v1.WorkerInfo.LabelsEntry + 52, // 7: coordinator.v1.WorkerInfo.labels:type_name -> coordinator.v1.WorkerInfo.LabelsEntry 19, // 8: coordinator.v1.WorkerInfo.running_tasks:type_name -> coordinator.v1.RunningTask 1, // 9: coordinator.v1.WorkerInfo.health_status:type_name -> coordinator.v1.WorkerHealthStatus - 51, // 10: coordinator.v1.HeartbeatRequest.labels:type_name -> coordinator.v1.HeartbeatRequest.LabelsEntry + 53, // 10: coordinator.v1.HeartbeatRequest.labels:type_name -> coordinator.v1.HeartbeatRequest.LabelsEntry 18, // 11: coordinator.v1.HeartbeatRequest.stats:type_name -> coordinator.v1.WorkerStats 17, // 12: coordinator.v1.HeartbeatResponse.cancelled_runs:type_name -> coordinator.v1.CancelledRun 19, // 13: coordinator.v1.RunHeartbeatRequest.running_tasks:type_name -> coordinator.v1.RunningTask @@ -5050,26 +5190,28 @@ var file_proto_coordinator_v1_coordinator_proto_depIdxs = []int32{ 42, // 44: coordinator.v1.CoordinatorService.PutState:input_type -> coordinator.v1.PutStateRequest 44, // 45: coordinator.v1.CoordinatorService.DeleteState:input_type -> coordinator.v1.DeleteStateRequest 46, // 46: coordinator.v1.CoordinatorService.ListState:input_type -> coordinator.v1.ListStateRequest - 4, // 47: coordinator.v1.CoordinatorService.Poll:output_type -> coordinator.v1.PollResponse - 6, // 48: coordinator.v1.CoordinatorService.Dispatch:output_type -> coordinator.v1.DispatchResponse - 9, // 49: coordinator.v1.CoordinatorService.GetWorkers:output_type -> coordinator.v1.GetWorkersResponse - 12, // 50: coordinator.v1.CoordinatorService.Heartbeat:output_type -> coordinator.v1.HeartbeatResponse - 14, // 51: coordinator.v1.CoordinatorService.AckTaskClaim:output_type -> coordinator.v1.AckTaskClaimResponse - 16, // 52: coordinator.v1.CoordinatorService.RunHeartbeat:output_type -> coordinator.v1.RunHeartbeatResponse - 21, // 53: coordinator.v1.CoordinatorService.ReportStatus:output_type -> coordinator.v1.ReportStatusResponse - 24, // 54: coordinator.v1.CoordinatorService.StreamLogs:output_type -> coordinator.v1.StreamLogsResponse - 26, // 55: coordinator.v1.CoordinatorService.StreamArtifacts:output_type -> coordinator.v1.StreamArtifactsResponse - 29, // 56: coordinator.v1.CoordinatorService.PutWorkspaceBundle:output_type -> coordinator.v1.PutWorkspaceBundleResponse - 31, // 57: coordinator.v1.CoordinatorService.HasWorkspaceBundle:output_type -> coordinator.v1.HasWorkspaceBundleResponse - 28, // 58: coordinator.v1.CoordinatorService.GetWorkspaceBundle:output_type -> coordinator.v1.WorkspaceBundleChunk - 34, // 59: coordinator.v1.CoordinatorService.GetDAGRunStatus:output_type -> coordinator.v1.GetDAGRunStatusResponse - 36, // 60: coordinator.v1.CoordinatorService.RequestCancel:output_type -> coordinator.v1.RequestCancelResponse - 41, // 61: coordinator.v1.CoordinatorService.GetState:output_type -> coordinator.v1.GetStateResponse - 43, // 62: coordinator.v1.CoordinatorService.PutState:output_type -> coordinator.v1.PutStateResponse - 45, // 63: coordinator.v1.CoordinatorService.DeleteState:output_type -> coordinator.v1.DeleteStateResponse - 47, // 64: coordinator.v1.CoordinatorService.ListState:output_type -> coordinator.v1.ListStateResponse - 47, // [47:65] is the sub-list for method output_type - 29, // [29:47] is the sub-list for method input_type + 48, // 47: coordinator.v1.CoordinatorService.GetDAG:input_type -> coordinator.v1.GetDAGRequest + 4, // 48: coordinator.v1.CoordinatorService.Poll:output_type -> coordinator.v1.PollResponse + 6, // 49: coordinator.v1.CoordinatorService.Dispatch:output_type -> coordinator.v1.DispatchResponse + 9, // 50: coordinator.v1.CoordinatorService.GetWorkers:output_type -> coordinator.v1.GetWorkersResponse + 12, // 51: coordinator.v1.CoordinatorService.Heartbeat:output_type -> coordinator.v1.HeartbeatResponse + 14, // 52: coordinator.v1.CoordinatorService.AckTaskClaim:output_type -> coordinator.v1.AckTaskClaimResponse + 16, // 53: coordinator.v1.CoordinatorService.RunHeartbeat:output_type -> coordinator.v1.RunHeartbeatResponse + 21, // 54: coordinator.v1.CoordinatorService.ReportStatus:output_type -> coordinator.v1.ReportStatusResponse + 24, // 55: coordinator.v1.CoordinatorService.StreamLogs:output_type -> coordinator.v1.StreamLogsResponse + 26, // 56: coordinator.v1.CoordinatorService.StreamArtifacts:output_type -> coordinator.v1.StreamArtifactsResponse + 29, // 57: coordinator.v1.CoordinatorService.PutWorkspaceBundle:output_type -> coordinator.v1.PutWorkspaceBundleResponse + 31, // 58: coordinator.v1.CoordinatorService.HasWorkspaceBundle:output_type -> coordinator.v1.HasWorkspaceBundleResponse + 28, // 59: coordinator.v1.CoordinatorService.GetWorkspaceBundle:output_type -> coordinator.v1.WorkspaceBundleChunk + 34, // 60: coordinator.v1.CoordinatorService.GetDAGRunStatus:output_type -> coordinator.v1.GetDAGRunStatusResponse + 36, // 61: coordinator.v1.CoordinatorService.RequestCancel:output_type -> coordinator.v1.RequestCancelResponse + 41, // 62: coordinator.v1.CoordinatorService.GetState:output_type -> coordinator.v1.GetStateResponse + 43, // 63: coordinator.v1.CoordinatorService.PutState:output_type -> coordinator.v1.PutStateResponse + 45, // 64: coordinator.v1.CoordinatorService.DeleteState:output_type -> coordinator.v1.DeleteStateResponse + 47, // 65: coordinator.v1.CoordinatorService.ListState:output_type -> coordinator.v1.ListStateResponse + 49, // 66: coordinator.v1.CoordinatorService.GetDAG:output_type -> coordinator.v1.GetDAGResponse + 48, // [48:67] is the sub-list for method output_type + 29, // [29:48] is the sub-list for method input_type 29, // [29:29] is the sub-list for extension type_name 29, // [29:29] is the sub-list for extension extendee 0, // [0:29] is the sub-list for field type_name @@ -5086,7 +5228,7 @@ func file_proto_coordinator_v1_coordinator_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_coordinator_v1_coordinator_proto_rawDesc), len(file_proto_coordinator_v1_coordinator_proto_rawDesc)), NumEnums: 3, - NumMessages: 49, + NumMessages: 51, NumExtensions: 0, NumServices: 1, },