Conversation

@ericallam
Member

No description provided.

@changeset-bot

changeset-bot bot commented Dec 24, 2025

⚠️ No Changeset found

Latest commit: 71d67e1

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

@coderabbitai
Contributor

coderabbitai bot commented Dec 24, 2025

Walkthrough

This pull request spans three files with complementary changes to the batch processing and fair-queue systems. The first change refactors the BatchQueue.handleMessage flow to reduce nesting and consolidate control paths, with earlier metadata retrieval and adjusted telemetry instrumentation timing. The second change extends the FairQueue with new public methods to expose cache sizes and adds cleanup logic for in-memory caches (queueDescriptorCache and queueCooloffStates) when queues become empty across multiple code paths. The third change introduces an optional getDynamicAttributes callback to the BatchedSpanManagerOptions interface, allowing dynamic span attributes to be computed at span creation time, which FairQueue now uses to report cache sizes during telemetry.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Rationale

The diff involves three interdependent files with mixed complexity. File 1 contains control-flow refactoring with consolidated error handling and telemetry instrumentation changes. File 2 introduces new public methods and replicates cache cleanup logic across three similar code paths (claim results, message completion, and direct processing), requiring consistent verification across locations. File 3 adds interface-level callback support. The heterogeneous nature of changes—refactoring, new public methods, cross-file integration, and logic additions—requires separate reasoning per file, though the repetitive cache cleanup pattern in File 2 reduces some complexity. The integration dependency between Files 2 and 3 adds moderate scrutiny overhead.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Description check | ⚠️ Warning | The pull request description is entirely empty, missing all required sections from the template including testing, changelog, and checklist items. | Add a complete description following the template: include issue reference, checklist confirmations, testing steps, changelog entry, and any relevant screenshots. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately describes the main objective of the changeset: preventing memory growth through cache cleanup in the fair-queue module. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |
📜 Recent review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2eba36c and 71d67e1.

📒 Files selected for processing (3)
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
  • packages/redis-worker/src/fair-queue/telemetry.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

Files:

  • packages/redis-worker/src/fair-queue/telemetry.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Files:

  • packages/redis-worker/src/fair-queue/telemetry.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
**/*.{js,ts,jsx,tsx,json,md,css,scss}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier

Files:

  • packages/redis-worker/src/fair-queue/telemetry.ts
  • internal-packages/run-engine/src/batch-queue/index.ts
  • packages/redis-worker/src/fair-queue/index.ts
🧬 Code graph analysis (3)
packages/redis-worker/src/fair-queue/telemetry.ts (1)
internal-packages/tracing/src/index.ts (1)
  • Attributes (15-15)
internal-packages/run-engine/src/batch-queue/index.ts (2)
packages/core/src/v3/apiClient/index.ts (1)
  • batchId (419-537)
internal-packages/run-engine/src/engine/systems/batchSystem.ts (1)
  • batchId (39-137)
packages/redis-worker/src/fair-queue/index.ts (3)
packages/redis-worker/src/fair-queue/keyProducer.ts (2)
  • masterQueueKey (29-31)
  • queueKey (37-39)
packages/redis-worker/src/fair-queue/visibility.ts (1)
  • queueId (366-368)
packages/redis-worker/src/fair-queue/masterQueue.ts (1)
  • queueId (195-197)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (24)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (7)
internal-packages/run-engine/src/batch-queue/index.ts (1)

540-730: Well-structured refactoring of message handling flow.

The consolidation of the #handleMessage flow with earlier metadata retrieval, unified try/catch block, and improved telemetry instrumentation is a clean improvement. The ordering of operations (record success/failure before completing message) is correctly documented and implemented.
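The ordering described above (record the outcome first, complete the message second) can be sketched roughly as follows. This is a simplified, synchronous illustration and not the actual #handleMessage implementation: `handleItemSketch`, `Outcome`, and the `log` array are hypothetical names introduced here.

```typescript
type Outcome = "success" | "failure";

// Hypothetical helper: runs the work, records the outcome, then completes the message.
function handleItemSketch(processItem: () => void, log: string[]): Outcome {
  let outcome: Outcome;
  try {
    processItem();
    outcome = "success";
  } catch {
    // Any thrown error is treated as a failed item in this sketch.
    outcome = "failure";
  }

  // Record the result before the message is marked complete, so a completed
  // message always has a recorded outcome.
  log.push(`record:${outcome}`);
  log.push("complete");
  return outcome;
}

const successLog: string[] = [];
const failureLog: string[] = [];
const okOutcome = handleItemSketch(() => {}, successLog);
const failedOutcome = handleItemSketch(() => {
  throw new Error("boom");
}, failureLog);
```

In both cases the record step lands in the log before the completion step, which is the property the review comment calls out.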

packages/redis-worker/src/fair-queue/telemetry.ts (2)

471-494: Clean addition of dynamic attributes callback.

The optional getDynamicAttributes callback is well-integrated into the existing BatchedSpanManagerOptions interface. The type signature () => Attributes is appropriate for a synchronous callback that returns span attributes.
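A minimal sketch of what an optional synchronous callback with the signature `() => Attributes` might look like. Only the `getDynamicAttributes` name and its signature come from the review; the local `Attributes` stand-in, the `name` field, the attribute key, and `resolveDynamicAttributes` are assumptions made for illustration.

```typescript
// Local stand-in for the tracing package's Attributes type (assumption).
type Attributes = Record<string, string | number | boolean>;

// Hypothetical, trimmed-down options type; only getDynamicAttributes is from the review.
type BatchedSpanManagerOptionsSketch = {
  name: string; // hypothetical field
  getDynamicAttributes?: () => Attributes;
};

// Hypothetical helper: evaluates the callback at span creation time,
// falling back to an empty object when no callback is configured.
function resolveDynamicAttributes(options: BatchedSpanManagerOptionsSketch): Attributes {
  return options.getDynamicAttributes?.() ?? {};
}

const withCallback = resolveDynamicAttributes({
  name: "consumer",
  getDynamicAttributes: () => ({ "cache.descriptor_size": 3 }),
});
const withoutCallback = resolveDynamicAttributes({ name: "consumer" });
```

Because the callback is optional, existing callers that do not pass it would keep working unchanged in a design like this.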


592-609: Good attribute precedence ordering.

The spread order ...dynamicAttributes, ...attributes correctly allows explicitly passed attributes to override dynamic ones, providing flexibility while maintaining the default dynamic attribute behavior.
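The precedence follows from how object spread works in JavaScript: later spreads overwrite earlier keys. A small self-contained sketch, where `mergeSpanAttributes` and the attribute keys are hypothetical names:

```typescript
type SpanAttributes = Record<string, string | number | boolean>;

// Hypothetical helper mirroring the spread order described in the review:
// dynamic attributes first, explicitly passed attributes last.
function mergeSpanAttributes(
  dynamicAttributes: SpanAttributes,
  attributes: SpanAttributes
): SpanAttributes {
  return { ...dynamicAttributes, ...attributes };
}

const merged = mergeSpanAttributes(
  { "cache.size": 10, source: "dynamic" },
  { source: "explicit" }
);
// merged.source is "explicit"; merged["cache.size"] is still 10
```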

packages/redis-worker/src/fair-queue/index.ts (4)

166-169: Good integration of cache size telemetry.

Exposing cache sizes as dynamic span attributes enables monitoring of the in-memory cache growth that this PR aims to address. This provides operational visibility without adding significant overhead.


604-631: Well-documented cache size monitoring API.

The three methods provide flexible access to cache sizes for monitoring purposes. The JSDoc comments clearly explain the purpose and lifecycle of each cache, which aids in understanding and debugging.
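For readers without the diff at hand, size accessors of this kind could look roughly like the sketch below. The cache names `queueDescriptorCache` and `queueCooloffStates` come from the walkthrough; the method names, the value types, and the `track`/`markFailure` helpers are assumptions and may differ from the real FairQueue API.

```typescript
type QueueDescriptorSketch = { id: string; tenantId: string };
type CooloffStateSketch = { failures: number };

class FairQueueCacheSketch {
  private queueDescriptorCache = new Map<string, QueueDescriptorSketch>();
  private queueCooloffStates = new Map<string, CooloffStateSketch>();

  // Hypothetical helpers used only to populate the caches in this example.
  track(queueId: string, tenantId: string): void {
    this.queueDescriptorCache.set(queueId, { id: queueId, tenantId });
  }
  markFailure(queueId: string): void {
    const state = this.queueCooloffStates.get(queueId) ?? { failures: 0 };
    state.failures += 1;
    this.queueCooloffStates.set(queueId, state);
  }

  /** Number of queue descriptors currently held in memory. */
  getQueueDescriptorCacheSize(): number {
    return this.queueDescriptorCache.size;
  }
  /** Number of queues with cooloff state currently held in memory. */
  getQueueCooloffStatesSize(): number {
    return this.queueCooloffStates.size;
  }
  /** Combined size of both caches. */
  getTotalCacheSize(): number {
    return this.queueDescriptorCache.size + this.queueCooloffStates.size;
  }
}

const cacheSketch = new FairQueueCacheSketch();
cacheSketch.track("q1", "tenant-a");
cacheSketch.track("q2", "tenant-b");
cacheSketch.markFailure("q1");
const totalCacheSize = cacheSketch.getTotalCacheSize();
```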


933-938: Consistent cache cleanup across all empty-queue code paths.

The cache cleanup is correctly applied in all three locations where queues become empty:

  1. #claimAndPushToWorkerQueue (two-stage processing)
  2. #processOneMessage (direct processing)
  3. #completeMessage (after processing last message)

The condition removed === 1 ensures cleanup only happens when the queue was actually removed from the master queue, preventing premature cleanup in race conditions.

Also applies to: 1274-1279, 1486-1491
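The guarded cleanup can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions, not the FairQueue code: the master queue is modelled as a `Set`, `removeIfEmpty` stands in for whatever atomic removal the real implementation performs, and every name other than `queueDescriptorCache` and `queueCooloffStates` is hypothetical.

```typescript
class EmptyQueueCleanupSketch {
  readonly queueDescriptorCache = new Map<string, { tenantId: string }>();
  readonly queueCooloffStates = new Map<string, { failures: number }>();
  private masterQueue = new Set<string>();
  private pending = new Map<string, number>(); // messages left per queue

  enqueue(queueId: string, tenantId: string): void {
    this.masterQueue.add(queueId);
    this.pending.set(queueId, (this.pending.get(queueId) ?? 0) + 1);
    this.queueDescriptorCache.set(queueId, { tenantId });
  }

  // Stand-in for the removal step: returns 1 only if the queue was empty
  // and was actually removed from the master queue, otherwise 0.
  private removeIfEmpty(queueId: string): number {
    if ((this.pending.get(queueId) ?? 0) > 0) return 0;
    return this.masterQueue.delete(queueId) ? 1 : 0;
  }

  complete(queueId: string): number {
    const left = Math.max(0, (this.pending.get(queueId) ?? 0) - 1);
    this.pending.set(queueId, left);

    const removed = this.removeIfEmpty(queueId);
    if (removed === 1) {
      // Only drop in-memory state once the queue is really gone.
      this.queueDescriptorCache.delete(queueId);
      this.queueCooloffStates.delete(queueId);
    }
    return removed;
  }
}

const cleanupSketch = new EmptyQueueCleanupSketch();
cleanupSketch.enqueue("q1", "tenant-a");
cleanupSketch.enqueue("q1", "tenant-a");
const firstRemoved = cleanupSketch.complete("q1");
const cachedAfterFirst = cleanupSketch.queueDescriptorCache.has("q1");
const secondRemoved = cleanupSketch.complete("q1");
const cachedAfterSecond = cleanupSketch.queueDescriptorCache.has("q1");
```

The first completion leaves a message pending, so nothing is removed and the cache entry stays; the second empties the queue and triggers the cleanup.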


1746-1748: Note: #resetCooloff provides additional cleanup path.

The #resetCooloff method at line 1747 deletes from queueCooloffStates when processing succeeds, which means cooloff entries are cleaned up either on success or when the queue becomes empty. This dual cleanup path ensures cooloff state doesn't accumulate.
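The two cleanup paths for cooloff state can be sketched as below. Only `queueCooloffStates` and the idea of a reset on success come from the review; the class, the state shape, and the `recordFailure`/`onQueueEmpty` method names are assumptions for illustration.

```typescript
type CooloffState = { consecutiveFailures: number };

class CooloffTrackerSketch {
  readonly queueCooloffStates = new Map<string, CooloffState>();

  // Hypothetical: a failure creates or bumps the cooloff entry.
  recordFailure(queueId: string): void {
    const state = this.queueCooloffStates.get(queueId) ?? { consecutiveFailures: 0 };
    state.consecutiveFailures += 1;
    this.queueCooloffStates.set(queueId, state);
  }

  // Path 1: successful processing clears the entry.
  resetCooloff(queueId: string): void {
    this.queueCooloffStates.delete(queueId);
  }

  // Path 2: the queue became empty and was removed, so the entry is dropped too.
  onQueueEmpty(queueId: string): void {
    this.queueCooloffStates.delete(queueId);
  }
}

const tracker = new CooloffTrackerSketch();
tracker.recordFailure("q1");
tracker.recordFailure("q2");
tracker.resetCooloff("q1"); // q1 later succeeds
tracker.onQueueEmpty("q2"); // q2 drains without a success
const remainingCooloffEntries = tracker.queueCooloffStates.size;
```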


@vibe-kanban-cloud

Review Complete

Your review story is ready!

View Story

Comment !reviewfast on this PR to re-generate the story.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal-packages/run-engine/src/batch-queue/index.ts (1)

588-709: processedCount may be left unassigned if recordFailure throws in the catch block.

If the recordFailure call at lines 685-694 throws an exception, processedCount is never assigned, causing a runtime error at line 709 when processedCount is accessed. Consider wrapping the catch block's recordFailure call in its own try/catch, or giving processedCount a default value.

🔎 Suggested fix
     } catch (error) {
       span?.setAttribute("batch.result", "unexpected_error");
       span?.setAttribute("batch.error", error instanceof Error ? error.message : String(error));

       // Unexpected error during processing
       // For offloaded payloads, payload is an R2 path; for inline payloads, store full payload
-      const payloadStr = await this.#startSpan(
-        "BatchQueue.serializePayload",
-        async (innerSpan) => {
-          const str =
-            typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
-          innerSpan?.setAttribute("batch.payloadSize", str.length);
-          return str;
-        }
-      );
-
-      processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
-        return this.completionTracker.recordFailure(batchId, {
-          index: itemIndex,
-          taskIdentifier: item.task,
-          payload: payloadStr,
-          options: item.options,
-          error: error instanceof Error ? error.message : String(error),
-          errorCode: "UNEXPECTED_ERROR",
+      try {
+        const payloadStr = await this.#startSpan(
+          "BatchQueue.serializePayload",
+          async (innerSpan) => {
+            const str =
+              typeof item.payload === "string" ? item.payload : JSON.stringify(item.payload);
+            innerSpan?.setAttribute("batch.payloadSize", str.length);
+            return str;
+          }
+        );
+
+        processedCount = await this.#startSpan("BatchQueue.recordFailure", async () => {
+          return this.completionTracker.recordFailure(batchId, {
+            index: itemIndex,
+            taskIdentifier: item.task,
+            payload: payloadStr,
+            options: item.options,
+            error: error instanceof Error ? error.message : String(error),
+            errorCode: "UNEXPECTED_ERROR",
+          });
         });
-      });
+      } catch (recordError) {
+        this.logger.error("Failed to record failure in completion tracker", {
+          batchId,
+          itemIndex,
+          error: recordError instanceof Error ? recordError.message : String(recordError),
+        });
+        // Re-throw to ensure message is not incorrectly completed
+        throw recordError;
+      }

@ericallam ericallam merged commit 71279a7 into main Dec 24, 2025
32 checks passed
@ericallam ericallam deleted the ea-branch-112 branch December 24, 2025 10:40