Skip to content

Commit e6b31ee

Browse files
jiangpengchengmattisonchao
authored andcommitted
[fix][broker] Invoke custom BrokerInterceptor's onFilter method if it's defined (#23676)
(cherry picked from commit 7f7e12b)
1 parent dae1e7d commit e6b31ee

File tree

4 files changed

+32
-2
lines changed

4 files changed

+32
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.buffer.ByteBuf;
2323
import java.io.IOException;
2424
import java.util.Map;
25+
import javax.servlet.FilterChain;
2526
import javax.servlet.ServletException;
2627
import javax.servlet.ServletRequest;
2728
import javax.servlet.ServletResponse;
@@ -272,6 +273,18 @@ public void initialize(PulsarService pulsarService) throws Exception {
272273
}
273274
}
274275

276+
@Override
277+
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
278+
throws ServletException, IOException {
279+
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();
280+
try {
281+
Thread.currentThread().setContextClassLoader(narClassLoader);
282+
this.interceptor.onFilter(request, response, chain);
283+
} finally {
284+
Thread.currentThread().setContextClassLoader(previousContext);
285+
}
286+
}
287+
275288
@Override
276289
public void close() {
277290
final ClassLoader previousContext = Thread.currentThread().getContextClassLoader();

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.pulsar.broker.PulsarServerException;
4242
import org.apache.pulsar.broker.PulsarService;
4343
import org.apache.pulsar.broker.ServiceConfiguration;
44+
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
45+
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
4446
import org.apache.pulsar.common.util.PulsarSslConfiguration;
4547
import org.apache.pulsar.common.util.PulsarSslFactory;
4648
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
@@ -258,7 +260,17 @@ private static class FilterInitializer {
258260
// Enable PreInterceptFilter only when interceptors are enabled
259261
filterHolders.add(
260262
new FilterHolder(new PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
261-
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
263+
// The `ProcessHandlerFilter` is used to overwrite `doFilter` method, which cannot be called multiple
264+
// times inside one `Filter`, so we cannot use one `ProcessHandlerFilter` with a `BrokerInterceptors` to
265+
// hold all interceptors, instead we need to create a `ProcessHandlerFilter` for each `interceptor`.
266+
if (pulsarService.getBrokerInterceptor() instanceof BrokerInterceptors) {
267+
for (BrokerInterceptor interceptor: ((BrokerInterceptors) pulsarService.getBrokerInterceptor())
268+
.getInterceptors().values()) {
269+
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(interceptor)));
270+
}
271+
} else {
272+
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService.getBrokerInterceptor())));
273+
}
262274
}
263275

264276
if (config.isAuthenticationEnabled()) {

tests/docker-images/java-test-plugins/src/main/java/org/apache/pulsar/tests/integration/plugins/LoggingBrokerInterceptor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
package org.apache.pulsar.tests.integration.plugins;
2020

2121
import io.netty.buffer.ByteBuf;
22+
import java.io.IOException;
2223
import java.util.Map;
2324
import javax.servlet.FilterChain;
25+
import javax.servlet.ServletException;
2426
import javax.servlet.ServletRequest;
2527
import javax.servlet.ServletResponse;
2628
import org.apache.bookkeeper.mledger.Entry;
@@ -122,7 +124,9 @@ public void txnEnded(String txnID, long txnAction) {
122124
}
123125

124126
@Override
125-
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain) {
127+
public void onFilter(ServletRequest request, ServletResponse response, FilterChain chain)
128+
throws ServletException, IOException {
126129
log.info("onFilter");
130+
chain.doFilter(request, response);
127131
}
128132
}

tests/integration/src/test/java/org/apache/pulsar/tests/integration/plugins/TestBrokerInterceptors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public void test(Supplier<String> serviceUrlSupplier) throws Exception {
9696
"consumerCreated",
9797
"messageProduced",
9898
"beforeSendMessage: OK",
99+
"onFilter",
99100
}) {
100101
assertTrue(log.contains("LoggingBrokerInterceptor - " + line), "Log did not contain line '" + line + "'");
101102
}

0 commit comments

Comments
 (0)