Skip to content

Commit 4606f25

Browse files
committed
DuplexConnection exposes remoteAddress()
Closes gh-735 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 6959390 commit 4606f25

File tree

14 files changed

+143
-17
lines changed

14 files changed

+143
-17
lines changed

rsocket-core/src/main/java/io/rsocket/DuplexConnection.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
21+
import java.net.SocketAddress;
2122
import java.nio.channels.ClosedChannelException;
2223
import org.reactivestreams.Publisher;
2324
import org.reactivestreams.Subscriber;
@@ -86,6 +87,17 @@ default Mono<Void> sendOne(ByteBuf frame) {
8687
*/
8788
ByteBufAllocator alloc();
8889

90+
/**
91+
* Return the remote address that this connection is connected to. The returned {@link
92+
* SocketAddress} varies by transport type and should be downcast to obtain more detailed
93+
* information. For TCP and WebSocket, the address type is {@link java.net.InetSocketAddress}. For
94+
* local transport, it is {@link io.rsocket.transport.local.LocalSocketAddress}.
95+
*
96+
* @return the address
97+
* @since 1.1
98+
*/
99+
SocketAddress remoteAddress();
100+
89101
@Override
90102
default double availability() {
91103
return isDisposed() ? 0.0 : 1.0;

rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.rsocket.frame.FrameUtil;
2525
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
2626
import io.rsocket.plugins.InitializingInterceptorRegistry;
27+
import java.net.SocketAddress;
2728
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2829
import org.reactivestreams.Publisher;
2930
import org.reactivestreams.Subscription;
@@ -372,6 +373,11 @@ public ByteBufAllocator alloc() {
372373
return source.alloc();
373374
}
374375

376+
@Override
377+
public SocketAddress remoteAddress() {
378+
return source.remoteAddress();
379+
}
380+
375381
@Override
376382
public void dispose() {
377383
source.dispose();

rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.rsocket.frame.FrameUtil;
2525
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
2626
import io.rsocket.plugins.InitializingInterceptorRegistry;
27+
import java.net.SocketAddress;
2728
import org.reactivestreams.Publisher;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
@@ -221,6 +222,11 @@ public ByteBufAllocator alloc() {
221222
return source.alloc();
222223
}
223224

225+
@Override
226+
public SocketAddress remoteAddress() {
227+
return source.remoteAddress();
228+
}
229+
224230
@Override
225231
public void dispose() {
226232
source.dispose();

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import io.rsocket.Closeable;
2222
import io.rsocket.DuplexConnection;
2323
import io.rsocket.frame.FrameHeaderCodec;
24+
import java.net.SocketAddress;
2425
import java.nio.channels.ClosedChannelException;
2526
import java.time.Duration;
2627
import java.util.Queue;
@@ -111,6 +112,11 @@ public ByteBufAllocator alloc() {
111112
return curConnection.alloc();
112113
}
113114

115+
@Override
116+
public SocketAddress remoteAddress() {
117+
return curConnection.remoteAddress();
118+
}
119+
114120
public void disconnect() {
115121
DuplexConnection c = this.curConnection;
116122
if (c != null) {

rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.DuplexConnection;
22+
import java.net.SocketAddress;
2223
import org.reactivestreams.Publisher;
2324
import org.reactivestreams.Subscription;
2425
import reactor.core.CoreSubscriber;
@@ -93,6 +94,11 @@ public ByteBufAllocator alloc() {
9394
return allocator;
9495
}
9596

97+
@Override
98+
public SocketAddress remoteAddress() {
99+
return new TestLocalSocketAddress(name);
100+
}
101+
96102
@Override
97103
public void dispose() {
98104
onClose.onComplete();

rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.DuplexConnection;
22+
import java.net.SocketAddress;
2223
import java.util.concurrent.BlockingQueue;
2324
import java.util.concurrent.LinkedBlockingQueue;
2425
import org.reactivestreams.Publisher;
@@ -111,6 +112,11 @@ public ByteBufAllocator alloc() {
111112
return allocator;
112113
}
113114

115+
@Override
116+
public SocketAddress remoteAddress() {
117+
return new TestLocalSocketAddress("TestDuplexConnection");
118+
}
119+
114120
@Override
115121
public double availability() {
116122
return availability;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.test.util;
17+
18+
import java.net.SocketAddress;
19+
import java.util.Objects;
20+
21+
public final class TestLocalSocketAddress extends SocketAddress {
22+
23+
private static final long serialVersionUID = 2608695156052100164L;
24+
25+
private final String name;
26+
27+
/**
28+
* Creates a new instance.
29+
*
30+
* @param name the name representing the address
31+
* @throws NullPointerException if {@code name} is {@code null}
32+
*/
33+
public TestLocalSocketAddress(String name) {
34+
this.name = Objects.requireNonNull(name, "name must not be null");
35+
}
36+
37+
/** Return the name for this connection. */
38+
public String getName() {
39+
return name;
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return "[local address] " + name;
45+
}
46+
}

rsocket-micrometer/src/main/java/io/rsocket/micrometer/MicrometerDuplexConnection.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import io.rsocket.frame.FrameHeaderCodec;
2626
import io.rsocket.frame.FrameType;
2727
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
28+
import java.net.SocketAddress;
2829
import java.util.Objects;
2930
import java.util.function.Consumer;
3031
import org.reactivestreams.Publisher;
@@ -88,6 +89,11 @@ public ByteBufAllocator alloc() {
8889
return delegate.alloc();
8990
}
9091

92+
@Override
93+
public SocketAddress remoteAddress() {
94+
return delegate.remoteAddress();
95+
}
96+
9197
@Override
9298
public void dispose() {
9399
delegate.dispose();

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.rsocket.util.ByteBufPayload;
3434
import java.io.BufferedReader;
3535
import java.io.InputStreamReader;
36+
import java.net.SocketAddress;
3637
import java.time.Duration;
3738
import java.util.concurrent.CancellationException;
3839
import java.util.concurrent.Executors;
@@ -568,6 +569,11 @@ public ByteBufAllocator alloc() {
568569
return duplexConnection.alloc();
569570
}
570571

572+
@Override
573+
public SocketAddress remoteAddress() {
574+
return duplexConnection.remoteAddress();
575+
}
576+
571577
@Override
572578
public Mono<Void> onClose() {
573579
return duplexConnection.onClose();

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,13 @@ public Mono<DuplexConnection> connect() {
8282
UnboundedProcessor<ByteBuf> out = new UnboundedProcessor<>();
8383
MonoProcessor<Void> closeNotifier = MonoProcessor.create();
8484

85-
server.apply(new LocalDuplexConnection(allocator, out, in, closeNotifier)).subscribe();
85+
server
86+
.apply(new LocalDuplexConnection(name, allocator, out, in, closeNotifier))
87+
.subscribe();
8688

8789
return Mono.just(
88-
(DuplexConnection) new LocalDuplexConnection(allocator, in, out, closeNotifier));
90+
(DuplexConnection)
91+
new LocalDuplexConnection(name, allocator, in, out, closeNotifier));
8992
});
9093
}
9194
}

0 commit comments

Comments
 (0)