From 0615657863b54187dfc8843f463e7252e5ffe68e Mon Sep 17 00:00:00 2001 From: J-HowHuang Date: Tue, 23 Dec 2025 17:50:57 -0800 Subject: [PATCH 1/2] Add custom thread pool for SegmentOnlineOfflineStateModelFactory --- .../starter/helix/BaseServerStarter.java | 8 +- ...SegmentOnlineOfflineStateModelFactory.java | 15 +++- .../helix/ServerThreadPoolManager.java | 79 +++++++++++++++++++ .../pinot/spi/utils/CommonConstants.java | 8 ++ 4 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 pinot-server/src/main/java/org/apache/pinot/server/starter/helix/ServerThreadPoolManager.java diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 0b33ed2dd8dc..8b1f23cf29b0 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -178,6 +178,7 @@ public abstract class BaseServerStarter implements ServiceStartable { protected RealtimeLuceneIndexRefreshManager _realtimeLuceneTextIndexRefreshManager; protected PinotEnvironmentProvider _pinotEnvironmentProvider; protected SegmentOperationsThrottler _segmentOperationsThrottler; + protected ServerThreadPoolManager _serverThreadPoolManager; protected ThreadAccountant _threadAccountant; protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler; protected volatile boolean _isServerReadyToServeQueries = false; @@ -701,6 +702,9 @@ public void start() segmentDownloadThrottler, segmentMultiColTextIndexPreprocessThrottler); } + _serverThreadPoolManager = new ServerThreadPoolManager(_serverConf); + _clusterConfigChangeHandler.registerClusterConfigChangeListener(_serverThreadPoolManager); + // Enable/disable thread CPU time measurement through instance config. ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled( _serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, @@ -732,7 +736,7 @@ public void start() initSegmentFetcher(_serverConf); StateModelFactory stateModelFactory = - new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); + new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, _serverThreadPoolManager); _helixManager.getStateMachineEngine() .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access @@ -945,7 +949,7 @@ public void stop() { _adminApiApplication.startShuttingDown(); _helixAdmin.setConfig(_instanceConfigScope, Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(true))); - + _serverThreadPoolManager.shutdown(); long endTimeMs = startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS, Server.DEFAULT_SHUTDOWN_TIMEOUT_MS); if (_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index d547741706b8..4309025fcb53 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.server.starter.helix; +import java.util.concurrent.ExecutorService; import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; @@ -40,10 +41,13 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory(), + new ThreadFactoryBuilder().setNameFormat("HelixTransitionExecutor-%d").build()); + _helixConsumingTransitionExecutor = + new ThreadPoolExecutor(helixConsumingTransitionThreadPoolSize, helixConsumingTransitionThreadPoolSize, + 0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("HelixConsumingTransitionExecutor-%d").build()); + } + + @Override + public void onChange(Set changedConfigs, Map clusterConfigs) { + if (changedConfigs.contains(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE)) { + int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE)); + _helixTransitionExecutor.setCorePoolSize(newPoolSize); + _helixTransitionExecutor.setMaximumPoolSize(newPoolSize); + } + if (changedConfigs.contains(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE)) { + int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE)); + _helixConsumingTransitionExecutor.setCorePoolSize(newPoolSize); + _helixConsumingTransitionExecutor.setMaximumPoolSize(newPoolSize); + } + } + + public ThreadPoolExecutor getHelixTransitionExecutor() { + return _helixTransitionExecutor; + } + + public ThreadPoolExecutor getHelixConsumingTransitionExecutor() { + return _helixConsumingTransitionExecutor; + } + + public void shutdown() { + _helixTransitionExecutor.shutdownNow(); + _helixConsumingTransitionExecutor.shutdownNow(); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 9a7f8804a203..7d7c9c6b3ca7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1112,6 +1112,14 @@ public static class Server { INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + RELOAD_CONSUMING_SEGMENT; public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true; + // Configs for helix thread pools + public static final String CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE = + "pinot.server.helix.transition.thread.pool.size"; + public static final int DEFAULT_HELIX_TRANSITION_THREAD_POOL_SIZE = 40; + public static final String CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE = + "pinot.server.helix.transition.consuming.thread.pool.size"; + public static final int DEFAULT_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE = 10; + // Query logger related configs public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = "pinot.server.query.log.maxRatePerSecond"; @Deprecated From 6d3127e08c7bdcc47175a669c34bc0506947d183 Mon Sep 17 00:00:00 2001 From: J-HowHuang Date: Fri, 26 Dec 2025 11:35:57 -0800 Subject: [PATCH 2/2] Remain default behavior --- ...SegmentOnlineOfflineStateModelFactory.java | 2 + .../helix/ServerThreadPoolManager.java | 106 ++++++++++++++---- .../pinot/spi/utils/CommonConstants.java | 2 - 3 files changed, 84 insertions(+), 26 deletions(-) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 4309025fcb53..01b9d3c26d5c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -19,6 +19,7 @@ package org.apache.pinot.server.starter.helix; import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; @@ -252,6 +253,7 @@ public void onBecomeDroppedFromError(Message message, NotificationContext contex } @Override + @Nullable public ExecutorService getExecutorService(String resourceName, String fromState, String toState) { if (fromState.equals("CONSUMING") || toState.equals("CONSUMING")) { return _serverThreadPoolManager.getHelixConsumingTransitionExecutor(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/ServerThreadPoolManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/ServerThreadPoolManager.java index ec6a76997a25..0653c01fccdc 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/ServerThreadPoolManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/ServerThreadPoolManager.java @@ -22,58 +22,116 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; +import javax.annotation.Nullable; +import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener; import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.pinot.spi.utils.CommonConstants.Server.CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE; import static org.apache.pinot.spi.utils.CommonConstants.Server.CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE; -import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE; -import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_HELIX_TRANSITION_THREAD_POOL_SIZE; +/** + * Manages the custom Helix state transition thread pools for Pinot server. + * Helix state transition will use their default thread pool if the thread pool provided by this class is null. Also see + * {@link SegmentOnlineOfflineStateModelFactory#getExecutorService(String, String, String)} + */ public class ServerThreadPoolManager implements PinotClusterConfigChangeListener { - private final ThreadPoolExecutor _helixTransitionExecutor; - private final ThreadPoolExecutor _helixConsumingTransitionExecutor; + private static final Logger LOGGER = LoggerFactory.getLogger(ServerThreadPoolManager.class); + @Nullable + private ThreadPoolExecutor _helixTransitionExecutor; + @Nullable + private ThreadPoolExecutor _helixConsumingTransitionExecutor; public ServerThreadPoolManager(PinotConfiguration serverConf) { - int helixTransitionThreadPoolSize = - serverConf.getProperty(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE, DEFAULT_HELIX_TRANSITION_THREAD_POOL_SIZE); - int helixConsumingTransitionThreadPoolSize = - serverConf.getProperty(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE, - DEFAULT_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE); - _helixTransitionExecutor = new ThreadPoolExecutor(helixTransitionThreadPoolSize, helixTransitionThreadPoolSize, - 0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat("HelixTransitionExecutor-%d").build()); - _helixConsumingTransitionExecutor = - new ThreadPoolExecutor(helixConsumingTransitionThreadPoolSize, helixConsumingTransitionThreadPoolSize, - 0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat("HelixConsumingTransitionExecutor-%d").build()); + String helixTransitionThreadPoolSizeStr = + serverConf.getProperty(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE); + String helixConsumingTransitionThreadPoolSizeStr = + serverConf.getProperty(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE); + if (helixTransitionThreadPoolSizeStr != null) { + int helixTransitionThreadPoolSize = Integer.parseInt(helixTransitionThreadPoolSizeStr); + _helixTransitionExecutor = new ThreadPoolExecutor(helixTransitionThreadPoolSize, helixTransitionThreadPoolSize, + 0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("HelixTransitionExecutor-%d").build()); + LOGGER.info("Created HelixTransitionExecutor with pool size: {}", helixTransitionThreadPoolSize); + } + if (helixConsumingTransitionThreadPoolSizeStr != null) { + int helixConsumingTransitionThreadPoolSize = Integer.parseInt(helixConsumingTransitionThreadPoolSizeStr); + _helixConsumingTransitionExecutor = + new ThreadPoolExecutor(helixConsumingTransitionThreadPoolSize, helixConsumingTransitionThreadPoolSize, + 0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setNameFormat("HelixConsumingTransitionExecutor-%d").build()); + LOGGER.info("Created HelixConsumingTransitionExecutor with pool size: {}", + helixConsumingTransitionThreadPoolSize); + } } + /** + * There will be no effect on the change to attempt to remove or add in the custom helix transition thread pool. + * For example, from null to 40 or from 40 to null. Because the thread pool would be registered via + * {@link StateModelFactory#getExecutorService(String, String, String)} upon the first time a transition type is + * seen, and will not be changed after that. + * But the change from 40 to 10, for example, is effective because it just changes the core size and max size of + * the thread pool, not to reassign the thread pool object. + */ @Override public void onChange(Set changedConfigs, Map clusterConfigs) { if (changedConfigs.contains(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE)) { - int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE)); - _helixTransitionExecutor.setCorePoolSize(newPoolSize); - _helixTransitionExecutor.setMaximumPoolSize(newPoolSize); + if (clusterConfigs.get(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE) != null) { + if (_helixTransitionExecutor == null) { + LOGGER.warn("Custom thread pool HelixTransitionExecutor cannot be created on the fly from the config change. " + + "Please restart the server to take effect."); + } else { + int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE)); + _helixTransitionExecutor.setCorePoolSize(newPoolSize); + _helixTransitionExecutor.setMaximumPoolSize(newPoolSize); + LOGGER.info("Updated HelixTransitionExecutor pool size to: {}", newPoolSize); + } + } else if (_helixTransitionExecutor != null) { + LOGGER.warn( + "Custom thread pool HelixTransitionExecutor would still be used for Helix state transitions even though " + + "the config {} is removed from cluster config, using the last known size: {}", + CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE, _helixTransitionExecutor.getCorePoolSize()); + } } if (changedConfigs.contains(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE)) { - int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE)); - _helixConsumingTransitionExecutor.setCorePoolSize(newPoolSize); - _helixConsumingTransitionExecutor.setMaximumPoolSize(newPoolSize); + if (clusterConfigs.get(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE) != null) { + if (_helixConsumingTransitionExecutor == null) { + LOGGER.warn( + "Custom thread pool HelixConsumingTransitionExecutor cannot be created on the fly from the config " + + "change. Please restart the server to take effect."); + } else { + int newPoolSize = Integer.parseInt(clusterConfigs.get(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE)); + _helixConsumingTransitionExecutor.setCorePoolSize(newPoolSize); + _helixConsumingTransitionExecutor.setMaximumPoolSize(newPoolSize); + LOGGER.info("Updated HelixConsumingTransitionExecutor pool size to: {}", newPoolSize); + } + } else if (_helixConsumingTransitionExecutor != null) { + LOGGER.warn("Custom thread pool HelixConsumingTransitionExecutor would still be used for Helix consuming state " + + "transitions even though the config {} is removed from cluster config, using the last known size: {}", + CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE, _helixConsumingTransitionExecutor.getCorePoolSize()); + } } } + @Nullable public ThreadPoolExecutor getHelixTransitionExecutor() { return _helixTransitionExecutor; } + @Nullable public ThreadPoolExecutor getHelixConsumingTransitionExecutor() { return _helixConsumingTransitionExecutor; } public void shutdown() { - _helixTransitionExecutor.shutdownNow(); - _helixConsumingTransitionExecutor.shutdownNow(); + if (_helixTransitionExecutor != null) { + _helixTransitionExecutor.shutdownNow(); + } + if (_helixConsumingTransitionExecutor != null) { + _helixConsumingTransitionExecutor.shutdownNow(); + } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 7d7c9c6b3ca7..6ca80aa6d65f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1115,10 +1115,8 @@ public static class Server { // Configs for helix thread pools public static final String CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE = "pinot.server.helix.transition.thread.pool.size"; - public static final int DEFAULT_HELIX_TRANSITION_THREAD_POOL_SIZE = 40; public static final String CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE = "pinot.server.helix.transition.consuming.thread.pool.size"; - public static final int DEFAULT_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE = 10; // Query logger related configs public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = "pinot.server.query.log.maxRatePerSecond";