Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.Cancellable;
Expand Down Expand Up @@ -87,6 +93,14 @@ public class H2MultiplexingRequester extends AsyncRequester {

private final H2ConnPool connPool;

/**
* Hard cap on per-connection queued / in-flight requests.
* {@code <= 0} disables the cap.
*/
private final int maxRequestsPerConnection;

private final Map<IOSession, AtomicInteger> pendingRequestMap;

/**
* Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
*/
Expand All @@ -100,11 +114,14 @@ public H2MultiplexingRequester(
final Resolver<HttpHost, InetSocketAddress> addressResolver,
final TlsStrategy tlsStrategy,
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
final IOWorkerSelector workerSelector,
final int maxRequestsPerConnection) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE,
threadPoolListener, workerSelector);
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
this.maxRequestsPerConnection = maxRequestsPerConnection;
this.pendingRequestMap = Collections.synchronizedMap(new WeakHashMap<>());
}

public void closeIdle(final TimeValue idleTime) {
Expand Down Expand Up @@ -166,6 +183,16 @@ public Cancellable execute(
return execute(null, exchangeHandler, null, timeout, context);
}

private AtomicInteger getPendingCounter(final IOSession ioSession) {
final AtomicInteger counter = pendingRequestMap.get(ioSession);
if (counter != null) {
return counter;
}
final AtomicInteger newCounter = new AtomicInteger(0);
pendingRequestMap.put(ioSession, newCounter);
return newCounter;
}

private void execute(
final HttpHost target,
final AsyncClientExchangeHandler exchangeHandler,
Expand All @@ -182,83 +209,54 @@ private void execute(
if (request.getAuthority() == null) {
request.setAuthority(new URIAuthority(host));
}
if (request.getScheme() == null) {
request.setScheme(host.getSchemeName());
}
connPool.getSession(host, timeout, new FutureCallback<IOSession>() {

@Override
public void completed(final IOSession ioSession) {
final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() {

@Override
public void releaseResources() {
final int max = maxRequestsPerConnection;
final AtomicInteger pendingCounter;
if (max > 0) {
pendingCounter = getPendingCounter(ioSession);
final int current = pendingCounter.incrementAndGet();
if (current > max) {
pendingCounter.decrementAndGet();
exchangeHandler.failed(new RejectedExecutionException(
"Maximum number of pending requests per connection reached (max=" + max + ")"));
exchangeHandler.releaseResources();
return;
}
} else {
pendingCounter = null;
}

@Override
public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
channel.sendRequest(request, entityDetails, httpContext);
}

@Override
public int available() {
return exchangeHandler.available();
}

@Override
public void produce(final DataStreamChannel channel) throws IOException {
exchangeHandler.produce(channel);
}

@Override
public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
exchangeHandler.consumeInformation(response, httpContext);
}

@Override
public void consumeResponse(
final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
exchangeHandler.consumeResponse(response, entityDetails, httpContext);
}

@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
exchangeHandler.updateCapacity(capacityChannel);
}

@Override
public void consume(final ByteBuffer src) throws IOException {
exchangeHandler.consume(src);
}

@Override
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
exchangeHandler.streamEnd(trailers);
}

@Override
public void cancel() {
exchangeHandler.cancel();
}

@Override
public void failed(final Exception cause) {
exchangeHandler.failed(cause);
}
final AsyncClientExchangeHandler handlerProxy;
if (pendingCounter != null) {
handlerProxy = new SlotReleasingExchangeHandler(exchangeHandler, pendingCounter);
} else {
handlerProxy = exchangeHandler;
}

};
final Timeout socketTimeout = ioSession.getSocketTimeout();
ioSession.enqueue(new RequestExecutionCommand(
handlerProxy,
pushHandlerFactory,
context,
streamControl -> {
cancellableDependency.setDependency(streamControl);
if (socketTimeout != null) {
streamControl.setTimeout(socketTimeout);
}
}),
Command.Priority.NORMAL);
final RequestExecutionCommand command = new RequestExecutionCommand(
handlerProxy,
pushHandlerFactory,
context,
streamControl -> {
cancellableDependency.setDependency(streamControl);
if (socketTimeout != null) {
streamControl.setTimeout(socketTimeout);
}
});

ioSession.enqueue(command, Command.Priority.NORMAL);

if (!ioSession.isOpen()) {
exchangeHandler.failed(new ConnectionClosedException());
handlerProxy.failed(new ConnectionClosedException());
handlerProxy.releaseResources();
}
}

Expand Down Expand Up @@ -350,4 +348,106 @@ public H2ConnPool getConnPool() {
return connPool;
}

private static final class SlotReleasingExchangeHandler implements AsyncClientExchangeHandler {

private final AsyncClientExchangeHandler exchangeHandler;
private final AtomicInteger pendingCounter;
private final AtomicBoolean released;

private SlotReleasingExchangeHandler(final AsyncClientExchangeHandler exchangeHandler, final AtomicInteger pendingCounter) {
this.exchangeHandler = exchangeHandler;
this.pendingCounter = pendingCounter;
this.released = new AtomicBoolean(false);
}

@Override
public void releaseResources() {
if (released.compareAndSet(false, true)) {
pendingCounter.decrementAndGet();
}
exchangeHandler.releaseResources();
}

@Override
public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
exchangeHandler.produceRequest(channel, httpContext);
}

@Override
public int available() {
return exchangeHandler.available();
}

@Override
public void produce(final DataStreamChannel channel) throws IOException {
exchangeHandler.produce(channel);
}

@Override
public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
exchangeHandler.consumeInformation(response, httpContext);
}

@Override
public void consumeResponse(
final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
exchangeHandler.consumeResponse(response, entityDetails, httpContext);
}

@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
exchangeHandler.updateCapacity(capacityChannel);
}

@Override
public void consume(final ByteBuffer src) throws IOException {
exchangeHandler.consume(src);
}

@Override
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
exchangeHandler.streamEnd(trailers);
}

@Override
public void cancel() {
exchangeHandler.cancel();
}

@Override
public void failed(final Exception cause) {
exchangeHandler.failed(cause);
}

}

/**
* Cancellable that can be wired to the stream control once it becomes available.
*/
private static final class CancellableExecution implements Cancellable, CancellableDependency {

private volatile Cancellable dependency;

@Override
public void setDependency(final Cancellable dependency) {
this.dependency = dependency;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean cancel() {
final Cancellable local = this.dependency;
if (local != null) {
local.cancel();
return true;
}
return false;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.function.Supplier;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap {

private IOReactorMetricsListener threadPoolListener;

private int maxRequestsPerConnection;

private H2MultiplexingRequesterBootstrap() {
this.routeEntries = new ArrayList<>();
}
Expand Down Expand Up @@ -180,6 +183,23 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final
return this;
}

/**
* Sets a hard limit on the number of pending request execution commands that can be queued per connection.
* When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
* A value {@code <= 0} disables the limit (default).
* Note: this limit applies to commands waiting in the connection's internal queue (backlog). HTTP/2 in-flight
* concurrency is governed separately by protocol settings (e.g. MAX_CONCURRENT_STREAMS).
*
* @param max maximum number of pending requests per connection; {@code <= 0} to disable the limit.
* @return this instance.
* @since 5.5
*/
@Experimental
public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int max) {
this.maxRequestsPerConnection = max;
return this;
}

/**
* Sets {@link H2StreamListener} instance.
*
Expand Down Expand Up @@ -274,7 +294,8 @@ public H2MultiplexingRequester create() {
DefaultAddressResolver.INSTANCE,
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
threadPoolListener,
null);
null,
maxRequestsPerConnection);
}

}
Loading