1414package io .streamnative .pulsar .handlers .mqtt .proxy .web ;
1515
1616import io .prometheus .client .CollectorRegistry ;
17- import io .prometheus .client .jetty .JettyStatisticsCollector ;
1817import io .streamnative .pulsar .handlers .mqtt .common .MQTTCommonConfiguration ;
1918import io .streamnative .pulsar .handlers .mqtt .proxy .MQTTProxyService ;
2019import io .streamnative .pulsar .handlers .mqtt .proxy .impl .MQTTProxyException ;
3231import org .apache .pulsar .broker .web .UnrecognizedPropertyExceptionMapper ;
3332import org .apache .pulsar .broker .web .WebExecutorThreadPool ;
3433import org .apache .pulsar .common .util .PulsarSslFactory ;
34+ import org .apache .pulsar .jetty .metrics .JettyStatisticsCollector ;
35+ import org .eclipse .jetty .ee8 .servlet .ServletContextHandler ;
36+ import org .eclipse .jetty .ee8 .servlet .ServletHolder ;
3537import org .eclipse .jetty .server .ConnectionFactory ;
36- import org .eclipse .jetty .server .ConnectionLimit ;
3738import org .eclipse .jetty .server .ForwardedRequestCustomizer ;
3839import org .eclipse .jetty .server .Handler ;
3940import org .eclipse .jetty .server .HttpConfiguration ;
4041import org .eclipse .jetty .server .HttpConnectionFactory ;
42+ import org .eclipse .jetty .server .NetworkConnectionLimit ;
4143import org .eclipse .jetty .server .ProxyConnectionFactory ;
42- import org .eclipse .jetty .server .RequestLog ;
4344import org .eclipse .jetty .server .Server ;
4445import org .eclipse .jetty .server .ServerConnector ;
4546import org .eclipse .jetty .server .handler .ContextHandler ;
4647import org .eclipse .jetty .server .handler .ContextHandlerCollection ;
4748import org .eclipse .jetty .server .handler .DefaultHandler ;
48- import org .eclipse .jetty .server .handler .HandlerCollection ;
49- import org .eclipse .jetty .server .handler .RequestLogHandler ;
49+ import org .eclipse .jetty .server .handler .QoSHandler ;
5050import org .eclipse .jetty .server .handler .ResourceHandler ;
5151import org .eclipse .jetty .server .handler .StatisticsHandler ;
52- import org .eclipse .jetty .servlet .ServletContextHandler ;
53- import org .eclipse .jetty .servlet .ServletHolder ;
54- import org .eclipse .jetty .util .resource .Resource ;
52+ import org .eclipse .jetty .util .resource .ResourceFactory ;
5553import org .glassfish .jersey .media .multipart .MultiPartFeature ;
5654import org .glassfish .jersey .server .ResourceConfig ;
5755import org .glassfish .jersey .servlet .ServletContainer ;
@@ -99,7 +97,7 @@ public WebService(MQTTProxyService proxyService) {
9997 config .getHttpServerThreadPoolQueueSize ());
10098 this .server = new Server (webServiceExecutor );
10199 if (config .getMaxHttpServerConnections () > 0 ) {
102- server .addBean (new ConnectionLimit (config .getMaxHttpServerConnections (), server ));
100+ server .addBean (new NetworkConnectionLimit (config .getMaxHttpServerConnections (), server ));
103101 }
104102 List <ServerConnector > connectors = new ArrayList <>();
105103
@@ -176,14 +174,15 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
176174 if (attributeMap != null ) {
177175 attributeMap .forEach (servletContextHandler ::setAttribute );
178176 }
179- handlers .add (servletContextHandler );
177+ handlers .add (servletContextHandler . get () );
180178 }
181179
182180 public void addStaticResources (String basePath , String resourcePath ) {
183181 ContextHandler capHandler = new ContextHandler ();
184182 capHandler .setContextPath (basePath );
185183 ResourceHandler resHandler = new ResourceHandler ();
186- resHandler .setBaseResource (Resource .newClassPathResource (resourcePath ));
184+ ResourceFactory resourceFactory = ResourceFactory .root ();
185+ resHandler .setBaseResource (resourceFactory .newClassLoaderResource (resourcePath , true ));
187186 resHandler .setEtags (true );
188187 resHandler .setCacheControl (WebService .HANDLER_CACHE_CONTROL );
189188 capHandler .setHandler (resHandler );
@@ -192,19 +191,15 @@ public void addStaticResources(String basePath, String resourcePath) {
192191
193192 public void start () throws MQTTProxyException {
194193 try {
195- RequestLogHandler requestLogHandler = new RequestLogHandler ();
196- RequestLog requestLogger = JettyRequestLogFactory .createRequestLogger (false , server );
197- requestLogHandler .setRequestLog (requestLogger );
198- handlers .add (0 , new ContextHandlerCollection ());
199- handlers .add (requestLogHandler );
194+ server .setRequestLog (JettyRequestLogFactory .createRequestLogger (false , server ));
200195
201196 ContextHandlerCollection contexts = new ContextHandlerCollection ();
202- contexts .setHandlers (handlers . toArray ( new Handler [ handlers . size ()]) );
197+ contexts .setHandlers (handlers );
203198
204199 Handler handlerForContexts = GzipHandlerUtil .wrapWithGzipHandler (contexts ,
205200 config .getHttpServerGzipCompressionExcludedPaths ());
206- HandlerCollection handlerCollection = new HandlerCollection ();
207- handlerCollection .setHandlers (new Handler [] { handlerForContexts , new DefaultHandler (), requestLogHandler } );
201+ Handler . Collection handlerCollection = new Handler . Sequence ();
202+ handlerCollection .setHandlers (handlerForContexts , new DefaultHandler ());
208203
209204 // Metrics handler
210205 StatisticsHandler stats = new StatisticsHandler ();
@@ -216,7 +211,14 @@ public void start() throws MQTTProxyException {
216211 // Already registered. Eg: in unit tests
217212 }
218213
219- server .setHandler (stats );
214+ Handler serverHandler = stats ;
215+ if (config .getMaxConcurrentHttpRequests () > 0 ) {
216+ QoSHandler qoSHandler = new QoSHandler (serverHandler );
217+ qoSHandler .setMaxRequestCount (config .getMaxConcurrentHttpRequests ());
218+ serverHandler = qoSHandler ;
219+ }
220+ server .setHandler (serverHandler );
221+
220222 server .start ();
221223
222224 if (httpConnector != null ) {
0 commit comments