Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public abstract class BaseServerStarter implements ServiceStartable {
protected RealtimeLuceneIndexRefreshManager _realtimeLuceneTextIndexRefreshManager;
protected PinotEnvironmentProvider _pinotEnvironmentProvider;
protected SegmentOperationsThrottler _segmentOperationsThrottler;
protected ServerThreadPoolManager _serverThreadPoolManager;
protected ThreadAccountant _threadAccountant;
protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
protected volatile boolean _isServerReadyToServeQueries = false;
Expand Down Expand Up @@ -701,6 +702,9 @@ public void start()
segmentDownloadThrottler, segmentMultiColTextIndexPreprocessThrottler);
}

_serverThreadPoolManager = new ServerThreadPoolManager(_serverConf);
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_serverThreadPoolManager);

// Enable/disable thread CPU time measurement through instance config.
ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
Expand Down Expand Up @@ -732,7 +736,7 @@ public void start()

initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager);
new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, _serverThreadPoolManager);
_helixManager.getStateMachineEngine()
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory);
// Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access
Expand Down Expand Up @@ -945,7 +949,7 @@ public void stop() {
_adminApiApplication.startShuttingDown();
_helixAdmin.setConfig(_instanceConfigScope,
Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(true)));

_serverThreadPoolManager.shutdown();
long endTimeMs =
startTimeMs + _serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS, Server.DEFAULT_SHUTDOWN_TIMEOUT_MS);
if (_serverConf.getProperty(Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pinot.server.starter.helix;

import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
Expand All @@ -40,10 +42,13 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta

private final String _instanceId;
private final InstanceDataManager _instanceDataManager;
private final ServerThreadPoolManager _serverThreadPoolManager;

public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) {
public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager,
ServerThreadPoolManager serverThreadPoolManager) {
_instanceId = instanceId;
_instanceDataManager = instanceDataManager;
_serverThreadPoolManager = serverThreadPoolManager;
}

public static String getStateModelName() {
Expand Down Expand Up @@ -162,7 +167,6 @@ private void onConsumingToDropped(String realtimeTableName, String segmentName)
public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
throws Exception {
_logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : {}", message);

try {
_instanceDataManager.addOnlineSegment(message.getResourceName(), message.getPartitionName());
} catch (Exception e) {
Expand Down Expand Up @@ -247,4 +251,13 @@ public void onBecomeDroppedFromError(Message message, NotificationContext contex
}
}
}

@Override
@Nullable
public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helix docs of StateModelFactory states "This method is called just once per transition type per resource. "

What is a transition type ? Will this be an issue for us ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transition type means "from which state to which state". For each pair of states, this method will be called with it at most once, to register the thread pool to that transition type.

This would not be an issue since it constructs the thread pools for the transitions in the new ServerThreadPoolManager at server start, then reuse them throughout the server lifetime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't change the default behaviour, so by default we should return null here so that it goes through the existing code flow.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get it. Example configs:

| helix.transition.thread.pool.size | helix.transition.consuming.thread.pool.size | result |
| --- | --- | --- |
| null | null | original path |
| null | 10 | transitions from/to CONSUMING use the new thread pool; others use the original path |
| 40 | null | transitions from/to CONSUMING use the original path; others use the new thread pool |
| 40 | 10 | all transitions use the new thread pools |

Added a note that changing a config from null to 40, or from 40 to null, has no effect at runtime: the thread pool is registered via getExecutorService the first time a transition type is seen, and is never replaced after that. A change from 40 to 10, on the other hand, does take effect, because it only changes the core size and max size of the existing thread pool rather than replacing the thread pool object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, let's add a separate flag to create a thread pool per resource (table). That way we achieve more isolation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm unsure about is that this allows the limit on ongoing transitions to go from 40 to something much bigger. Especially during server start, a cluster with 100 tables would then result in up to 4000 concurrent state transitions. If all expensive operations are throttled, then having a thread pool per table might be safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to configure it appropriately. Overall, I think we need 3 behaviours:

  1. default - use existing helix thread pool executor
  2. thread pool based on state transition type (already added in the PR). I agree this is a more manageable conf
  3. thread pool based on resource name. This will be required in extreme cases where tables are stepping on each other. Having this capability will enable configuring it when required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you agree on a thread pool based on resource name, we can add that in a follow-up PR.

// Transitions that start from or end in the CONSUMING state are routed to the dedicated consuming-transition
// executor; every other transition type is routed to the general transition executor.
// Either getter may return null (both are declared @Nullable on ServerThreadPoolManager), in which case this
// method returns null as well. Per the ServerThreadPoolManager class Javadoc, Helix then uses its default
// thread pool for that transition type.
if (fromState.equals("CONSUMING") || toState.equals("CONSUMING")) {
return _serverThreadPoolManager.getHelixConsumingTransitionExecutor();
}
return _serverThreadPoolManager.getHelixTransitionExecutor();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.server.starter.helix;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Nullable;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.Server.CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE;
import static org.apache.pinot.spi.utils.CommonConstants.Server.CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE;


/**
 * Manages the custom Helix state transition thread pools for Pinot server.
 *
 * <p>Two optional fixed-size pools are managed: one for state transitions from/to CONSUMING and one for all other
 * state transitions. A pool is only created when its size config is present in the server config at construction
 * time; otherwise the corresponding getter returns {@code null} and Helix state transitions use their default
 * thread pool. Also see {@link SegmentOnlineOfflineStateModelFactory#getExecutorService(String, String, String)}.
 *
 * <p>The executor references are fixed at construction time. Only the size of an already existing pool can be
 * changed at runtime through {@link #onChange(Set, Map)}.
 */
public class ServerThreadPoolManager implements PinotClusterConfigChangeListener {
  private static final Logger LOGGER = LoggerFactory.getLogger(ServerThreadPoolManager.class);
  private static final String HELIX_TRANSITION_EXECUTOR_NAME = "HelixTransitionExecutor";
  private static final String HELIX_CONSUMING_TRANSITION_EXECUTOR_NAME = "HelixConsumingTransitionExecutor";

  // Assigned exactly once in the constructor. Declared final so that the references are safely visible to the
  // threads that later call the getters and onChange().
  @Nullable
  private final ThreadPoolExecutor _helixTransitionExecutor;
  @Nullable
  private final ThreadPoolExecutor _helixConsumingTransitionExecutor;

  /**
   * Creates the custom thread pools for which a size is configured in the given server config.
   *
   * @param serverConf server config to read the pool sizes from
   * @throws NumberFormatException if a configured pool size is not an integer
   * @throws IllegalArgumentException if a configured pool size is not positive
   */
  public ServerThreadPoolManager(PinotConfiguration serverConf) {
    _helixTransitionExecutor = createExecutor(serverConf.getProperty(CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE),
        HELIX_TRANSITION_EXECUTOR_NAME);
    _helixConsumingTransitionExecutor =
        createExecutor(serverConf.getProperty(CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE),
            HELIX_CONSUMING_TRANSITION_EXECUTOR_NAME);
  }

  /**
   * Creates a fixed-size thread pool backed by an unbounded queue, or returns {@code null} when no size is
   * configured.
   */
  @Nullable
  private static ThreadPoolExecutor createExecutor(@Nullable String poolSizeStr, String executorName) {
    if (poolSizeStr == null) {
      return null;
    }
    int poolSize = Integer.parseInt(poolSizeStr);
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(poolSize, poolSize, 0L, java.util.concurrent.TimeUnit.SECONDS,
            new java.util.concurrent.LinkedBlockingQueue<>(),
            new ThreadFactoryBuilder().setNameFormat(executorName + "-%d").build());
    LOGGER.info("Created {} with pool size: {}", executorName, poolSize);
    return executor;
  }

  /**
   * There will be no effect on the change to attempt to remove or add in the custom helix transition thread pool.
   * For example, from null to 40 or from 40 to null. Because the thread pool would be registered via
   * {@link StateModelFactory#getExecutorService(String, String, String)} upon the first time a transition type is
   * seen, and will not be changed after that.
   * But the change from 40 to 10, for example, is effective because it just changes the core size and max size of
   * the thread pool, not to reassign the thread pool object.
   *
   * <p>An invalid new size (not an integer, or not positive) is logged and ignored, and the pool keeps its current
   * size.
   */
  @Override
  public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
    applyPoolSizeChange(changedConfigs, clusterConfigs, CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE,
        _helixTransitionExecutor, HELIX_TRANSITION_EXECUTOR_NAME, "Helix state transitions");
    applyPoolSizeChange(changedConfigs, clusterConfigs, CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE,
        _helixConsumingTransitionExecutor, HELIX_CONSUMING_TRANSITION_EXECUTOR_NAME,
        "Helix consuming state transitions");
  }

  /**
   * Applies a cluster config change of one pool size config to the matching executor, if the config changed.
   */
  private static void applyPoolSizeChange(Set<String> changedConfigs, Map<String, String> clusterConfigs,
      String configKey, @Nullable ThreadPoolExecutor executor, String executorName, String transitionDescription) {
    if (!changedConfigs.contains(configKey)) {
      return;
    }
    String newPoolSizeStr = clusterConfigs.get(configKey);
    if (newPoolSizeStr == null) {
      // Config removed: an existing pool cannot be unregistered, so it keeps running with its current size.
      if (executor != null) {
        LOGGER.warn("Custom thread pool {} would still be used for {} even though the config {} is removed from "
                + "cluster config, using the last known size: {}", executorName, transitionDescription, configKey,
            executor.getCorePoolSize());
      }
      return;
    }
    if (executor == null) {
      LOGGER.warn("Custom thread pool {} cannot be created on the fly from the config change. "
          + "Please restart the server to take effect.", executorName);
      return;
    }
    int newPoolSize;
    try {
      newPoolSize = Integer.parseInt(newPoolSizeStr);
    } catch (NumberFormatException e) {
      // Do not let a malformed cluster config value escape from the listener callback.
      LOGGER.error("Invalid value: {} for config: {}, keeping {} pool size at: {}", newPoolSizeStr, configKey,
          executorName, executor.getCorePoolSize(), e);
      return;
    }
    if (newPoolSize <= 0) {
      LOGGER.error("Non-positive value: {} for config: {}, keeping {} pool size at: {}", newPoolSize, configKey,
          executorName, executor.getCorePoolSize());
      return;
    }
    resize(executor, newPoolSize);
    LOGGER.info("Updated {} pool size to: {}", executorName, newPoolSize);
  }

  /**
   * Sets both the core and the maximum pool size to the given value.
   *
   * <p>The order of the two setter calls matters: {@link ThreadPoolExecutor#setCorePoolSize(int)} rejects a value
   * greater than the current maximum, and {@link ThreadPoolExecutor#setMaximumPoolSize(int)} rejects a value less
   * than the current core size. So the maximum is raised first when growing, and the core is lowered first when
   * shrinking.
   */
  private static void resize(ThreadPoolExecutor executor, int newPoolSize) {
    if (newPoolSize > executor.getMaximumPoolSize()) {
      executor.setMaximumPoolSize(newPoolSize);
      executor.setCorePoolSize(newPoolSize);
    } else {
      executor.setCorePoolSize(newPoolSize);
      executor.setMaximumPoolSize(newPoolSize);
    }
  }

  /**
   * Returns the executor for state transitions that are not from/to CONSUMING, or {@code null} if it is not
   * configured.
   */
  @Nullable
  public ThreadPoolExecutor getHelixTransitionExecutor() {
    return _helixTransitionExecutor;
  }

  /**
   * Returns the executor for state transitions from/to CONSUMING, or {@code null} if it is not configured.
   */
  @Nullable
  public ThreadPoolExecutor getHelixConsumingTransitionExecutor() {
    return _helixConsumingTransitionExecutor;
  }

  /**
   * Shuts down the custom thread pools that were created, via {@link ThreadPoolExecutor#shutdownNow()}.
   */
  public void shutdown() {
    if (_helixTransitionExecutor != null) {
      _helixTransitionExecutor.shutdownNow();
    }
    if (_helixConsumingTransitionExecutor != null) {
      _helixConsumingTransitionExecutor.shutdownNow();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,12 @@ public static class Server {
INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + RELOAD_CONSUMING_SEGMENT;
public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;

// Configs for helix thread pools
// Both configs are optional. ServerThreadPoolManager creates a custom thread pool only for the configs that are
// set; when a config is not set, Helix uses its default thread pool for the corresponding state transitions.
// Size of the custom thread pool for state transitions that are neither from nor to the CONSUMING state.
public static final String CONFIG_OF_HELIX_TRANSITION_THREAD_POOL_SIZE =
"pinot.server.helix.transition.thread.pool.size";
// Size of the custom thread pool for state transitions from or to the CONSUMING state.
public static final String CONFIG_OF_HELIX_CONSUMING_TRANSITION_THREAD_POOL_SIZE =
"pinot.server.helix.transition.consuming.thread.pool.size";

// Query logger related configs
public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = "pinot.server.query.log.maxRatePerSecond";
@Deprecated
Expand Down
Loading