Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/cmd/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Context struct {
DAGRunLeaseStore exec.DAGRunLeaseStore
ActiveDistributedRunStore exec.ActiveDistributedRunStore

DAGStore exec.DAGStore
Proc exec.ProcHandle
LicenseManager *license.Manager
ContextStore *clicontext.Store
Expand Down Expand Up @@ -100,6 +101,7 @@ func (c *Context) WithContext(ctx context.Context) *Context {
WorkerHeartbeatStore: c.WorkerHeartbeatStore,
DAGRunLeaseStore: c.DAGRunLeaseStore,
ActiveDistributedRunStore: c.ActiveDistributedRunStore,
DAGStore: c.DAGStore,
Proc: c.Proc,
LicenseManager: c.LicenseManager,
ContextStore: c.ContextStore,
Expand Down Expand Up @@ -344,6 +346,10 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
sm := file.NewServiceRegistry(cfg)
dispatchTaskStore := store.NewDispatchTaskStore(file.NewCollection(distributedDir))
workerHeartbeatStore := store.NewWorkerHeartbeatStore(file.NewCollection(filepath.Join(distributedDir, "workers")))
dagStore, err := cmdprocess.NewDAGStore(cfg, cmdprocess.DAGStoreConfig{})
if err != nil {
return nil, fmt.Errorf("failed to create DAG store: %w", err)
}

// Initialize license manager for server commands
var licMgr *license.Manager
Expand Down Expand Up @@ -410,6 +416,7 @@ func NewContext(cmd *cobra.Command, flags []commandLineFlag) (*Context, error) {
WorkerHeartbeatStore: workerHeartbeatStore,
DAGRunLeaseStore: dagRunLeaseStore,
ActiveDistributedRunStore: activeDistributedRunStore,
DAGStore: dagStore,
LicenseManager: licMgr,
ContextStore: contextStore,
CLIContext: selectedContext,
Expand Down
3 changes: 3 additions & 0 deletions internal/cmd/coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func runCoordinator(ctx *Context, _ []string) error {
coordCtx.WorkerHeartbeatStore,
coordCtx.DAGRunLeaseStore,
coordCtx.ActiveDistributedRunStore,
coordCtx.DAGStore,
)
if err != nil {
return fmt.Errorf("failed to initialize coordinator: %w", err)
Expand Down Expand Up @@ -145,6 +146,7 @@ func newCoordinator(
workerHeartbeatStore exec.WorkerHeartbeatStore,
dagRunLeaseStore exec.DAGRunLeaseStore,
activeDistributedRunStore exec.ActiveDistributedRunStore,
dagStore exec.DAGStore,
) (*coordinator.Service, *coordinator.Handler, error) {
// Generate instance ID
hostname, err := os.Hostname()
Expand Down Expand Up @@ -234,6 +236,7 @@ func newCoordinator(
WorkerHeartbeatStore: workerHeartbeatStore,
DAGRunLeaseStore: dagRunLeaseStore,
ActiveDistributedRunStore: activeDistributedRunStore,
DAGStore: dagStore,
EventService: ctx.EventService,
EventSourceInstance: ctx.EventSourceInstance,
})
Expand Down
1 change: 1 addition & 0 deletions internal/cmd/startall.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func runStartAll(ctx *Context, _ []string) error {
ctx.WorkerHeartbeatStore,
ctx.DAGRunLeaseStore,
ctx.ActiveDistributedRunStore,
ctx.DAGStore,
)
if err != nil {
return fmt.Errorf("failed to initialize coordinator: %w", err)
Expand Down
15 changes: 13 additions & 2 deletions internal/runtime/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ type Agent struct {
// secretMasker redacts resolved secret values from status/history snapshots.
secretMasker *masking.Masker

// remoteDAGLoader loads a DAG from a remote source when local store misses.
remoteDAGLoader RemoteDAGLoader

// Evaluated configs - these are expanded at runtime and stored separately
// to avoid mutating the original DAG struct.
evaluatedSMTP *core.SMTPConfig
Expand Down Expand Up @@ -274,6 +277,10 @@ type ArtifactFinalizer = runtime.ArtifactFinalizer
// SubWorkflowRunnerFactory creates a runner for child workflows.
type SubWorkflowRunnerFactory func(ctx context.Context) (runtimeexec.SubWorkflowRunner, error)

// RemoteDAGLoader loads a DAG definition from a remote source.
// Returns nil, nil when the remote source does not have the DAG.
type RemoteDAGLoader func(ctx context.Context, name string) (*core.DAG, error)

// Options is the configuration for the Agent.
type Options struct {
// Dry is a dry-run mode. It does not execute the actual command.
Expand Down Expand Up @@ -363,6 +370,9 @@ type Options struct {
DAGRunArtifactDir string
// ArtifactFinalizer persists artifacts before the final terminal status is written.
ArtifactFinalizer ArtifactFinalizer
// RemoteDAGLoader loads a DAG from a remote source when the local DAG store misses.
// When nil, no remote fallback is attempted.
RemoteDAGLoader RemoteDAGLoader
// SocketServerFactory creates the local status/control transport.
// When nil, the default Unix socket transport is used.
SocketServerFactory SocketServerFactory
Expand Down Expand Up @@ -427,6 +437,7 @@ func New(
dagRunLogDir: opts.DAGRunLogDir,
dagRunArtifactDir: opts.DAGRunArtifactDir,
socketServerFactory: opts.SocketServerFactory,
remoteDAGLoader: opts.RemoteDAGLoader,
}
if a.socketServerFactory == nil {
a.socketServerFactory = defaultSocketServerFactory
Expand Down Expand Up @@ -582,7 +593,7 @@ func (a *Agent) Run(ctx context.Context) error {
}

// Create a new environment for the dag-run.
dbClient := newDBClient(a.dagRunStore, a.dagStore)
dbClient := newDBClient(a.dagRunStore, a.dagStore, a.remoteDAGLoader)

subWorkflowRunner, err := a.createSubWorkflowRunner(ctx)
if err != nil {
Expand Down Expand Up @@ -1933,7 +1944,7 @@ func (a *Agent) dryRun(ctx context.Context) error {
}
}()

db := newDBClient(a.dagRunStore, a.dagStore)
db := newDBClient(a.dagRunStore, a.dagStore, a.remoteDAGLoader)
contextOpts := []runtime.ContextOption{
runtime.WithDatabase(db),
runtime.WithRootDAGRun(a.rootDAGRun),
Expand Down
61 changes: 56 additions & 5 deletions internal/runtime/agent/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package agent

import (
"context"
"errors"
"fmt"
"strings"

"github.com/dagucloud/dagu/internal/cmn/logger"
"github.com/dagucloud/dagu/internal/cmn/logger/tag"
"github.com/dagucloud/dagu/internal/core"
"github.com/dagucloud/dagu/internal/core/exec"
"github.com/dagucloud/dagu/internal/runtime"
Expand All @@ -16,17 +19,65 @@ import (
var _ runtime.Database = &dbClient{}

type dbClient struct {
ds exec.DAGStore
drs exec.DAGRunStore
ds exec.DAGStore
drs exec.DAGRunStore
remoteDAGLoader RemoteDAGLoader
}

func newDBClient(drs exec.DAGRunStore, ds exec.DAGStore) *dbClient {
return &dbClient{drs: drs, ds: ds}
func newDBClient(drs exec.DAGRunStore, ds exec.DAGStore, remoteDAGLoader RemoteDAGLoader) *dbClient {
return &dbClient{drs: drs, ds: ds, remoteDAGLoader: remoteDAGLoader}
}

// GetDAG implements core.DBClient.
func (o *dbClient) GetDAG(ctx context.Context, name string) (*core.DAG, error) {
return o.ds.GetDetails(ctx, name)
// Guard against nil DAG store
if o.ds == nil {
logger.Info(ctx, "No local DAG store, trying remote fallback", tag.DAG(name))
if o.remoteDAGLoader == nil {
return nil, fmt.Errorf("no local DAG store and no remote loader configured for DAG %s", name)
}
remoteDAG, remoteErr := o.remoteDAGLoader(ctx, name)
if remoteErr != nil {
logger.Warn(ctx, "Remote DAG fallback failed", tag.DAG(name), tag.Error(remoteErr))
return nil, fmt.Errorf("remote DAG load failed for %s: %w", name, remoteErr)
}
if remoteDAG == nil {
return nil, fmt.Errorf("DAG %s not found locally or remotely", name)
}
logger.Info(ctx, "DAG loaded from remote fallback", tag.DAG(name))
return remoteDAG, nil
}

dag, err := o.ds.GetDetails(ctx, name)
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
if err == nil {
return dag, nil
}
// Only fallback to remote for not-found errors; propagate other errors directly
if !errors.Is(err, exec.ErrDAGNotFound) {
return nil, err
}
// Try remote fallback if configured
if o.remoteDAGLoader == nil {
return nil, err
}
logger.Info(ctx, "DAG not found locally, trying remote fallback",
tag.DAG(name),
)
remoteDAG, remoteErr := o.remoteDAGLoader(ctx, name)
if remoteErr != nil {
logger.Warn(ctx, "Remote DAG fallback failed",
tag.DAG(name),
tag.Error(remoteErr),
)
return nil, err // Return the original local error
}
if remoteDAG == nil {
return nil, err // Return the original local error
}
logger.Info(ctx, "DAG loaded from remote fallback",
tag.DAG(name),
)
return remoteDAG, nil
Comment thread
four-bytes-robby marked this conversation as resolved.
}

func (o *dbClient) GetSubDAGRunStatus(ctx context.Context, dagRunID string, rootDAGRun exec.DAGRunRef) (*runtime.RunStatus, error) {
Expand Down
141 changes: 136 additions & 5 deletions internal/runtime/agent/dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestDBClient_GetSubDAGRunStatus(t *testing.T) {
},
}, nil)
// Create dbClient
dbClient := newDBClient(mockDAGRunStore, mockDAGStore)
dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil)

// Test GetSubDAGRunStatus
st, err := dbClient.GetSubDAGRunStatus(ctx, subRunID, rootRef)
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestDBClient_GetSubDAGRunStatus(t *testing.T) {

mockDAGRunStore.On("FindSubAttempt", ctx, rootRef, subRunID).Return(nil, errors.New("not found"))

dbClient := newDBClient(mockDAGRunStore, mockDAGStore)
dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil)

status, err := dbClient.GetSubDAGRunStatus(ctx, subRunID, rootRef)
assert.Error(t, err)
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestDBClient_IsSubDAGRunCompleted(t *testing.T) {
Status: core.Succeeded,
}, nil)

dbClient := newDBClient(mockDAGRunStore, mockDAGStore)
dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil)

completed, err := dbClient.IsSubDAGRunCompleted(ctx, subRunID, rootRef)
require.NoError(t, err)
Expand All @@ -133,7 +133,7 @@ func TestDBClient_IsSubDAGRunCompleted(t *testing.T) {
Status: core.Failed,
}, nil)

dbClient := newDBClient(mockDAGRunStore, mockDAGStore)
dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil)

completed, err := dbClient.IsSubDAGRunCompleted(ctx, subRunID, rootRef)
require.NoError(t, err)
Expand All @@ -153,7 +153,7 @@ func TestDBClient_IsSubDAGRunCompleted(t *testing.T) {

mockDAGRunStore.On("FindSubAttempt", ctx, rootRef, subRunID).Return(nil, errors.New("not found"))

dbClient := newDBClient(mockDAGRunStore, mockDAGStore)
dbClient := newDBClient(mockDAGRunStore, mockDAGStore, nil)

completed, err := dbClient.IsSubDAGRunCompleted(ctx, subRunID, rootRef)
assert.Error(t, err)
Expand Down Expand Up @@ -361,3 +361,134 @@ func (m *mockDAGRunStore) RenameDAGRuns(ctx context.Context, oldName, newName st
args := m.Called(ctx, oldName, newName)
return args.Error(0)
}

func TestDBClient_GetDAG(t *testing.T) {
testDAG := &core.DAG{Name: "test-dag"}

// Helper to create a mock DAG store with pre-set GetDetails expectations.
setupMockDS := func(name string, dag *core.DAG, err error) *mockDAGStore {
m := new(mockDAGStore)
m.On("GetDetails", mock.Anything, name, mock.Anything).Return(dag, err)
return m
}

tests := []struct {
name string
ds exec.DAGStore // nil means no local store
remoteLoader RemoteDAGLoader // nil means no remote loader
expectDAG *core.DAG
expectError bool
expectErrContains string
}{
{
name: "local hit returns dag",
ds: setupMockDS("test-dag", testDAG, nil),
remoteLoader: nil,
expectDAG: testDAG,
expectError: false,
},
{
name: "local not-found + remote hit",
ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound),
remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) {
return testDAG, nil
},
expectDAG: testDAG,
expectError: false,
},
{
name: "local not-found + remote returns nil",
ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound),
remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) {
return nil, nil
},
expectError: true,
expectErrContains: "DAG is not found",
},
{
name: "local not-found + remote returns error",
ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound),
remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) {
return nil, errors.New("remote unavailable")
},
expectError: true,
expectErrContains: "DAG is not found",
},
{
name: "local not-found + no remote loader",
ds: setupMockDS("test-dag", nil, exec.ErrDAGNotFound),
remoteLoader: nil,
expectError: true,
expectErrContains: "DAG is not found",
},
{
name: "local non-not-found error propagates immediately",
ds: setupMockDS("test-dag", nil, errors.New("permission denied")),
remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) {
return testDAG, nil // should NOT be called
},
expectError: true,
expectErrContains: "permission denied",
},
{
name: "nil ds + remote hit",
ds: nil,
remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) {
return testDAG, nil
},
expectDAG: testDAG,
expectError: false,
},
{
name: "nil ds + remote returns nil dag",
ds: nil,
remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) {
return nil, nil
},
expectError: true,
expectErrContains: "not found locally or remotely",
},
{
name: "nil ds + remote returns error",
ds: nil,
remoteLoader: func(ctx context.Context, name string) (*core.DAG, error) {
return nil, errors.New("remote unavailable")
},
expectError: true,
expectErrContains: "remote DAG load failed",
},
{
name: "nil ds + no remote loader",
ds: nil,
remoteLoader: nil,
expectError: true,
expectErrContains: "no local DAG store and no remote loader",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
mockDRS := new(mockDAGRunStore)
client := newDBClient(mockDRS, tt.ds, tt.remoteLoader)

dag, err := client.GetDAG(ctx, "test-dag")

if tt.expectError {
require.Error(t, err)
if tt.expectErrContains != "" {
assert.Contains(t, err.Error(), tt.expectErrContains)
}
assert.Nil(t, dag)
} else {
require.NoError(t, err)
assert.Equal(t, tt.expectDAG, dag)
}

// Assert mock expectations for the DAG store (when a mock is used).
if mockDS, ok := tt.ds.(*mockDAGStore); ok {
mockDS.AssertExpectations(t)
}
})
}
}
Loading
Loading