
feat: RPC fallback for missing DAG definitions on workers#2271

Draft
four-bytes-robby wants to merge 1 commit into
dagucloud:mainfrom
four-flames:feat/2259-rpc-fallback-dag-definitions


@four-bytes-robby four-bytes-robby commented Jun 7, 2026

Contributor

Closes #2259

Problem

When a worker executes a sub-DAG via dag.run and the DAG YAML is not yet present in its local store, the step fails. This creates a race condition in git-synced setups:

  1. New DAG YAML is pushed to the coordinator
  2. Sub-DAG is triggered before the worker's local git-sync catches up
  3. The step fails unnecessarily

Solution

Workers fetch missing DAG definitions from the coordinator on demand via a new GetDAG RPC as a fallback when the local store misses.

  • Primary source: local DAG store (unchanged)
  • Fallback: RPC call to coordinator for DAG YAML retrieval
  • Error handling: if the coordinator does not have the DAG either, the lookup fails gracefully

Changes

  • New gRPC GetDAG RPC in coordinator proto with GetDAGRequest/GetDAGResponse
  • Coordinator handler: dagStore field + GetDAG implementation (returns YAML spec by name, returns Unimplemented when dagStore is nil)
  • Coordinator client: GetDAG(ctx, name) method added to Client interface + implementation
  • Agent/dbClient: RemoteDAGLoader function type wired through agent options → dbClient.GetDAG() tries local-first, then RPC fallback on miss
  • Worker remote_handler: closure using coordinator client GetDAG + spec.LoadYAML to parse remote YAML
  • cmd/coord.go + context.go: DAGStore wired into coordinator startup

Data Flow

Worker sub-DAG → dbClient.GetDAG(name)
  ├─ [Primary] Local DAGStore.GetDetails(name) → ok? return
  └─ [Fallback] RemoteDAGLoader(ctx, name)
       ├─ coordinatorClient.GetDAG [gRPC] → coordinator: dagStore.GetSpec(name)
       └─ spec.LoadYAML(yaml) → *core.DAG

Testing

  • go build ./... — clean
  • go vet ./... — clean (no new issues)
  • ✅ Unit tests pass: agent, coordinator, worker, cmd, frontend/api/v1
  • 🔜 Pending: integration test on a real distributed coordinator + worker setup

Notes

  • Feature is purely additive — dagStore on coordinator is optional (returns Unimplemented if nil)
  • RemoteDAGLoader defaults to nil → no fallback attempted on workers without a coordinator client
  • All existing code paths are untouched; fallback only triggers after local lookup fails

Summary by cubic

Adds an RPC fallback so workers can fetch missing DAG YAML from the coordinator when local DAGs aren’t synced yet, preventing sub-DAG failures. Addresses #2259 by eliminating race conditions in git-synced setups.

  • New Features
    • Added gRPC GetDAG to the coordinator service (GetDAGRequest/GetDAGResponse) to serve DAG YAML from the DAGStore (returns Unimplemented if no store).
    • Extended coordinator client with GetDAG(ctx, name); workers now try local-first and fall back to RPC on miss.
    • Introduced RemoteDAGLoader in the agent and wired it through dbClient; worker remote_handler fetches YAML and parses via spec.LoadYAML.
    • Wired DAGStore creation into command context and coordinator startup (including startall) so the RPC can read DAG definitions.

Written for commit 87f00ae. Summary will update on new commits.



coderabbitai Bot commented Jun 7, 2026


Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@four-bytes-robby four-bytes-robby force-pushed the feat/2259-rpc-fallback-dag-definitions branch from 329b0ee to ffb7e07 Compare June 7, 2026 10:04


Adds GetDAG RPC to coordinator service allowing workers to fetch missing
DAG definitions on demand when the local DAG store misses. This eliminates
race conditions in git-synced setups where new DAG YAML may not yet be
present on the worker.

- New gRPC: GetDAG RPC in coordinator proto (GetDAGRequest/GetDAGResponse)
- Coordinator: dagStore field + GetDAG handler (returns YAML spec by name)
- Client: GetDAG(ctx, name) method on coordinator.Client
- Agent: RemoteDAGLoader func type + option, wired through dbClient
- dbClient: local-first lookup with RPC fallback on miss
- Worker: RemoteDAGLoader closure using coordinator client GetDAG
- Context: DAGStore field added, populated at init, passed to coordinator
@four-bytes-robby four-bytes-robby force-pushed the feat/2259-rpc-fallback-dag-definitions branch from ffb7e07 to 87f00ae Compare June 7, 2026 10:14
