@@ -18,21 +18,23 @@ package com.expediagroup.graphql.examples.subscriptions
1818
1919import com.expediagroup.graphql.examples.SUBSCRIPTION_ENDPOINT
2020import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage
21+ import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_INIT
2122import com.expediagroup.graphql.spring.model.SubscriptionOperationMessage.ClientMessages.GQL_START
2223import com.expediagroup.graphql.types.GraphQLRequest
2324import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
2425import com.fasterxml.jackson.module.kotlin.readValue
2526import com.fasterxml.jackson.module.kotlin.registerKotlinModule
27+ import java.net.URI
2628import org.junit.jupiter.api.Test
2729import org.springframework.boot.autoconfigure.EnableAutoConfiguration
2830import org.springframework.boot.test.context.SpringBootTest
2931import org.springframework.boot.web.server.LocalServerPort
3032import org.springframework.web.reactive.socket.WebSocketSession
3133import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient
3234import reactor.core.publisher.Mono
35+ import reactor.kotlin.core.publisher.toMono
3336import reactor.test.StepVerifier
3437import reactor.test.publisher.TestPublisher
35- import java.net.URI
3638
3739@SpringBootTest(
3840 webEnvironment = SpringBootTest .WebEnvironment .RANDOM_PORT ,
@@ -93,22 +95,28 @@ class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {
9395 }
9496
9597 private fun subscribe (query : String , id : String ): TestPublisher <String > {
96- val message = toMessage(query, id)
9798 val output = TestPublisher .create<String >()
9899
99100 val client = ReactorNettyWebSocketClient ()
100101 val uri = URI .create(" ws://localhost:$port$SUBSCRIPTION_ENDPOINT " )
101102
102- client.execute(uri) { session -> executeSubscription(session, message, output) }.subscribe()
103+ client.execute(uri) { session -> executeSubscription(session, query, id, output) }.subscribe()
104+
103105 return output
104106 }
105107
106108 private fun executeSubscription (
107109 session : WebSocketSession ,
108- message : String ,
110+ query : String ,
111+ id : String ,
109112 output : TestPublisher <String >
110113 ): Mono <Void > {
111- return session.send(Mono .just(session.textMessage(message)))
114+ val initMessage = getInitMessage(id)
115+ val startMessage = getStartMessage(query, id)
116+ val firstMessage = session.textMessage(initMessage).toMono()
117+ .concatWith(session.textMessage(startMessage).toMono())
118+
119+ return session.send(firstMessage)
112120 .thenMany(
113121 session.receive()
114122 .map { objectMapper.readValue<SubscriptionOperationMessage >(it.payloadAsText) }
@@ -124,9 +132,10 @@ class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {
124132 .then()
125133 }
126134
127- private fun toMessage (query : String , id : String ): String {
135+ private fun SubscriptionOperationMessage.toJson () = objectMapper.writeValueAsString(this )
136+ private fun getInitMessage (id : String ) = SubscriptionOperationMessage (GQL_CONNECTION_INIT .type, id).toJson()
137+ private fun getStartMessage (query : String , id : String ): String {
128138 val request = GraphQLRequest (" subscription { $query }" )
129- val subscriptionOperationMessage = SubscriptionOperationMessage (GQL_START .type, id = id, payload = request)
130- return objectMapper.writeValueAsString(subscriptionOperationMessage)
139+ return SubscriptionOperationMessage (GQL_START .type, id = id, payload = request).toJson()
131140 }
132141}
0 commit comments