@@ -50,6 +50,8 @@ import kotlin.random.Random
5050class SimpleSubscriptionIT (@LocalServerPort private var port : Int ) {
5151
5252 private val objectMapper = jacksonObjectMapper()
53+ private val client = ReactorNettyWebSocketClient ()
54+ private val uri: URI = URI .create(" ws://localhost:$port$SUBSCRIPTION_ENDPOINT " )
5355
5456 @Test
5557 fun `verify singleValueSubscription query` () {
@@ -129,9 +131,6 @@ class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {
129131 private fun subscribe (query : String , initPayload : Any? = null): TestPublisher <String > {
130132 val output = TestPublisher .create<String >()
131133
132- val client = ReactorNettyWebSocketClient ()
133- val uri = URI .create(" ws://localhost:$port$SUBSCRIPTION_ENDPOINT " )
134-
135134 client.execute(uri) { session -> executeSubscription(session, initPayload, query, output) }.subscribe()
136135
137136 return output
@@ -148,28 +147,26 @@ class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {
148147 val startMessage = getStartMessage(query, id)
149148
150149 return session.send(Flux .just(session.textMessage(initMessage)))
151- .then(
152- session.send(Flux .just(session.textMessage(startMessage)))
153- .thenMany(
154- session.receive()
155- .map { objectMapper.readValue<SubscriptionOperationMessage >(it.payloadAsText) }
156- .doOnNext {
157- if (it.type == ServerMessages .GQL_DATA .type) {
158- val data = objectMapper.writeValueAsString(it.payload)
159- output.next(data)
160- } else if (it.type == ServerMessages .GQL_COMPLETE .type) {
161- output.complete()
162- }
163- }
164- )
165- .doOnError {
166- output.error(it)
167- }
168- .doOnComplete {
169- output.complete()
150+ .then(session.send(Flux .just(session.textMessage(startMessage))))
151+ .thenMany(
152+ session.receive()
153+ .map { objectMapper.readValue<SubscriptionOperationMessage >(it.payloadAsText) }
154+ .doOnNext {
155+ if (it.type == ServerMessages .GQL_DATA .type) {
156+ val data = objectMapper.writeValueAsString(it.payload)
157+ output.next(data)
158+ } else if (it.type == ServerMessages .GQL_COMPLETE .type) {
159+ output.complete()
160+ }
170161 }
171- .then()
172162 )
163+ .doOnError {
164+ output.error(it)
165+ }
166+ .doOnComplete {
167+ output.complete()
168+ }
169+ .then()
173170 }
174171
175172 private fun SubscriptionOperationMessage.toJson () = objectMapper.writeValueAsString(this )
0 commit comments