Skip to content

Commit b6f04ee

Browse files
authored
Add remove pending request method (#81)
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent f2eeac7 commit b6f04ee

File tree

5 files changed

+78
-12
lines changed

5 files changed

+78
-12
lines changed

rcljava/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ set(${PROJECT_NAME}_sources
143143
"src/main/java/org/ros2/rcljava/action/GoalStatus.java"
144144
"src/main/java/org/ros2/rcljava/client/Client.java"
145145
"src/main/java/org/ros2/rcljava/client/ClientImpl.java"
146+
"src/main/java/org/ros2/rcljava/client/ResponseFuture.java"
146147
"src/main/java/org/ros2/rcljava/concurrent/Callback.java"
147148
"src/main/java/org/ros2/rcljava/concurrent/RCLFuture.java"
148149
"src/main/java/org/ros2/rcljava/contexts/Context.java"

rcljava/src/main/java/org/ros2/rcljava/client/Client.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ public interface Client<T extends ServiceDefinition> extends Disposable {
3030

3131
<U extends MessageDefinition> void handleResponse(RMWRequestId header, U response);
3232

33-
<U extends MessageDefinition, V extends MessageDefinition> Future<V> asyncSendRequest(
33+
<U extends MessageDefinition, V extends MessageDefinition> ResponseFuture<V> asyncSendRequest(
3434
final U request);
3535

36-
<U extends MessageDefinition, V extends MessageDefinition> Future<V> asyncSendRequest(
36+
<U extends MessageDefinition, V extends MessageDefinition> ResponseFuture<V> asyncSendRequest(
3737
final U request, final Consumer<Future<V>> callback);
38+
39+
<V extends MessageDefinition> boolean removePendingRequest(ResponseFuture<V> future);
3840

3941
/**
4042
* Check if the service server is available.

rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import org.ros2.rcljava.RCLJava;
3030
import org.ros2.rcljava.common.JNIUtils;
31-
import org.ros2.rcljava.concurrent.RCLFuture;
3231
import org.ros2.rcljava.consumers.Consumer;
3332
import org.ros2.rcljava.interfaces.MessageDefinition;
3433
import org.ros2.rcljava.interfaces.ServiceDefinition;
@@ -53,7 +52,7 @@ public class ClientImpl<T extends ServiceDefinition> implements Client<T> {
5352
private final WeakReference<Node> nodeReference;
5453
private long handle;
5554
private final String serviceName;
56-
private Map<Long, Map.Entry<Consumer, RCLFuture>> pendingRequests;
55+
private Map<Long, Map.Entry<Consumer, ResponseFuture>> pendingRequests;
5756

5857
private final ServiceDefinition serviceDefinition;
5958

@@ -67,43 +66,52 @@ public ClientImpl(
6766
this.handle = handle;
6867
this.serviceName = serviceName;
6968
this.serviceDefinition = serviceDefinition;
70-
this.pendingRequests = new HashMap<Long, Map.Entry<Consumer, RCLFuture>>();
69+
this.pendingRequests = new HashMap<Long, Map.Entry<Consumer, ResponseFuture>>();
7170
}
7271

7372
public ServiceDefinition getServiceDefinition() {
7473
return this.serviceDefinition;
7574
}
7675

77-
public final <U extends MessageDefinition, V extends MessageDefinition> Future<V>
76+
public final <U extends MessageDefinition, V extends MessageDefinition> ResponseFuture<V>
7877
asyncSendRequest(final U request) {
7978
return asyncSendRequest(request, new Consumer<Future<V>>() {
8079
public void accept(Future<V> input) {}
8180
});
8281
}
8382

84-
public final <U extends MessageDefinition, V extends MessageDefinition> Future<V>
83+
public final <U extends MessageDefinition, V extends MessageDefinition> ResponseFuture<V>
8584
asyncSendRequest(final U request, final Consumer<Future<V>> callback) {
8685
synchronized (pendingRequests) {
8786
long sequenceNumber = nativeSendClientRequest(
8887
handle, request.getFromJavaConverterInstance(),
8988
request.getDestructorInstance(), request);
90-
RCLFuture<V> future = new RCLFuture<V>();
89+
ResponseFuture<V> future = new ResponseFuture<V>(sequenceNumber);
9190

92-
Map.Entry<Consumer, RCLFuture> entry =
93-
new AbstractMap.SimpleEntry<Consumer, RCLFuture>(callback, future);
91+
Map.Entry<Consumer, ResponseFuture> entry =
92+
new AbstractMap.SimpleEntry<Consumer, ResponseFuture>(callback, future);
9493
pendingRequests.put(sequenceNumber, entry);
9594
return future;
9695
}
9796
}
9897

98+
public final <V extends MessageDefinition> boolean
99+
removePendingRequest(ResponseFuture<V> future) {
100+
synchronized (pendingRequests) {
101+
Map.Entry<Consumer, ResponseFuture> entry = pendingRequests.remove(
102+
future.getRequestSequenceNumber());
103+
return entry != null;
104+
}
105+
}
106+
99107
public final <U extends MessageDefinition> void handleResponse(
100108
final RMWRequestId header, final U response) {
101109
synchronized (pendingRequests) {
102110
long sequenceNumber = header.sequenceNumber;
103-
Map.Entry<Consumer, RCLFuture> entry = pendingRequests.remove(sequenceNumber);
111+
Map.Entry<Consumer, ResponseFuture> entry = pendingRequests.remove(sequenceNumber);
104112
if (entry != null) {
105113
Consumer<Future> callback = entry.getKey();
106-
RCLFuture<U> future = entry.getValue();
114+
ResponseFuture<U> future = entry.getValue();
107115
future.set(response);
108116
callback.accept(future);
109117
return;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/* Copyright 2021 Open Source Robotics Foundation, Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package org.ros2.rcljava.client;
17+
18+
import org.ros2.rcljava.concurrent.RCLFuture;
19+
20+
21+
public class ResponseFuture<V> extends RCLFuture<V> {
22+
public ResponseFuture(long requestSequenceNumber) {
23+
this.requestSequenceNumber = requestSequenceNumber;
24+
}
25+
public long getRequestSequenceNumber() {
26+
return this.requestSequenceNumber;
27+
}
28+
private long requestSequenceNumber;
29+
}

rcljava/src/test/java/org/ros2/rcljava/client/ClientTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.ros2.rcljava.client;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
1920
import static org.junit.Assert.assertNotEquals;
2021
import static org.junit.Assert.assertTrue;
2122

@@ -131,4 +132,29 @@ public final void testAdd() throws Exception {
131132
service.dispose();
132133
assertEquals(0, service.getHandle());
133134
}
135+
136+
@Test
137+
public final void testRemovePendingRequest() throws Exception {
138+
RCLFuture<rcljava.srv.AddTwoInts_Response> consumerFuture =
139+
new RCLFuture<rcljava.srv.AddTwoInts_Response>();
140+
141+
TestClientConsumer clientConsumer = new TestClientConsumer(consumerFuture);
142+
143+
Service<rcljava.srv.AddTwoInts> service = node.<rcljava.srv.AddTwoInts>createService(
144+
rcljava.srv.AddTwoInts.class, "add_two_ints", clientConsumer);
145+
146+
rcljava.srv.AddTwoInts_Request request = new rcljava.srv.AddTwoInts_Request();
147+
request.setA(2);
148+
request.setB(3);
149+
150+
Client<rcljava.srv.AddTwoInts> client =
151+
node.<rcljava.srv.AddTwoInts>createClient(rcljava.srv.AddTwoInts.class, "add_two_ints");
152+
153+
assertTrue(client.waitForService(Duration.ofSeconds(10)));
154+
155+
ResponseFuture<rcljava.srv.AddTwoInts_Response> responseFuture = client.asyncSendRequest(request);
156+
157+
assertTrue(client.removePendingRequest(responseFuture));
158+
assertFalse(client.removePendingRequest(responseFuture));
159+
}
134160
}

0 commit comments

Comments
 (0)