File tree Expand file tree Collapse file tree 4 files changed +24
-6
lines changed
main/java/org/apache/beam/sdk/io/solace/broker
test/java/org/apache/beam/sdk/io/solace Expand file tree Collapse file tree 4 files changed +24
-6
lines changed Original file line number Diff line number Diff line change @@ -46,6 +46,7 @@ public class BasicAuthJcsmpSessionService extends SessionService {
4646 private final String password ;
4747 private final String vpnName ;
4848 @ Nullable private JCSMPSession jcsmpSession ;
49+ @ Nullable private MessageReceiver messageReceiver ;
4950 private final RetryCallableManager retryCallableManager = RetryCallableManager .create ();
5051
5152 /**
@@ -73,21 +74,25 @@ public void connect() {
7374
7475 @ Override
7576 public void close () {
76- if (isClosed ()) {
77- return ;
78- }
7977 retryCallableManager .retryCallable (
8078 () -> {
81- checkStateNotNull (jcsmpSession ).closeSession ();
79+ if (messageReceiver != null ) {
80+ messageReceiver .close ();
81+ }
82+ if (!isClosed ()) {
83+ checkStateNotNull (jcsmpSession ).closeSession ();
84+ }
8285 return 0 ;
8386 },
8487 ImmutableSet .of (IOException .class ));
8588 }
8689
8790 @ Override
8891 public MessageReceiver createReceiver () {
89- return retryCallableManager .retryCallable (
90- this ::createFlowReceiver , ImmutableSet .of (JCSMPException .class ));
92+ this .messageReceiver =
93+ retryCallableManager .retryCallable (
94+ this ::createFlowReceiver , ImmutableSet .of (JCSMPException .class ));
95+ return this .messageReceiver ;
9196 }
9297
9398 @ Override
Original file line number Diff line number Diff line change @@ -49,6 +49,9 @@ public interface MessageReceiver {
4949 */
5050 BytesXMLMessage receive () throws IOException ;
5151
52+ /** Closes the message receiver. */
53+ void close ();
54+
5255 /**
5356 * Test clients may return {@literal true} to signal that all expected messages have been pulled
5457 * and the test may complete. Real clients should always return {@literal false}.
Original file line number Diff line number Diff line change @@ -69,4 +69,11 @@ public BytesXMLMessage receive() throws IOException {
6969 throw new IOException (e );
7070 }
7171 }
72+
73+ @ Override
74+ public void close () {
75+ if (!isClosed ()) {
76+ this .flowReceiver .close ();
77+ }
78+ }
7279}
Original file line number Diff line number Diff line change @@ -92,6 +92,9 @@ public BytesXMLMessage receive() throws IOException {
9292 return getRecordFn .apply (counter .getAndIncrement ());
9393 }
9494
95+ @ Override
96+ public void close () {}
97+
9598 @ Override
9699 public boolean isEOF () {
97100 return counter .get () >= minMessagesReceived ;
You can’t perform that action at this time.
0 commit comments