diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index f03bd20577..997d05b32f 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -30,9 +30,15 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.WeakHashMap; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.Cancellable; @@ -87,6 +93,14 @@ public class H2MultiplexingRequester extends AsyncRequester { private final H2ConnPool connPool; + /** + * Hard cap on per-connection queued / in-flight requests. + * {@code <= 0} disables the cap. + */ + private final int maxRequestsPerConnection; + + private final Map pendingRequestMap; + /** * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class. */ @@ -100,11 +114,14 @@ public H2MultiplexingRequester( final Resolver addressResolver, final TlsStrategy tlsStrategy, final IOReactorMetricsListener threadPoolListener, - final IOWorkerSelector workerSelector) { + final IOWorkerSelector workerSelector, + final int maxRequestsPerConnection) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener, workerSelector); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); + this.maxRequestsPerConnection = maxRequestsPerConnection; + this.pendingRequestMap = Collections.synchronizedMap(new WeakHashMap<>()); } public void closeIdle(final TimeValue idleTime) { @@ -166,6 +183,16 @@ public Cancellable execute( return execute(null, exchangeHandler, null, timeout, context); } + private AtomicInteger getPendingCounter(final IOSession ioSession) { + final AtomicInteger counter = pendingRequestMap.get(ioSession); + if (counter != null) { + return counter; + } + final AtomicInteger newCounter = new AtomicInteger(0); + pendingRequestMap.put(ioSession, newCounter); + return newCounter; + } + private void execute( final HttpHost target, final AsyncClientExchangeHandler exchangeHandler, @@ -182,83 +209,54 @@ private void execute( if (request.getAuthority() == null) { request.setAuthority(new URIAuthority(host)); } + if (request.getScheme() == null) { + request.setScheme(host.getSchemeName()); + } connPool.getSession(host, timeout, new FutureCallback() { @Override public void completed(final IOSession ioSession) { - final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() { - @Override - public void releaseResources() { + final int max = maxRequestsPerConnection; + final AtomicInteger pendingCounter; + if (max > 0) { + pendingCounter = getPendingCounter(ioSession); + final int current = pendingCounter.incrementAndGet(); + if (current > max) { + pendingCounter.decrementAndGet(); + exchangeHandler.failed(new RejectedExecutionException( + "Maximum number of pending requests per connection reached (max=" + max + ")")); exchangeHandler.releaseResources(); + return; } + } else { + pendingCounter = null; + } - @Override - public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { - channel.sendRequest(request, entityDetails, httpContext); - } - - @Override - public int available() { - return exchangeHandler.available(); - } - - @Override - public void produce(final DataStreamChannel channel) throws IOException { - exchangeHandler.produce(channel); - } - - @Override - public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { - exchangeHandler.consumeInformation(response, httpContext); - } - - @Override - public void consumeResponse( - final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { - exchangeHandler.consumeResponse(response, entityDetails, httpContext); - } - - @Override - public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { - exchangeHandler.updateCapacity(capacityChannel); - } - - @Override - public void consume(final ByteBuffer src) throws IOException { - exchangeHandler.consume(src); - } - - @Override - public void streamEnd(final List trailers) throws HttpException, IOException { - exchangeHandler.streamEnd(trailers); - } - - @Override - public void cancel() { - exchangeHandler.cancel(); - } - - @Override - public void failed(final Exception cause) { - exchangeHandler.failed(cause); - } + final AsyncClientExchangeHandler handlerProxy; + if (pendingCounter != null) { + handlerProxy = new SlotReleasingExchangeHandler(exchangeHandler, pendingCounter); + } else { + handlerProxy = exchangeHandler; + } - }; final Timeout socketTimeout = ioSession.getSocketTimeout(); - ioSession.enqueue(new RequestExecutionCommand( - handlerProxy, - pushHandlerFactory, - context, - streamControl -> { - cancellableDependency.setDependency(streamControl); - if (socketTimeout != null) { - streamControl.setTimeout(socketTimeout); - } - }), - Command.Priority.NORMAL); + final RequestExecutionCommand command = new RequestExecutionCommand( + handlerProxy, + pushHandlerFactory, + context, + streamControl -> { + cancellableDependency.setDependency(streamControl); + if (socketTimeout != null) { + streamControl.setTimeout(socketTimeout); + } + }); + + ioSession.enqueue(command, Command.Priority.NORMAL); + if (!ioSession.isOpen()) { - exchangeHandler.failed(new ConnectionClosedException()); + handlerProxy.failed(new ConnectionClosedException()); + handlerProxy.releaseResources(); } } @@ -350,4 +348,106 @@ public H2ConnPool getConnPool() { return connPool; } + private static final class SlotReleasingExchangeHandler implements AsyncClientExchangeHandler { + + private final AsyncClientExchangeHandler exchangeHandler; + private final AtomicInteger pendingCounter; + private final AtomicBoolean released; + + private SlotReleasingExchangeHandler(final AsyncClientExchangeHandler exchangeHandler, final AtomicInteger pendingCounter) { + this.exchangeHandler = exchangeHandler; + this.pendingCounter = pendingCounter; + this.released = new AtomicBoolean(false); + } + + @Override + public void releaseResources() { + if (released.compareAndSet(false, true)) { + pendingCounter.decrementAndGet(); + } + exchangeHandler.releaseResources(); + } + + @Override + public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { + exchangeHandler.produceRequest(channel, httpContext); + } + + @Override + public int available() { + return exchangeHandler.available(); + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + exchangeHandler.produce(channel); + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + exchangeHandler.consumeInformation(response, httpContext); + } + + @Override + public void consumeResponse( + final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { + exchangeHandler.consumeResponse(response, entityDetails, httpContext); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + exchangeHandler.updateCapacity(capacityChannel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + exchangeHandler.consume(src); + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + exchangeHandler.streamEnd(trailers); + } + + @Override + public void cancel() { + exchangeHandler.cancel(); + } + + @Override + public void failed(final Exception cause) { + exchangeHandler.failed(cause); + } + + } + + /** + * Cancellable that can be wired to the stream control once it becomes available. + */ + private static final class CancellableExecution implements Cancellable, CancellableDependency { + + private volatile Cancellable dependency; + + @Override + public void setDependency(final Cancellable dependency) { + this.dependency = dependency; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean cancel() { + final Cancellable local = this.dependency; + if (local != null) { + local.cancel(); + return true; + } + return false; + } + + } + } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index a19e7913fc..8ee8e8cd81 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Decorator; import org.apache.hc.core5.function.Supplier; @@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap { private IOReactorMetricsListener threadPoolListener; + private int maxRequestsPerConnection; + private H2MultiplexingRequesterBootstrap() { this.routeEntries = new ArrayList<>(); } @@ -180,6 +183,23 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final return this; } + /** + * Sets a hard limit on the number of pending request execution commands that can be queued per connection. + * When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}. + * A value {@code <= 0} disables the limit (default). + * Note: this limit applies to commands waiting in the connection's internal queue (backlog). HTTP/2 in-flight + * concurrency is governed separately by protocol settings (e.g. MAX_CONCURRENT_STREAMS). + * + * @param max maximum number of pending requests per connection; {@code <= 0} to disable the limit. + * @return this instance. + * @since 5.5 + */ + @Experimental + public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int max) { + this.maxRequestsPerConnection = max; + return this; + } + /** * Sets {@link H2StreamListener} instance. * @@ -274,7 +294,8 @@ public H2MultiplexingRequester create() { DefaultAddressResolver.INSTANCE, tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), threadPoolListener, - null); + null, + maxRequestsPerConnection); } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java new file mode 100644 index 0000000000..d82975d041 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2MaxRequestsPerConnectionLocalExample.java @@ -0,0 +1,245 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.Timeout; + +/** + * Local integration example that exercises {@code H2MultiplexingRequesterBootstrap#setMaxRequestsPerConnection(int)}. + *

+ * The example starts a local HTTP/2 server and a single-connection HTTP/2 requester. The server responds with a fixed + * delay to keep streams busy and make the client build up a backlog of request execution commands on the connection. + * With {@code maxRequestsPerConnection} set to a small value, submissions beyond the configured cap fail fast with + * {@link java.util.concurrent.RejectedExecutionException}. This demonstrates a per-connection hard cap on queued + * (pending) requests using the {@link org.apache.hc.core5.reactor.IOSession} command queue, not a separate client-side + * queue. + *

+ * Note this cap limits the number of pending execution commands associated with a single connection. Protocol-level + * concurrency (in-flight streams) is still governed by HTTP/2 settings (for example {@code MAX_CONCURRENT_STREAMS}) + * and server behaviour. + * + * @since 5.5 + */ +public final class H2MaxRequestsPerConnectionLocalExample { + + public static void main(final String[] args) throws Exception { + final int maxPerConn = 2; // keep small + final int totalRequests = 50; // make larger + final Timeout timeout = Timeout.ofSeconds(30); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setCanonicalHostName("127.0.0.1") // avoids 421 + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>( + entityDetails != null ? new DiscardingEntityConsumer() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext context) { + + final String path = message.getHead().getPath(); + System.out.println("server accepted " + path + " (reply in 2s)"); + + scheduler.schedule(() -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ex) { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(500) + .setEntity(ex.toString(), ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + // ignore + } + } + }, 2, TimeUnit.SECONDS); + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + System.out.println("server on 127.0.0.1:" + port); + + final H2Config clientH2Config = H2Config.custom() + .setPushEnabled(false) + .build(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(clientH2Config) + .setMaxRequestsPerConnection(maxPerConn) + .create(); + + requester.start(); + + final HttpHost target = new HttpHost("http", "127.0.0.1", port); + + // Warmup (establish connection) + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + @Override + public void completed(final Message result) { + System.out.println("warmup -> " + result.getHead().getCode()); + } + + @Override + public void failed(final Exception ex) { + System.out.println("warmup failed -> " + ex.getClass().getName() + ": " + ex.getMessage()); + } + + @Override + public void cancelled() { + System.out.println("warmup cancelled"); + } + }).get(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger rejected = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(totalRequests); + + final ExecutorService exec = Executors.newFixedThreadPool(16); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(() -> { + final String path = "/slow?i=" + id; + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath(path).build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + ok.incrementAndGet(); + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + if (ex instanceof RejectedExecutionException) { + rejected.incrementAndGet(); + } else { + failed.incrementAndGet(); + } + latch.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + latch.countDown(); + } + }); + }); + } + + final boolean done = latch.await(60, TimeUnit.SECONDS); + System.out.println("done=" + done + " ok=" + ok.get() + ", rejected=" + rejected.get() + ", failed=" + failed.get()); + + exec.shutdownNow(); + + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java new file mode 100644 index 0000000000..7d9bb191f0 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/bootstrap/TestH2MultiplexingRequesterMaxRequestsPerConnection.java @@ -0,0 +1,229 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.impl.nio.bootstrap; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.nio.AsyncRequestConsumer; +import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; +import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestH2MultiplexingRequesterMaxRequestsPerConnection { + + @Test + @org.junit.jupiter.api.Timeout(value = 60, unit = TimeUnit.SECONDS) + void testRejectsWhenLimitReached() throws Exception { + final int maxPerConn = 2; + final int totalRequests = 30; + final Timeout timeout = Timeout.ofSeconds(30); + final long serverDelayMillis = 5000; + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setIoThreadCount(1) + .build(); + + final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(1) // force backlog + .build(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(serverH2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setCanonicalHostName("127.0.0.1") // avoid 421 + .register("*", new AsyncServerRequestHandler>() { + + @Override + public AsyncRequestConsumer> prepare( + final HttpRequest request, + final EntityDetails entityDetails, + final HttpContext context) { + return new BasicRequestConsumer<>( + entityDetails != null ? new DiscardingEntityConsumer<>() : null); + } + + @Override + public void handle( + final Message message, + final ResponseTrigger responseTrigger, + final HttpContext localContext) { + + final HttpCoreContext context = HttpCoreContext.cast(localContext); + final String path = message.getHead().getPath(); + + final Runnable task = () -> { + try { + responseTrigger.submitResponse( + AsyncResponseBuilder.create(200) + .setEntity("ok\n", ContentType.TEXT_PLAIN) + .build(), + context); + } catch (final Exception ignore) { + // ignore + } + }; + + if ("/warmup".equals(path)) { + task.run(); + } else { + scheduler.schedule(task, serverDelayMillis, TimeUnit.MILLISECONDS); + } + } + + }) + .create(); + + server.start(); + final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); + final int port = ((InetSocketAddress) ep.getAddress()).getPort(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(H2Config.custom().setPushEnabled(false).build()) + .setMaxRequestsPerConnection(maxPerConn) + .create(); + + + requester.start(); + + try { + final HttpHost target = new HttpHost("http", "127.0.0.1", port); + + // Warmup + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + null).get(); + + final AtomicInteger ok = new AtomicInteger(0); + final AtomicInteger rejected = new AtomicInteger(0); + final AtomicInteger failed = new AtomicInteger(0); + + final CountDownLatch done = new CountDownLatch(totalRequests); + final CountDownLatch start = new CountDownLatch(1); + final ExecutorService exec = Executors.newFixedThreadPool(16); + + for (int i = 0; i < totalRequests; i++) { + final int id = i; + exec.execute(new Runnable() { + @Override + public void run() { + try { + start.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + done.countDown(); + return; + } + + requester.execute( + target, + AsyncRequestBuilder.get().setHttpHost(target).setPath("/slow?i=" + id).build(), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + timeout, + HttpCoreContext.create(), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + ok.incrementAndGet(); + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + if (ex instanceof RejectedExecutionException) { + rejected.incrementAndGet(); + } else { + failed.incrementAndGet(); + } + done.countDown(); + } + + @Override + public void cancelled() { + failed.incrementAndGet(); + done.countDown(); + } + }); + } + }); + } + + start.countDown(); + + final boolean allDone = done.await(60, TimeUnit.SECONDS); + exec.shutdownNow(); + + Assertions.assertTrue(allDone, "Timed out"); + Assertions.assertEquals(totalRequests, ok.get() + rejected.get() + failed.get()); + Assertions.assertTrue(rejected.get() > 0, "Expected at least one RejectedExecutionException"); + Assertions.assertEquals(0, failed.get(), "Unexpected non-rejection failures: " + failed.get()); + } finally { + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + scheduler.shutdownNow(); + } + } +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java index d0bd866287..f4d6c58414 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java @@ -408,5 +408,4 @@ public String toString() { final IOSession currentSession = currentSessionRef.get(); return Objects.toString(currentSession != null ? currentSession : ioSession, null); } - } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java index 9cb55fcd43..2b4e5153d3 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java @@ -931,5 +931,4 @@ public String toString() { this.session.getLock().unlock(); } } - } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestIOSessionImplMaxPendingCommands.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestIOSessionImplMaxPendingCommands.java new file mode 100644 index 0000000000..f705a4e64f --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/TestIOSessionImplMaxPendingCommands.java @@ -0,0 +1,79 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactor; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +class TestIOSessionImplMaxPendingCommands { + + private static final class DummyCommand implements Command { + + @Override + public boolean cancel() { + return true; + } + + } + + @Test + @Timeout(5) + void enqueueAndPollShouldWork() throws Exception { + final ServerSocketChannel server = ServerSocketChannel.open(); + server.bind(new InetSocketAddress("127.0.0.1", 0)); + final int port = ((InetSocketAddress) server.getLocalAddress()).getPort(); + + final SocketChannel client = SocketChannel.open(); + client.connect(new InetSocketAddress("127.0.0.1", port)); + final SocketChannel accepted = server.accept(); + + client.configureBlocking(false); + final Selector selector = Selector.open(); + final SelectionKey key = client.register(selector, SelectionKey.OP_READ); + + final IOSessionImpl session = new IOSessionImpl("c", key, client, null); + + try { + session.enqueue(new DummyCommand(), Command.Priority.NORMAL); + assertNotNull(session.poll()); + } finally { + session.close(); + accepted.close(); + server.close(); + selector.close(); + } + } + +}