diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 0b33ed2dd8dc..8b1f23cf29b0 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -178,6 +178,7 @@ public abstract class BaseServerStarter implements ServiceStartable { protected RealtimeLuceneIndexRefreshManager _realtimeLuceneTextIndexRefreshManager; protected PinotEnvironmentProvider _pinotEnvironmentProvider; protected SegmentOperationsThrottler _segmentOperationsThrottler; + protected ServerThreadPoolManager _serverThreadPoolManager; protected ThreadAccountant _threadAccountant; protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler; protected volatile boolean _isServerReadyToServeQueries = false; @@ -701,6 +702,9 @@ public void start() segmentDownloadThrottler, segmentMultiColTextIndexPreprocessThrottler); } + _serverThreadPoolManager = new ServerThreadPoolManager(_serverConf); + _clusterConfigChangeHandler.registerClusterConfigChangeListener(_serverThreadPoolManager); + // Enable/disable thread CPU time measurement through instance config. 
ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled( _serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, @@ -732,7 +736,7 @@ public void start() initSegmentFetcher(_serverConf); StateModelFactory stateModelFactory = - new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); + new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, _serverThreadPoolManager); _helixManager.getStateMachineEngine() .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access @@ -945,7 +949,7 @@ public void stop() { _adminApiApplication.startShuttingDown(); _helixAdmin.setConfig(_instanceConfigScope, Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(true))); - + _serverThreadPoolManager.shutdown(); long endTimeMs = startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS, Server.DEFAULT_SHUTDOWN_TIMEOUT_MS); if (_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK, diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index d547741706b8..01b9d3c26d5c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.server.starter.helix; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; @@ -40,10 +42,13 @@ public class 
SegmentOnlineOfflineStateModelFactory extends StateModelFactory(),
+              new ThreadFactoryBuilder().setNameFormat("HelixTransitionExecutor-%d").build());
+      LOGGER.info("Created HelixTransitionExecutor with pool size: {}", helixTransitionThreadPoolSize);
+    }
+    if (helixConsumingTransitionThreadPoolSizeStr != null) {
+      int helixConsumingTransitionThreadPoolSize = Integer.parseInt(helixConsumingTransitionThreadPoolSizeStr);
+      _helixConsumingTransitionExecutor =
+          new ThreadPoolExecutor(helixConsumingTransitionThreadPoolSize, helixConsumingTransitionThreadPoolSize,
+              0L, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(),
+              new ThreadFactoryBuilder().setNameFormat("HelixConsumingTransitionExecutor-%d").build());
+      LOGGER.info("Created HelixConsumingTransitionExecutor with pool size: {}",
+          helixConsumingTransitionThreadPoolSize);
+    }
+  }
+
+  /**
+   * Applies cluster config changes to the custom Helix transition thread pools.
+   *
+   * <p>Adding or removing a pool at runtime (for example going from no config to 40, or from 40 to no config) has no
+   * effect, because the thread pool is registered via
+   * {@link StateModelFactory#getExecutorService(String, String, String)} the first time a transition type is seen
+   * and is not changed after that. Changing the size of an existing pool (for example from 40 to 10, or from 10 to
+   * 40) is effective, because it only adjusts the core size and max size of the same thread pool object.
+   *
+   * <p>A new value that is not a positive integer is logged and ignored, leaving the pool at its current size.
+   *
+   * @param changedConfigs keys of the cluster configs that changed
+   * @param clusterConfigs current cluster configs; a changed key with no value here is treated as removed
+   */
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
+    if (changedConfigs.contains(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE)) {
+      String newPoolSizeStr = clusterConfigs.get(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE);
+      if (newPoolSizeStr != null) {
+        if (_helixTransitionExecutor == null) {
+          LOGGER.warn("Custom thread pool HelixTransitionExecutor cannot be created on the fly from the config change. "
+              + "Please restart the server to take effect.");
+        } else {
+          resizeExecutor(_helixTransitionExecutor, "HelixTransitionExecutor",
+              CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE, newPoolSizeStr);
+        }
+      } else if (_helixTransitionExecutor != null) {
+        LOGGER.warn(
+            "Custom thread pool HelixTransitionExecutor would still be used for Helix state transitions even though "
+                + "the config {} is removed from cluster config, using the last known size: {}",
+            CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE, _helixTransitionExecutor.getCorePoolSize());
+      }
+    }
+    if (changedConfigs.contains(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE)) {
+      String newPoolSizeStr = clusterConfigs.get(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE);
+      if (newPoolSizeStr != null) {
+        if (_helixConsumingTransitionExecutor == null) {
+          LOGGER.warn(
+              "Custom thread pool HelixConsumingTransitionExecutor cannot be created on the fly from the config "
+                  + "change. Please restart the server to take effect.");
+        } else {
+          resizeExecutor(_helixConsumingTransitionExecutor, "HelixConsumingTransitionExecutor",
+              CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE, newPoolSizeStr);
+        }
+      } else if (_helixConsumingTransitionExecutor != null) {
+        LOGGER.warn("Custom thread pool HelixConsumingTransitionExecutor would still be used for Helix consuming state "
+                + "transitions even though the config {} is removed from cluster config, using the last known size: {}",
+            CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE, _helixConsumingTransitionExecutor.getCorePoolSize());
+      }
+    }
+  }
+
+  /**
+   * Resizes a fixed-size executor (core size == max size) to the size given by a cluster config value.
+   *
+   * @param executor executor to resize
+   * @param executorName name of the executor, used in log messages only
+   * @param configKey cluster config key the value came from, used in log messages only
+   * @param newPoolSizeStr raw config value; ignored with a warning unless it parses to a positive integer
+   */
+  private void resizeExecutor(ThreadPoolExecutor executor, String executorName, String configKey,
+      String newPoolSizeStr) {
+    int newPoolSize;
+    try {
+      newPoolSize = Integer.parseInt(newPoolSizeStr);
+    } catch (NumberFormatException e) {
+      // Fall through to the non-positive check below so that both kinds of bad value are reported the same way.
+      newPoolSize = -1;
+    }
+    if (newPoolSize <= 0) {
+      LOGGER.warn("Ignoring invalid value: {} for config: {}, keeping {} pool size at: {}", newPoolSizeStr, configKey,
+          executorName, executor.getCorePoolSize());
+      return;
+    }
+    // ThreadPoolExecutor rejects a core size above the max size (and a max size below the core size) with an
+    // IllegalArgumentException, so raise the max first when growing and lower the core first when shrinking.
+    if (newPoolSize > executor.getMaximumPoolSize()) {
+      executor.setMaximumPoolSize(newPoolSize);
+      executor.setCorePoolSize(newPoolSize);
+    } else {
+      executor.setCorePoolSize(newPoolSize);
+      executor.setMaximumPoolSize(newPoolSize);
+    }
+    LOGGER.info("Updated {} pool size to: {}", executorName, newPoolSize);
+  }
+
+  /**
+   * Returns the custom executor for Helix state transitions, or {@code null} when none was created.
+   */
+  @Nullable
+  public ThreadPoolExecutor getHelixTransitionExecutor() {
+    return _helixTransitionExecutor;
+  }
+
+  /**
+   * Returns the custom executor for Helix consuming state transitions, or {@code null} when none was created.
+   */
+  @Nullable
+  public ThreadPoolExecutor getHelixConsumingTransitionExecutor() {
+    return _helixConsumingTransitionExecutor;
+  }
+
+  /**
+   * Calls {@link ThreadPoolExecutor#shutdownNow()} on each custom executor that was created.
+   */
+  public void shutdown() {
+    if (_helixTransitionExecutor != null) {
+      _helixTransitionExecutor.shutdownNow();
+    }
+    if (_helixConsumingTransitionExecutor != null) {
+      _helixConsumingTransitionExecutor.shutdownNow();
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 9a7f8804a203..6ca80aa6d65f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1112,6 +1112,12 @@ public static class Server {
 INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "."
+ RELOAD_CONSUMING_SEGMENT; public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true; + // Configs for helix thread pools + public static final String CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE = + "pinot.server.helix.transition.thread.pool.size"; + public static final String CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE = + "pinot.server.helix.transition.consuming.thread.pool.size"; + // Query logger related configs public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = "pinot.server.query.log.maxRatePerSecond"; @Deprecated