Skip to content

Commit 9ca5fe1

Browse files
committed
rpc: introduce OncRpcSvcBuilder#withCallInterceptor
Motivation: Sometimes code need to perform an action before RPC request is executed, for example: - logging - validation - DoS protection / rate limitation ``` limiter = RateLimiter.create(500); svc = new OncRpcSvcBuilder() .withServiceName("svc") .withCallInterceptor(c -> limiter.acquire()) .build(); svc.start(); ``` Modification: introduce OncRpcSvcBuilder#withCallInterceptor that injects a RPC call consumer that is called before real work is done. Result: additional actions can be performed before rpc call execution. Acked-by: Lea Morschel Acked-by: Paul Millar Target: master
1 parent 8b927c9 commit 9ca5fe1

File tree

3 files changed

+36
-5
lines changed

3 files changed

+36
-5
lines changed

oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcSvc.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2009 - 2021 Deutsches Elektronen-Synchroton,
2+
* Copyright (c) 2009 - 2022 Deutsches Elektronen-Synchroton,
33
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
44
*
55
* This library is free software; you can redistribute it and/or modify
@@ -67,6 +67,7 @@
6767
import java.util.concurrent.Future;
6868
import java.util.concurrent.TimeUnit;
6969
import java.util.concurrent.TimeoutException;
70+
import java.util.function.Consumer;
7071
import javax.net.ssl.SSLContext;
7172

7273
import static com.google.common.base.Throwables.getRootCause;
@@ -129,6 +130,11 @@ public class OncRpcSvc {
129130
*/
130131
private final String _svcName;
131132

133+
/**
134+
* {@code java.util.function.Consumer} that is called before RPC request executed.
135+
*/
136+
private final Consumer<RpcCall> _callInterceptor;
137+
132138
/**
133139
* Create new RPC service with defined configuration.
134140
* @param builder to build this service
@@ -192,6 +198,7 @@ public class OncRpcSvc {
192198
_sslContext = builder.getSSLContext();
193199
_startTLS = builder.isStartTLS();
194200
_sslParams = builder.getSSLParameters();
201+
_callInterceptor = builder.getCallInterceptor();
195202
}
196203

197204
/**
@@ -346,7 +353,7 @@ public void start() throws IOException {
346353
if (_gssSessionManager != null) {
347354
filterChain.add(new GssProtocolFilter(_gssSessionManager));
348355
}
349-
filterChain.add(new RpcDispatcher(_requestExecutor, _programs, _withSubjectPropagation));
356+
filterChain.add(new RpcDispatcher(_requestExecutor, _programs, _withSubjectPropagation, _callInterceptor));
350357

351358
final FilterChain filters = filterChain.build();
352359

oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/OncRpcSvcBuilder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2009 - 2019 Deutsches Elektronen-Synchroton,
2+
* Copyright (c) 2009 - 2022 Deutsches Elektronen-Synchroton,
33
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
44
*
55
* This library is free software; you can redistribute it and/or modify
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.ExecutorService;
3030
import java.util.concurrent.Executors;
3131
import java.util.concurrent.ThreadFactory;
32+
import java.util.function.Consumer;
3233
import javax.net.ssl.SSLContext;
3334
import javax.net.ssl.SSLParameters;
3435

@@ -84,6 +85,8 @@ public class OncRpcSvcBuilder {
8485
private SSLParameters _sslParams;
8586
private MemoryAllocator _allocator = MemoryAllocator.DEFAULT;
8687

88+
private Consumer<RpcCall> _callInterceptor = c -> {};
89+
8790
public OncRpcSvcBuilder withAutoPublish() {
8891
_autoPublish = true;
8992
return this;
@@ -282,6 +285,15 @@ public GssSessionManager getGssSessionManager() {
282285
return _gssSessionManager;
283286
}
284287

288+
public OncRpcSvcBuilder withCallInterceptor(Consumer<RpcCall> interceptor) {
289+
_callInterceptor = interceptor;
290+
return this;
291+
}
292+
293+
public Consumer<RpcCall> getCallInterceptor() {
294+
return _callInterceptor;
295+
}
296+
285297
public ExecutorService getWorkerThreadExecutorService() {
286298
if (_ioStrategy == IoStrategy.SAME_THREAD ) {
287299
return MoreExecutors.newDirectExecutorService();

oncrpc4j-core/src/main/java/org/dcache/oncrpc4j/rpc/RpcDispatcher.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2009 - 2018 Deutsches Elektronen-Synchroton,
2+
* Copyright (c) 2009 - 2022 Deutsches Elektronen-Synchroton,
33
* Member of the Helmholtz Association, (DESY), HAMBURG, GERMANY
44
*
55
* This library is free software; you can redistribute it and/or modify
@@ -25,6 +25,7 @@
2525
import java.security.PrivilegedExceptionAction;
2626
import java.util.Map;
2727
import java.util.concurrent.ExecutorService;
28+
import java.util.function.Consumer;
2829

2930
import com.google.common.base.Throwables;
3031
import org.slf4j.Logger;
@@ -54,6 +55,12 @@ public class RpcDispatcher extends BaseFilter {
5455
* from request credentials.
5556
*/
5657
private final boolean _withSubjectPropagation;
58+
59+
/**
60+
* {@code java.util.function.Consumer} that is called before RPC request executed.
61+
*/
62+
private final Consumer<RpcCall> _callInterceptor;
63+
5764
/**
5865
* Create new RPC dispatcher for given program.
5966
*
@@ -62,16 +69,18 @@ public class RpcDispatcher extends BaseFilter {
6269
* with a mapping between program number and program
6370
* handler.
6471
* @param withSubjectPropagation use {@link Subject#doAs} to exacerbate request.
72+
* @param callInterceptor consumer that will be called before the dispatcher performs its real work.
6573
*
6674
* @throws NullPointerException if executor or program is null
6775
*/
6876
public RpcDispatcher(ExecutorService executor, Map<OncRpcProgram,
69-
RpcDispatchable> programs, boolean withSubjectPropagation)
77+
RpcDispatchable> programs, boolean withSubjectPropagation, Consumer<RpcCall> callInterceptor)
7078
throws NullPointerException {
7179

7280
_programs = requireNonNull(programs, "Programs is NULL");
7381
_asyncExecutorService = requireNonNull(executor, "ExecutorService is NULL");
7482
_withSubjectPropagation = withSubjectPropagation;
83+
_callInterceptor = callInterceptor;
7584
}
7685

7786
@Override
@@ -91,6 +100,9 @@ public NextAction handleRead(final FilterChainContext ctx) throws IOException {
91100
_asyncExecutorService.execute(new Runnable() {
92101
@Override
93102
public void run() {
103+
104+
_callInterceptor.accept(call);
105+
94106
try {
95107
if (_withSubjectPropagation) {
96108
Subject subject = call.getCredential().getSubject();

0 commit comments

Comments
 (0)