Add custom thread pools for SegmentOnlineOfflineStateModelFactory #17424

Changes from all commits
SegmentOnlineOfflineStateModelFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.server.starter.helix;

+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
@@ -40,10 +42,13 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
   private final String _instanceId;
   private final InstanceDataManager _instanceDataManager;
+  private final ServerThreadPoolManager _serverThreadPoolManager;

-  public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
+  public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
+      ServerThreadPoolManager serverThreadPoolManager) {
     _instanceId = instanceId;
     _instanceDataManager = instanceDataManager;
+    _serverThreadPoolManager = serverThreadPoolManager;
   }

   public static String getStateModelName() {
@@ -162,7 +167,6 @@ private void onConsumingToDropped(String realtimeTableName, String segmentName)
   public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
       throws Exception {
     _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message);
     try {
       _instanceDataManager.addOnlineSegment(message.getResourceName(), message.getPartitionName());
     } catch (Exception e) {
@@ -247,4 +251,13 @@ public void onBecomeDroppedFromError(Message message, NotificationContext contex
       }
     }
   }
+
+  @Override
+  @Nullable
+  public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
+    if (fromState.equals("CONSUMING") || toState.equals("CONSUMING")) {
+      return _serverThreadPoolManager.getHelixConsumingTransitionExecutor();
+    }
+    return _serverThreadPoolManager.getHelixTransitionExecutor();
+  }
 }

Review thread on getExecutorService:

Contributor: The Helix javadoc for StateModelFactory states "This method is called just once per transition type per resource." What is a transition type? Will this be an issue for us?

Author (Collaborator): Transition type means "from which state to which state". For each pair of states, this method will be called at most once, to register the thread pool for that transition type. This would not be an issue, since it constructs the thread pools for the transitions in the new

Contributor: We shouldn't change the default behaviour, so by default we should return null here so that the existing code flow is used.

Author (Collaborator): Got it. Example configs:

Added a note that there will be no effect on the change from

Contributor: Ideally, let's add a separate flag to create a thread pool per resource (table); that way we achieve more isolation.

Author (Collaborator): What I'm unsure about is that this allows the limit of ongoing transitions to go from 40 to something way bigger. Especially during server start, a cluster with 100 tables would then result in 4000 concurrent state transitions. If we throttle all expensive operations, then having a thread pool per table might be safe.

Contributor: Yes, we need to configure it appropriately. Overall, I think we need 3 behaviours

Contributor: If you agree on a thread pool based on resource name, we can add that in a followup PR.
ServerThreadPoolManager.java (new file)

@@ -0,0 +1,137 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.server.starter.helix;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Nullable;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.Server.CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE;
import static org.apache.pinot.spi.utils.CommonConstants.Server.CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE;


/**
 * Manages the custom Helix state transition thread pools for the Pinot server.
 * Helix state transitions use the Helix default thread pool when the thread pool provided by this class is null.
 * Also see {@link SegmentOnlineOfflineStateModelFactory#getExecutorService(String, String, String)}.
 */
public class ServerThreadPoolManager implements PinotClusterConfigChangeListener {
  private static final Logger LOGGER = LoggerFactory.getLogger(ServerThreadPoolManager.class);
  @Nullable
  private ThreadPoolExecutor _helixTransitionExecutor;
  @Nullable
  private ThreadPoolExecutor _helixConsumingTransitionExecutor;

  public ServerThreadPoolManager(PinotConfiguration serverConf) {
    String helixTransitionThreadPoolSizeStr = serverConf.getProperty(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE);
    String helixConsumingTransitionThreadPoolSizeStr =
        serverConf.getProperty(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE);
    if (helixTransitionThreadPoolSizeStr != null) {
      int helixTransitionThreadPoolSize = Integer.parseInt(helixTransitionThreadPoolSizeStr);
      _helixTransitionExecutor = new ThreadPoolExecutor(helixTransitionThreadPoolSize, helixTransitionThreadPoolSize,
          0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(),
          new ThreadFactoryBuilder().setNameFormat("HelixTransitionExecutor-%d").build());
      LOGGER.info("Created HelixTransitionExecutor with pool size: {}", helixTransitionThreadPoolSize);
    }
    if (helixConsumingTransitionThreadPoolSizeStr != null) {
      int helixConsumingTransitionThreadPoolSize = Integer.parseInt(helixConsumingTransitionThreadPoolSizeStr);
      _helixConsumingTransitionExecutor =
          new ThreadPoolExecutor(helixConsumingTransitionThreadPoolSize, helixConsumingTransitionThreadPoolSize,
              0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(),
              new ThreadFactoryBuilder().setNameFormat("HelixConsumingTransitionExecutor-%d").build());
      LOGGER.info("Created HelixConsumingTransitionExecutor with pool size: {}",
          helixConsumingTransitionThreadPoolSize);
    }
  }
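The constructor only creates a pool when the corresponding size property is set, so by default both executors stay null and Helix falls back to its own thread pool. The actual key strings behind `CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE` and `CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE` are defined in `CommonConstants.Server` and are not shown in this diff; assuming they follow Pinot's usual `pinot.server.*` naming, opting in might look like the following hypothetical server config fragment:

```properties
# Hypothetical key names for illustration only -- the real keys live in CommonConstants.Server
pinot.server.helix.transition.thread.pool.size=40
pinot.server.helix.consuming.transition.thread.pool.size=8
```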
  /**
   * A cluster config change cannot add or remove a custom Helix transition thread pool (e.g. going from null to 40,
   * or from 40 to null). The thread pool is registered via
   * {@link StateModelFactory#getExecutorService(String, String, String)} the first time a transition type is seen,
   * and is never re-registered after that. A size change (e.g. from 40 to 10) does take effect, because it only
   * adjusts the core and maximum size of the existing thread pool without reassigning the thread pool object.
   */
  @Override
  public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
    if (changedConfigs.contains(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE)) {
      if (clusterConfigs.get(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE) != null) {
        if (_helixTransitionExecutor == null) {
          LOGGER.warn("Custom thread pool HelixTransitionExecutor cannot be created on the fly from the config change. "
              + "Please restart the server for it to take effect.");
        } else {
          int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE));
          // Grow the maximum before the core (and shrink the core before the maximum) so that
          // corePoolSize <= maximumPoolSize holds after every call
          if (newPoolSize > _helixTransitionExecutor.getMaximumPoolSize()) {
            _helixTransitionExecutor.setMaximumPoolSize(newPoolSize);
          }
          _helixTransitionExecutor.setCorePoolSize(newPoolSize);
          _helixTransitionExecutor.setMaximumPoolSize(newPoolSize);
          LOGGER.info("Updated HelixTransitionExecutor pool size to: {}", newPoolSize);
        }
      } else if (_helixTransitionExecutor != null) {
        LOGGER.warn(
            "Custom thread pool HelixTransitionExecutor will still be used for Helix state transitions even though "
                + "the config {} was removed from the cluster config; keeping the last known size: {}",
            CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE, _helixTransitionExecutor.getCorePoolSize());
      }
    }
    if (changedConfigs.contains(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE)) {
      if (clusterConfigs.get(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE) != null) {
        if (_helixConsumingTransitionExecutor == null) {
          LOGGER.warn(
              "Custom thread pool HelixConsumingTransitionExecutor cannot be created on the fly from the config "
                  + "change. Please restart the server for it to take effect.");
        } else {
          int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE));
          // Same ordering constraint as above: keep corePoolSize <= maximumPoolSize at all times
          if (newPoolSize > _helixConsumingTransitionExecutor.getMaximumPoolSize()) {
            _helixConsumingTransitionExecutor.setMaximumPoolSize(newPoolSize);
          }
          _helixConsumingTransitionExecutor.setCorePoolSize(newPoolSize);
          _helixConsumingTransitionExecutor.setMaximumPoolSize(newPoolSize);
          LOGGER.info("Updated HelixConsumingTransitionExecutor pool size to: {}", newPoolSize);
        }
      } else if (_helixConsumingTransitionExecutor != null) {
        LOGGER.warn("Custom thread pool HelixConsumingTransitionExecutor will still be used for Helix consuming state "
            + "transitions even though the config {} was removed from the cluster config; keeping the last known "
            + "size: {}",
            CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE,
            _helixConsumingTransitionExecutor.getCorePoolSize());
      }
    }
  }
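One subtlety when resizing a fixed-size pool where core size equals maximum size: `ThreadPoolExecutor` enforces `corePoolSize <= maximumPoolSize`, and on recent JDKs `setCorePoolSize` throws `IllegalArgumentException` if the new core size exceeds the current maximum. So when growing, the maximum must be raised first. A minimal standalone sketch of a safe resize helper:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of runtime resizing of a fixed-size ThreadPoolExecutor, as onChange() does.
// Invariant: corePoolSize <= maximumPoolSize must hold after every setter call,
// so grow the maximum first and shrink the core first.
public class PoolResizeDemo {
  static void resize(ThreadPoolExecutor pool, int newSize) {
    if (newSize > pool.getMaximumPoolSize()) {
      pool.setMaximumPoolSize(newSize); // grow the ceiling before raising the core
    }
    pool.setCorePoolSize(newSize);
    pool.setMaximumPoolSize(newSize);
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    resize(pool, 8); // grow: 4 -> 8
    System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
    resize(pool, 2); // shrink: 8 -> 2; excess idle threads wind down
    System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
    pool.shutdownNow();
  }
}
```

Shrinking with `allowCoreThreadTimeOut` disabled does not interrupt busy workers; threads above the new core size retire only as they become idle.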
  @Nullable
  public ThreadPoolExecutor getHelixTransitionExecutor() {
    return _helixTransitionExecutor;
  }

  @Nullable
  public ThreadPoolExecutor getHelixConsumingTransitionExecutor() {
    return _helixConsumingTransitionExecutor;
  }

  public void shutdown() {
    if (_helixTransitionExecutor != null) {
      _helixTransitionExecutor.shutdownNow();
    }
    if (_helixConsumingTransitionExecutor != null) {
      _helixConsumingTransitionExecutor.shutdownNow();
    }
  }
}