Skip to content

Commit a99f92a

Browse files
Fix: Fixed Subscription test to not discard ConcurrentEventStream immediately
1 parent 8840faf commit a99f92a

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

Sources/GraphQL/Subscription/EventStream.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class ConcurrentEventStream<Element>: EventStream<Element> {
3131
extension AsyncThrowingStream {
3232
func mapStream<To>(_ closure: @escaping (Element) throws -> To) -> AsyncThrowingStream<To, Error> {
3333
return AsyncThrowingStream<To, Error> { continuation in
34-
Task {
34+
let task = Task {
3535
do {
3636
for try await event in self {
3737
let newEvent = try closure(event)
@@ -42,12 +42,16 @@ extension AsyncThrowingStream {
4242
continuation.finish(throwing: error)
4343
}
4444
}
45+
46+
continuation.onTermination = { @Sendable reason in
47+
task.cancel()
48+
}
4549
}
4650
}
4751

4852
func filterStream(_ isIncluded: @escaping (Element) throws -> Bool) -> AsyncThrowingStream<Element, Error> {
4953
return AsyncThrowingStream<Element, Error> { continuation in
50-
Task {
54+
let task = Task {
5155
do {
5256
for try await event in self {
5357
if try isIncluded(event) {
@@ -59,6 +63,10 @@ extension AsyncThrowingStream {
5963
continuation.finish(throwing: error)
6064
}
6165
}
66+
67+
continuation.onTermination = { @Sendable _ in
68+
task.cancel()
69+
}
6270
}
6371
}
6472
}

Tests/GraphQLTests/SubscriptionTests/SubscriptionTests.swift

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -605,12 +605,15 @@ class SubscriptionTests : XCTestCase {
605605

606606
var results = [GraphQLResult]()
607607
var expectation = XCTestExpectation()
608-
_ = stream.map { event in
608+
609+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
610+
let keepForNow = stream.map { event in
609611
event.map { result in
610612
results.append(result)
611613
expectation.fulfill()
612614
}
613615
}
616+
614617
var expected = [GraphQLResult]()
615618

616619
db.trigger(email: Email(
@@ -675,6 +678,9 @@ class SubscriptionTests : XCTestCase {
675678
)
676679
wait(for: [expectation], timeout: timeoutDuration)
677680
XCTAssertEqual(results, expected)
681+
682+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
683+
_ = keepForNow
678684
}
679685

680686
/// 'should not trigger when subscription is already done'
@@ -701,7 +707,8 @@ class SubscriptionTests : XCTestCase {
701707

702708
var results = [GraphQLResult]()
703709
var expectation = XCTestExpectation()
704-
_ = stream.map { event in
710+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
711+
let keepForNow = stream.map { event in
705712
event.map { result in
706713
results.append(result)
707714
expectation.fulfill()
@@ -747,6 +754,9 @@ class SubscriptionTests : XCTestCase {
747754
// Ensure that the current result was the one before the db was stopped
748755
wait(for: [expectation], timeout: timeoutDuration)
749756
XCTAssertEqual(results, expected)
757+
758+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
759+
_ = keepForNow
750760
}
751761

752762
/// 'should not trigger when subscription is thrown'
@@ -861,7 +871,8 @@ class SubscriptionTests : XCTestCase {
861871

862872
var results = [GraphQLResult]()
863873
var expectation = XCTestExpectation()
864-
_ = stream.map { event in
874+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
875+
let keepForNow = stream.map { event in
865876
event.map { result in
866877
results.append(result)
867878
expectation.fulfill()
@@ -925,6 +936,9 @@ class SubscriptionTests : XCTestCase {
925936
)
926937
wait(for: [expectation], timeout: timeoutDuration)
927938
XCTAssertEqual(results, expected)
939+
940+
// So that the Task won't immediately be cancelled since the ConcurrentEventStream is discarded
941+
_ = keepForNow
928942
}
929943

930944
/// 'should pass through error thrown in source event stream'

0 commit comments

Comments
 (0)