Add custom thread pools for SegmentOnlineOfflineStateModelFactory #17424
Conversation
Codecov Report
@@ Coverage Diff @@
## master #17424 +/- ##
============================================
+ Coverage 63.22% 63.31% +0.08%
Complexity 1474 1474
============================================
Files 3148 3156 +8
Lines 187610 188176 +566
Branches 28714 28802 +88
============================================
+ Hits 118623 119139 +516
- Misses 59784 59811 +27
- Partials 9203 9226 +23
@Override
public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
The Helix docs of StateModelFactory state: "This method is called just once per transition type per resource."
What is a transition type? Will this be an issue for us?
Transition type means "from which state to which state". For each pair of states, this method will be called at most once, to register the thread pool for that transition type.
This would not be an issue since the new ServerThreadPoolManager constructs the thread pools for the transitions at server start, then reuses them throughout the server lifetime.
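To make the reuse concrete, here is a minimal, self-contained sketch of the idea. The class and field names are hypothetical (not the PR's actual code), and the pool sizes are just the example values used later in this thread; in the PR this logic lives in the overridden getExecutorService of SegmentOnlineOfflineStateModelFactory, backed by ServerThreadPoolManager.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: Helix calls getExecutorService() at most once per (fromState, toState)
// pair per resource, so whatever pool is returned here is registered for that transition type and
// reused for all later transitions of that type.
public class TransitionPoolSelector {
  // Built once at server start (the PR does this in ServerThreadPoolManager) and shared.
  private final ExecutorService _consumingTransitionExecutor = Executors.newFixedThreadPool(10);
  private final ExecutorService _otherTransitionExecutor = Executors.newFixedThreadPool(40);

  public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
    boolean involvesConsuming = "CONSUMING".equals(fromState) || "CONSUMING".equals(toState);
    return involvesConsuming ? _consumingTransitionExecutor : _otherTransitionExecutor;
  }
}
```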
@Override
public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
We shouldn't change the default behaviour, so by default we should return null here so that it goes through the existing code flow.
Got it. Example configs:
| helix.transition.thread.pool.size | helix.transition.consuming.thread.pool.size | result |
|---|---|---|
| null | null | original path |
| null | 10 | transitions from/to consuming use the new thread pool, others use the original path |
| 40 | null | transitions from/to consuming use the original path, others use the new thread pool |
| 40 | 10 | all transitions use the new thread pools |
Added a note: there will be no effect when changing from null to 40 or from 40 to null, because the thread pool is registered via getExecutor the first time a transition type is seen, and will not be changed after that. A change from 40 to 10, however, is effective because it only changes the core size and max size of the thread pool, without replacing the thread pool object.
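The following sketch (illustrative, not the PR's actual code) shows why an in-place size change can take effect while a null-to-non-null change cannot: the executor object handed to Helix is never swapped out, so a new size can only be applied by resizing that same object.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeInPlaceSketch {
  public static void main(String[] args) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(40, 40, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    // Config changed from 40 to 10: shrink the core size first so max >= core always holds.
    pool.setCorePoolSize(10);
    pool.setMaximumPoolSize(10);

    // Config changed from null to 40: there is no registered pool to resize, because Helix has
    // already recorded the (null) answer for this transition type, so that change only applies
    // after a restart.
    pool.shutdown();
  }
}
```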
@Override
public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
Ideally let's add a separate flag to create a thread pool per resource (table). That way we achieve more isolation.
What I'm unsure about is that this allows the limit of ongoing transitions to go from 40 to something way bigger. Especially during server start, a cluster with 100 tables would then result in 4000 concurrent state transitions. If we have all expensive operations throttled, then having a thread pool per table might be safe.
Yes, we need to configure it appropriately. Overall, I think we need 3 behaviours:
- default - use the existing Helix thread pool executor
- thread pool based on state transition type (already added in this PR). I agree this is a more manageable config
- thread pool based on resource name. This will be required in extreme cases where tables are stepping on each other; having this capability will enable configuring it when required (a sketch of this idea follows below)
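As a sketch of that third behaviour, the snippet below shows one way a per-resource pool could be selected. It is not part of this PR, just an illustration of what the follow-up flag could enable; all names and the per-table size are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical follow-up sketch, not part of this PR. The per-table size would need to stay
// small, since 100 tables * 40 threads is exactly the 4000-concurrent-transition risk raised above.
public class PerResourcePoolSelector {
  private static final int PER_TABLE_POOL_SIZE = 4;  // illustrative value, would come from config

  private final Map<String, ExecutorService> _perResourcePools = new ConcurrentHashMap<>();

  public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
    // One pool per table, created lazily the first time Helix asks about that resource.
    return _perResourcePools.computeIfAbsent(resourceName,
        ignored -> Executors.newFixedThreadPool(PER_TABLE_POOL_SIZE));
  }
}
```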
If you agree on the thread pool based on resource name, we can add that in a follow-up PR.
(resolved review thread on pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java)
krishan1390 left a comment:
LGTM. Let's test it locally by enabling the config selectively and ensuring the respective state transition messages are being processed.
Description
Currently Pinot server uses the default Helix thread pool for state transition messages (appears as HelixTaskExecutor-message_handle_thread_<tid>). The main concern of this is that all state transitions share that single pool, so consuming segment transitions can be blocked behind a backlog of other transitions.

Change
- New ServerThreadPoolManager that constructs the thread pools for consuming transitions and other transitions
- Override ExecutorService getExecutorService(String resourceName, String fromState, String toState) in SegmentOnlineOfflineStateModelFactory to return the thread pools created by ServerThreadPoolManager
- New config pinot.server.helix.transition.thread.pool.size, default to null
- New config pinot.server.helix.transition.consuming.thread.pool.size, default to null

The behavior of the new configs (values are examples):
| helix.transition.thread.pool.size | helix.transition.consuming.thread.pool.size | result |
|---|---|---|
| null | null | original path |
| null | 10 | transitions from/to consuming use the new thread pool, others use the original path |
| 40 | null | transitions from/to consuming use the original path, others use the new thread pool |
| 40 | 10 | all transitions use the new thread pools |

Local test
Set up
Test
The goal is to reset table_OFFLINE with a long transition time, then meanwhile force commit anotherTable_REALTIME. We should see consuming segments blocked from committing with the old config, and allowed to commit with the new config.
1. Apply a delay into the OFFLINE->ONLINE transition to simulate a transition blocker, in org.apache.pinot.core.data.manager.BaseTableDataManager (a sketch follows after this list).
2. Add the new thread pool config to the cluster config (a sketch follows after this list).
3. Restart only server 1.
4. Reset the OFFLINE table.
5. Force commit the REALTIME table.
6. Verify that segments on server 1 are committed immediately while segments on server 2 are not.
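The exact snippets for steps 1 and 2 are not shown above; the sketches below are assumptions about what they could look like. For step 1, a sleep anywhere in the OFFLINE->ONLINE segment-loading path of BaseTableDataManager would do; the 30-second value matches the "30 sec" used in the timing estimate below.

```java
// Hypothetical step-1 delay (the exact spot patched in the local test is not shown above):
try {
  Thread.sleep(30_000L);  // ~30 sec per OFFLINE->ONLINE transition, matching the estimate below
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}
```

For step 2, only the consuming pool config needs to be set to reproduce the observed behaviour; the value 10 is taken from the example table above, and the actual value used in the test is an assumption.

```
pinot.server.helix.transition.consuming.thread.pool.size=10
```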
From the log, server 1 immediately finished the consuming segment transitions, using the custom thread pool with thread prefix HelixConsumingTransitionExecutor. Server 2 started the consuming segment transitions after ~12 min (999 online segments / 40 threads * 30 sec), and used the default Helix thread pool, with prefix HelixTaskExecutor-message_handle_thread.