
Conversation

@J-HowHuang (Collaborator) commented Dec 24, 2025

Description

Currently the Pinot server uses the default Helix thread pool for state transition messages (threads appear as HelixTaskExecutor-message_handle_thread_<tid>). The main concerns with this are:

  1. Every transition, regardless of the states involved, uses the same pool. New consuming segments (OFFLINE->CONSUMING) can be blocked by operations on other tables (e.g. reset, rebalance) when those flood the message queue.
  2. The threads cannot be managed within the scope of Pinot, e.g. changing the maximum number of threads.

Change

  • Add a new class ServerThreadPoolManager that constructs the thread pools for consuming transitions and for all other transitions
  • Override ExecutorService getExecutorService(String resourceName, String fromState, String toState) in SegmentOnlineOfflineStateModelFactory to return the thread pools created by ServerThreadPoolManager (a sketch of this wiring follows the list)
  • Add new configs
    • pinot.server.helix.transition.thread.pool.size, default null
    • pinot.server.helix.transition.consuming.thread.pool.size, default null
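
For illustration, a minimal self-contained sketch of what such a manager could look like. The HelixConsumingTransitionExecutor thread prefix matches the test logs below; everything else (field names, null handling) is an assumption, not the PR's exact code:

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ThreadFactory;
  import java.util.concurrent.atomic.AtomicInteger;
  import javax.annotation.Nullable;

  // Hypothetical sketch: build the two pools once at server start and reuse
  // them for the lifetime of the server. A null pool size means "keep the
  // default Helix path", signalled by returning null to Helix.
  public class ServerThreadPoolManager {
    @Nullable
    private final ExecutorService _consumingTransitionExecutor;
    @Nullable
    private final ExecutorService _transitionExecutor;

    public ServerThreadPoolManager(@Nullable Integer consumingPoolSize, @Nullable Integer poolSize) {
      _consumingTransitionExecutor =
          consumingPoolSize == null ? null : newPool(consumingPoolSize, "HelixConsumingTransitionExecutor");
      _transitionExecutor = poolSize == null ? null : newPool(poolSize, "HelixTransitionExecutor");
    }

    private static ExecutorService newPool(int size, String namePrefix) {
      AtomicInteger threadId = new AtomicInteger();
      ThreadFactory factory = r -> new Thread(r, namePrefix + "-" + threadId.incrementAndGet());
      return Executors.newFixedThreadPool(size, factory);
    }

    @Nullable
    public ExecutorService getConsumingTransitionExecutor() {
      return _consumingTransitionExecutor;
    }

    @Nullable
    public ExecutorService getTransitionExecutor() {
      return _transitionExecutor;
    }
  }

The getExecutorService override in SegmentOnlineOfflineStateModelFactory would then return getConsumingTransitionExecutor() for transitions touching CONSUMING and getTransitionExecutor() for all others, with a null return falling through to the default Helix pool.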

The behavior of the new configs (values are examples):

| helix.transition.thread.pool.size | helix.transition.consuming.thread.pool.size | result |
| --- | --- | --- |
| null | null | original path |
| null | 10 | transitions from/to CONSUMING use the new thread pool; others use the original path |
| 40 | null | transitions from/to CONSUMING use the original path; others use the new thread pool |
| 40 | 10 | all transitions use the new thread pools |

Local test

Setup

|  | server 1 | server 2 |
| --- | --- | --- |
| table_OFFLINE (RF=2) | 999 online segments | 999 online segments |
| anotherTable_REALTIME (RF=1) | 5 consuming segments | 5 consuming segments |

Test

The goal is to reset table_OFFLINE with a long transition time and meanwhile force-commit anotherTable_REALTIME.
We should see consuming segments blocked from committing with the old config, and committing immediately with the new config.

1. Inject a delay into the OFFLINE->ONLINE transition to simulate a transition blocker

In org.apache.pinot.core.data.manager.BaseTableDataManager:

  public void addNewOnlineSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig)
      throws Exception {
    _logger.info("Adding new ONLINE segment: {}", zkMetadata.getSegmentName());
    Thread.sleep(30_000); // add 30 sec delay
    if (!tryLoadExistingSegment(zkMetadata, indexLoadingConfig)) {
      downloadAndLoadSegment(zkMetadata, indexLoadingConfig);
    }
  }

2. Add the following to the cluster config:

{
  "pinot.server.helix.transition.thread.pool.size": "40",
  "pinot.server.helix.transition.consuming.thread.pool.size": "10"
}

3. Restart only server 1

4. Reset the OFFLINE table

5. Force commit the REALTIME table

6. Verify that segments on server 1 are committed immediately while segments on server 2 are not


From the log, server 1 immediately finishes the consuming segment transitions using the custom thread pool, with thread prefix HelixConsumingTransitionExecutor:

2025/12/29 11:11:22.538 INFO [HelixStateTransitionHandler] [HelixConsumingTransitionExecutor-5] handling message: 4c360bdf-e160-4501-a89a-cf3ccbfbecbb transit airlineStats_REALTIME.airlineStats__1__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:11:25.804 INFO [HelixStateTransitionHandler] [HelixConsumingTransitionExecutor-7] handling message: 5975803b-244d-473f-8cf8-ef2580345935 transit airlineStats_REALTIME.airlineStats__5__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:11:28.865 INFO [HelixStateTransitionHandler] [HelixConsumingTransitionExecutor-8] handling message: b103d624-7d1b-43f4-83c2-b57f5178473e transit airlineStats_REALTIME.airlineStats__3__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:11:32.016 INFO [HelixStateTransitionHandler] [HelixConsumingTransitionExecutor-4] handling message: b7700675-2785-434c-94b0-8329627f30f8 transit airlineStats_REALTIME.airlineStats__7__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:11:35.108 INFO [HelixStateTransitionHandler] [HelixConsumingTransitionExecutor-1] handling message: 25eeccbd-7055-4a46-97cf-fda270b20cc5 transit airlineStats_REALTIME.airlineStats__9__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null

Server 2 started the consuming segment transitions only after ~12 min (999 online segments / 40 threads ≈ 25 rounds × 30 sec ≈ 12.5 min), using the default Helix thread pool with prefix HelixTaskExecutor-message_handle_thread:

2025/12/29 11:23:09.244 INFO [HelixStateTransitionHandler] [HelixTaskExecutor-message_handle_thread_32] handling message: da587a07-99aa-4c68-8d5e-0a7e546147c6 transit airlineStats_REALTIME.airlineStats__8__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:23:12.321 INFO [HelixStateTransitionHandler] [HelixTaskExecutor-message_handle_thread_32] handling message: 271fc503-fcfc-4e51-8dd8-ef71b7b4273a transit airlineStats_REALTIME.airlineStats__0__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:23:15.363 INFO [HelixStateTransitionHandler] [HelixTaskExecutor-message_handle_thread_32] handling message: 90e3dc43-fe64-4f04-8046-34d66f545797 transit airlineStats_REALTIME.airlineStats__6__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:23:18.407 INFO [HelixStateTransitionHandler] [HelixTaskExecutor-message_handle_thread_32] handling message: b598cc36-fff4-4baf-9d7e-1a27d9e08441 transit airlineStats_REALTIME.airlineStats__4__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null
2025/12/29 11:23:21.464 INFO [HelixStateTransitionHandler] [HelixTaskExecutor-message_handle_thread_32] handling message: 3318ff4e-5aef-48dc-8890-e9e6c74c7ac2 transit airlineStats_REALTIME.airlineStats__2__0__20251229T1818Z|[] from:CONSUMING to:ONLINE, relayedFrom: null

@codecov-commenter commented Dec 24, 2025

Codecov Report

❌ Patch coverage is 0% with 58 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.31%. Comparing base (23c36b9) to head (6d3127e).
⚠️ Report is 28 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| .../server/starter/helix/ServerThreadPoolManager.java | 0.00% | 50 Missing ⚠️ |
| ...r/helix/SegmentOnlineOfflineStateModelFactory.java | 0.00% | 5 Missing ⚠️ |
| .../pinot/server/starter/helix/BaseServerStarter.java | 0.00% | 3 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17424      +/-   ##
============================================
+ Coverage     63.22%   63.31%   +0.08%     
  Complexity     1474     1474              
============================================
  Files          3148     3156       +8     
  Lines        187610   188176     +566     
  Branches      28714    28802      +88     
============================================
+ Hits         118623   119139     +516     
- Misses        59784    59811      +27     
- Partials       9203     9226      +23     
| Flag | Coverage Δ |
| --- | --- |
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.28% <0.00%> (+0.08%) ⬆️ |
| java-21 | 63.25% <0.00%> (+0.07%) ⬆️ |
| temurin | 63.31% <0.00%> (+0.08%) ⬆️ |
| unittests | 63.30% <0.00%> (+0.08%) ⬆️ |
| unittests1 | 55.67% <ø> (+0.05%) ⬆️ |
| unittests2 | 33.98% <0.00%> (+0.09%) ⬆️ |

Flags with carried forward coverage won't be shown.

Review thread on getExecutorService(String resourceName, String fromState, String toState):

Contributor:

The Helix docs of StateModelFactory state: "This method is called just once per transition type per resource."

What is a transition type? Will this be an issue for us?

Collaborator (author):

Transition type means "from which state to which state". For each pair of states, this method is called at most once, to register the thread pool for that transition type.

This would not be an issue, since the thread pools for the transitions are constructed in the new ServerThreadPoolManager at server start and then reused throughout the server's lifetime.

Review thread on getExecutorService(String resourceName, String fromState, String toState):

Contributor:

We shouldn't change the default behaviour, so by default we should return null here so that it goes through the existing code flow.

Collaborator (author):

Got it. Example configs:

| helix.transition.thread.pool.size | helix.transition.consuming.thread.pool.size | result |
| --- | --- | --- |
| null | null | original path |
| null | 10 | transitions from/to CONSUMING use the new thread pool; others use the original path |
| 40 | null | transitions from/to CONSUMING use the original path; others use the new thread pool |
| 40 | 10 | all transitions use the new thread pools |

Added a note: changing the config from null to 40, or from 40 to null, has no effect on a running server, because the thread pool is registered via getExecutorService the first time a transition type is seen and is not re-registered after that. A change from 40 to 10, however, does take effect, because it only updates the core and max size of the existing thread pool without replacing the thread pool object.
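
As an aside, that resize-in-place behaviour is standard java.util.concurrent.ThreadPoolExecutor behaviour; a minimal illustration (assuming the pools are plain ThreadPoolExecutor instances, which the PR does not spell out):

  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;

  // Illustration only: a live ThreadPoolExecutor can be resized in place, so
  // the executor object Helix registered via getExecutorService() stays the same.
  public class ResizeExample {
    public static void main(String[] args) {
      ThreadPoolExecutor pool =
          new ThreadPoolExecutor(40, 40, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
      // Shrink the core size first so that core <= max holds at every step,
      // then lower the max; idle threads above the new size are reclaimed.
      pool.setCorePoolSize(10);
      pool.setMaximumPoolSize(10);
      pool.shutdown();
    }
  }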

Review thread on getExecutorService(String resourceName, String fromState, String toState):

Contributor:

Ideally, let's add a separate flag to create a thread pool per resource (table); that way we achieve more isolation.

Collaborator (author):

What I'm unsure about is that this lets the limit on ongoing transitions grow from 40 to something much bigger. In particular, during server start, a cluster with 100 tables could then run up to 4,000 state transitions concurrently (100 tables * 40 threads). If all expensive operations are throttled, then having a thread pool per table might be safe.

Contributor:

Yes, we need to configure it appropriately. Overall, I think we need 3 behaviours:

  1. default - use the existing Helix thread pool executor
  2. thread pool based on state transition type (already added in this PR). I agree this is a more manageable config.
  3. thread pool based on resource name. This will be required in extreme cases where tables are stepping on each other; having the capability will let us enable it when required (a possible shape is sketched below).
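
A hypothetical shape for that three-way dispatch, reusing the ServerThreadPoolManager sketch from the description above; behaviour 3 is the proposed follow-up, and none of these names come from the PR:

  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
  import java.util.concurrent.ExecutorService;
  import javax.annotation.Nullable;

  // Hypothetical follow-up sketch, not code from this PR: per-resource pools
  // take precedence when configured, then per-transition-type pools, and a
  // null return falls back to the default Helix executor (behaviour 1).
  public class TransitionExecutorDispatch {
    // Populated only for tables explicitly configured with their own pool.
    private final ConcurrentMap<String, ExecutorService> _poolsByResource = new ConcurrentHashMap<>();
    private final ServerThreadPoolManager _threadPoolManager;

    public TransitionExecutorDispatch(ServerThreadPoolManager threadPoolManager) {
      _threadPoolManager = threadPoolManager;
    }

    // Same signature shape as StateModelFactory.getExecutorService().
    @Nullable
    public ExecutorService pickExecutor(String resourceName, String fromState, String toState) {
      ExecutorService perResource = _poolsByResource.get(resourceName);  // behaviour 3
      if (perResource != null) {
        return perResource;
      }
      if ("CONSUMING".equals(fromState) || "CONSUMING".equals(toState)) {  // behaviour 2
        return _threadPoolManager.getConsumingTransitionExecutor();
      }
      return _threadPoolManager.getTransitionExecutor();  // null -> behaviour 1
    }
  }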

Contributor:

If you agree on a thread pool based on resource name, we can add that in a follow-up PR.

@krishan1390 (Contributor) left a review:

LGTM. Let's test it locally by enabling the configs selectively and ensuring the respective state transition messages are being processed.


@J-HowHuang marked this pull request as ready for review December 29, 2025 19:38
@J-HowHuang added the Configuration label (Config changes (addition/deletion/change in behavior)) Dec 29, 2025
@J-HowHuang closed this Jan 3, 2026