@@ -84,13 +84,7 @@ class StreamTests {
8484 delay(100 )
8585 }
8686 assertThat(subscriber.messages.map { it.message.data.toString() })
87- .containsExactly(
88- " {chunk=hello}" ,
89- " {chunk=world}" ,
90- " {chunk=this}" ,
91- " {chunk=is}" ,
92- " {chunk=cool}"
93- )
87+ .containsExactly(" hello" , " world" , " this" , " is" , " cool" )
9488 assertThat(subscriber.result).isNotNull()
9589 assertThat(subscriber.result!! .result.data.toString()).isEqualTo(" hello world this is cool" )
9690 assertThat(subscriber.throwable).isNull()
@@ -123,114 +117,102 @@ class StreamTests {
123117 }
124118
125119 assertThat(messages.map { it.message.data.toString() })
126- .containsExactly(
127- " {chunk=hello}" ,
128- " {chunk=world}" ,
129- " {chunk=this}" ,
130- " {chunk=is}" ,
131- " {chunk=cool}"
132- )
120+ .containsExactly(" hello" , " world" , " this" , " is" , " cool" )
133121 assertThat(result).isNotNull()
134122 assertThat(result!! .result.data.toString()).isEqualTo(" hello world this is cool" )
135123 assertThat(throwable).isNull()
136124 assertThat(isComplete).isTrue()
137125 }
138126
139127 @Test
140- fun genStreamError_receivesErrorAndStops () = runBlocking {
141- val input = mapOf (" data" to " Why is the sky blue " )
128+ fun genStreamError_receivesError () = runBlocking {
129+ val input = mapOf (" data" to " test error " )
142130 val function =
143131 functions.getHttpsCallable(" genStreamError" ).withTimeout(2000 , TimeUnit .MILLISECONDS )
144132 val subscriber = StreamSubscriber ()
145133
146134 function.stream(input).subscribe(subscriber)
135+
147136 withTimeout(2000 ) {
148137 while (subscriber.throwable == null ) {
149138 delay(100 )
150139 }
151140 }
152141
153- assertThat(subscriber.messages.map { it.message.data.toString() }).contains(" {chunk=hello}" )
154142 assertThat(subscriber.throwable).isNotNull()
155143 assertThat(subscriber.throwable).isInstanceOf(FirebaseFunctionsException ::class .java)
156- assertThat(subscriber.throwable!! .message).isEqualTo(" {message=INTERNAL, status=INTERNAL}" )
157- assertThat(subscriber.isComplete).isFalse()
158144 }
159145
160146 @Test
161- fun genStreamNoReturn_receivesOnlyMessages () = runBlocking {
162- val input = mapOf (" data" to " Why is the sky blue" )
163- val function = functions.getHttpsCallable(" genStreamNoReturn" )
147+ fun genStreamWeather_receivesWeatherForecasts () = runBlocking {
148+ val inputData = listOf (mapOf (" name" to " Toronto" ), mapOf (" name" to " London" ))
149+ val input = mapOf (" data" to inputData)
150+
151+ val function = functions.getHttpsCallable(" genStreamWeather" )
164152 val subscriber = StreamSubscriber ()
165153
166154 function.stream(input).subscribe(subscriber)
167155
168- withTimeout(2000 ) {
169- while (subscriber.messages.size < 5 ) {
170- delay(100 )
171- }
156+ while (! subscriber.isComplete) {
157+ delay(100 )
172158 }
159+
173160 assertThat(subscriber.messages.map { it.message.data.toString() })
174161 .containsExactly(
175- " {chunk=hello}" ,
176- " {chunk=world}" ,
177- " {chunk=this}" ,
178- " {chunk=is}" ,
179- " {chunk=cool}"
162+ " {temperature=25, location={name=Toronto}, conditions=snowy}" ,
163+ " {temperature=50, location={name=London}, conditions=rainy}"
180164 )
181- assertThat(subscriber.result).isNull()
165+ assertThat(subscriber.result).isNotNull()
166+ assertThat(subscriber.result!! .result.data.toString()).contains(" forecasts" )
182167 assertThat(subscriber.throwable).isNull()
183- assertThat(subscriber.isComplete).isFalse ()
168+ assertThat(subscriber.isComplete).isTrue ()
184169 }
185170
186171 @Test
187- fun genStream_cancelStream_receivesPartialMessagesAndError () = runBlocking {
188- val input = mapOf (" data" to " Why is the sky blue" )
189- val function = functions.getHttpsCallable(" genStreamNoReturn" )
190- val publisher = function.stream(input)
191- val cancelableSubscriber = StreamSubscriber ()
192- publisher.subscribe(cancelableSubscriber)
193- withTimeout(2000 ) {
194- while (cancelableSubscriber.messages.isEmpty()) {
195- delay(50 )
196- }
197- }
198- withTimeout(2000 ) { cancelableSubscriber.subscription.cancel() }
199- withTimeout(1500 ) {
200- while (cancelableSubscriber.throwable == null ) {
201- delay(300 )
202- }
203- }
204- val messagesAsStringList = cancelableSubscriber.messages.map { it.message.data.toString() }
205- assertThat(messagesAsStringList).contains(" {chunk=hello}" )
206- assertThat(messagesAsStringList).doesNotContain(" {chunk=cool}" )
207- assertThat(cancelableSubscriber.throwable).isInstanceOf(FirebaseFunctionsException ::class .java)
208- assertThat(cancelableSubscriber.throwable!! .message!! .uppercase()).contains(" CANCEL" )
209- assertThat(cancelableSubscriber.isComplete).isFalse()
172+ fun genStreamEmpty_receivesNoMessages () = runBlocking {
173+ val function = functions.getHttpsCallable(" genStreamEmpty" )
174+ val subscriber = StreamSubscriber ()
175+
176+ function.stream(mapOf (" data" to " test" )).subscribe(subscriber)
177+
178+ withTimeout(2000 ) { delay(500 ) }
179+ assertThat(subscriber.messages).isEmpty()
180+ assertThat(subscriber.result).isNull()
210181 }
211182
212183 @Test
213- fun genStreamWeather_receivesWeatherForecasts () = runBlocking {
214- val inputData = listOf (mapOf (" name" to " Toronto" ), mapOf (" name" to " London" ))
215- val input = mapOf (" data" to inputData)
216-
217- val function = functions.getHttpsCallable(" genStreamWeather" )
184+ fun genStreamResultOnly_receivesOnlyResult () = runBlocking {
185+ val function = functions.getHttpsCallable(" genStreamResultOnly" )
218186 val subscriber = StreamSubscriber ()
219187
220- function.stream(input ).subscribe(subscriber)
188+ function.stream(mapOf ( " data " to " test " ) ).subscribe(subscriber)
221189
222190 while (! subscriber.isComplete) {
223191 delay(100 )
224192 }
193+ assertThat(subscriber.messages).isEmpty()
194+ assertThat(subscriber.result).isNotNull()
195+ assertThat(subscriber.result!! .result.data.toString()).isEqualTo(" Only a result" )
196+ }
225197
226- assertThat(subscriber.messages.map { it.message.data.toString() })
227- .containsExactly(
228- " {temperature=25, location=Toronto, conditions=snowy}" ,
229- " {temperature=50, location=London, conditions=rainy}"
230- )
198+ @Test
199+ fun genStreamLargeData_receivesMultipleChunks () = runBlocking {
200+ val function = functions.getHttpsCallable(" genStreamLargeData" )
201+ val subscriber = StreamSubscriber ()
202+
203+ function.stream(mapOf (" data" to " test large data" )).subscribe(subscriber)
204+
205+ while (! subscriber.isComplete) {
206+ delay(100 )
207+ }
208+ assertThat(subscriber.messages).isNotEmpty()
209+ assertThat(subscriber.messages.size).isEqualTo(10 )
210+ val receivedString =
211+ subscriber.messages.joinToString(separator = " " ) { it.message.data.toString() }
212+ val expectedString = " A" .repeat(10000 )
213+ assertThat(receivedString.length).isEqualTo(10000 )
214+ assertThat(receivedString).isEqualTo(expectedString)
231215 assertThat(subscriber.result).isNotNull()
232- assertThat(subscriber.result!! .result.data.toString()).contains(" forecasts" )
233- assertThat(subscriber.throwable).isNull()
234- assertThat(subscriber.isComplete).isTrue()
216+ assertThat(subscriber.result!! .result.data.toString()).isEqualTo(" Stream Completed" )
235217 }
236218}
0 commit comments