Skip to content

Commit f1d9ece

Browse files
author
Jay Herron
committed
Adds wrapper class for Observables, generalizes
1 parent 5f18287 commit f1d9ece

File tree

4 files changed

+68
-44
lines changed

4 files changed

+68
-44
lines changed

Sources/GraphQL/GraphQL.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ public struct GraphQLResult : Equatable, Codable, CustomStringConvertible {
4242

4343
/// SubscriptionResult wraps the observable and error data returned by the subscribe request.
4444
public struct SubscriptionResult {
45-
public let observable: SubscriptionObservable?
45+
public let stream: SubscriptionEventStream?
4646
public let errors: [GraphQLError]
4747

48-
public init(observable: SubscriptionObservable? = nil, errors: [GraphQLError] = []) {
49-
self.observable = observable
48+
public init(stream: SubscriptionEventStream? = nil, errors: [GraphQLError] = []) {
49+
self.stream = stream
5050
self.errors = errors
5151
}
5252
}
5353
/// SubscriptionObservable represents an event stream of fully resolved GraphQL subscription results. Subscribers can be added to this stream.
54-
public typealias SubscriptionObservable = Observable<Future<GraphQLResult>>
54+
public typealias SubscriptionEventStream = EventStream<Future<GraphQLResult>>
5555

5656
/// This is the primary entry point function for fulfilling GraphQL operations
5757
/// by parsing, validating, and executing a GraphQL document along side a
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) 2021 PassiveLogic, Inc.
2+
3+
import RxSwift
4+
5+
public class EventStream<Element> {
6+
/// This class should be overridden
7+
func transform<To>(_ closure: @escaping (Element) throws -> To) -> EventStream<To> {
8+
fatalError("This function should be overridden by implementing classes")
9+
}
10+
}
11+
12+
//extension Observable: EventStream<Element> {
13+
// func transform<To>(_ closure: @escaping (Element) throws -> To) -> EventStream<To> {
14+
// return self.map(closure)
15+
// }
16+
//}
17+
18+
public class ObservableEventStream<Element> : EventStream<Element> {
19+
var observable: Observable<Element>
20+
init(observable: Observable<Element>) {
21+
self.observable = observable
22+
}
23+
override func transform<To>(_ closure: @escaping (Element) throws -> To) -> EventStream<To> {
24+
return ObservableEventStream<To>(observable: observable.map(closure))
25+
}
26+
}

Sources/GraphQL/Subscription/Subscribe.swift

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func subscribe(
5252
)
5353

5454
return sourceFuture.map{ sourceResult -> SubscriptionResult in
55-
if let sourceObservable = sourceResult.observable {
56-
let subscriptionObservable = sourceObservable.map { eventPayload -> Future<GraphQLResult> in
55+
if let sourceStream = sourceResult.stream {
56+
let subscriptionStream = sourceStream.transform { eventPayload -> Future<GraphQLResult> in
5757

5858
// For each payload yielded from a subscription, map it over the normal
5959
// GraphQL `execute` function, with `payload` as the rootValue.
@@ -75,7 +75,7 @@ func subscribe(
7575
operationName: operationName
7676
)
7777
}
78-
return SubscriptionResult(observable: subscriptionObservable, errors: sourceResult.errors)
78+
return SubscriptionResult(stream: subscriptionStream, errors: sourceResult.errors)
7979
} else {
8080
return SubscriptionResult(errors: sourceResult.errors)
8181
}
@@ -251,8 +251,8 @@ func executeSubscription(
251251
return SourceEventStreamResult(errors: context.errors)
252252
} else if let error = resolved as? GraphQLError {
253253
return SourceEventStreamResult(errors: [error])
254-
} else if let observable = resolved as? SourceEventObservable {
255-
return SourceEventStreamResult(observable: observable)
254+
} else if let stream = resolved as? EventStream<Any> {
255+
return SourceEventStreamResult(stream: stream)
256256
} else if resolved == nil {
257257
return SourceEventStreamResult(errors: [
258258
GraphQLError(message: "Resolved subscription was nil")
@@ -261,23 +261,21 @@ func executeSubscription(
261261
let resolvedObj = resolved as AnyObject
262262
return SourceEventStreamResult(errors: [
263263
GraphQLError(
264-
message: "Subscription field resolver must return SourceEventObservable. Received: '\(resolvedObj)'"
264+
message: "Subscription field resolver must return EventStream<Any>. Received: '\(resolvedObj)'"
265265
)
266266
])
267267
}
268268
}
269269
}
270270

271+
// Subscription resolvers MUST return observables that are declared as 'Any' due to Swift not having covariant generic support for type
272+
// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers for query fields.
271273
struct SourceEventStreamResult {
272-
public let observable: SourceEventObservable?
274+
public let stream: EventStream<Any>?
273275
public let errors: [GraphQLError]
274276

275-
public init(observable: SourceEventObservable? = nil, errors: [GraphQLError] = []) {
276-
self.observable = observable
277+
public init(stream: EventStream<Any>? = nil, errors: [GraphQLError] = []) {
278+
self.stream = stream
277279
self.errors = errors
278280
}
279281
}
280-
281-
// Subscription resolvers MUST return observables that are declared as 'Any' due to Swift not having covariant generic support for type
282-
// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers for query fields.
283-
typealias SourceEventObservable = Observable<Any>

Tests/GraphQLTests/Subscription/SubscriptionTests.swift

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ class SubscriptionTests : XCTestCase {
3232
request: query,
3333
eventLoopGroup: eventLoopGroup
3434
).wait()
35-
36-
let observable = subscriptionResult.observable!
35+
print(subscriptionResult)
36+
let subscription = subscriptionResult.stream! as! ObservableEventStream<Future<GraphQLResult>>
3737

3838
var currentResult = GraphQLResult()
39-
let _ = observable.subscribe { event in
39+
let _ = subscription.observable.subscribe { event in
4040
currentResult = try! event.element!.wait()
4141
}.disposed(by: db.disposeBag)
4242

@@ -123,10 +123,10 @@ class SubscriptionTests : XCTestCase {
123123
}
124124
}
125125
}
126-
""")
126+
""") as! ObservableEventStream<Future<GraphQLResult>>
127127

128128
var currentResult = GraphQLResult()
129-
let _ = subscription.subscribe { event in
129+
let _ = subscription.observable.subscribe { event in
130130
currentResult = try! event.element!.wait()
131131
}.disposed(by: db.disposeBag)
132132

@@ -193,9 +193,9 @@ class SubscriptionTests : XCTestCase {
193193
importantEmail
194194
notImportantEmail
195195
}
196-
""")
196+
""") as! ObservableEventStream<Future<GraphQLResult>>
197197

198-
let _ = subscription.subscribe{ event in
198+
let _ = subscription.observable.subscribe{ event in
199199
let _ = try! event.element!.wait()
200200
}.disposed(by: db.disposeBag)
201201
db.trigger(email: Email(
@@ -261,7 +261,7 @@ class SubscriptionTests : XCTestCase {
261261
let graphQLError = error as! GraphQLError
262262
XCTAssertEqual(
263263
graphQLError.message,
264-
"Subscription field resolver must return SourceEventObservable. Received: 'test'"
264+
"Subscription field resolver must return EventStream<Any>. Received: 'test'"
265265
)
266266
}
267267
}
@@ -360,10 +360,10 @@ class SubscriptionTests : XCTestCase {
360360
}
361361
}
362362
}
363-
""")
363+
""") as! ObservableEventStream<Future<GraphQLResult>>
364364

365365
var currentResult = GraphQLResult()
366-
let _ = subscription.subscribe { event in
366+
let _ = subscription.observable.subscribe { event in
367367
currentResult = try! event.element!.wait()
368368
}.disposed(by: db.disposeBag)
369369

@@ -403,17 +403,17 @@ class SubscriptionTests : XCTestCase {
403403
}
404404
}
405405
}
406-
""")
406+
""") as! ObservableEventStream<Future<GraphQLResult>>
407407

408408
// Subscription 1
409409
var sub1Value = GraphQLResult()
410-
let _ = subscription.subscribe { event in
410+
let _ = subscription.observable.subscribe { event in
411411
sub1Value = try! event.element!.wait()
412412
}.disposed(by: db.disposeBag)
413413

414414
// Subscription 2
415415
var sub2Value = GraphQLResult()
416-
let _ = subscription.subscribe { event in
416+
let _ = subscription.observable.subscribe { event in
417417
sub2Value = try! event.element!.wait()
418418
}.disposed(by: db.disposeBag)
419419

@@ -457,10 +457,10 @@ class SubscriptionTests : XCTestCase {
457457
}
458458
}
459459
}
460-
""")
460+
""") as! ObservableEventStream<Future<GraphQLResult>>
461461

462462
var currentResult = GraphQLResult()
463-
let _ = subscription.subscribe { event in
463+
let _ = subscription.observable.subscribe { event in
464464
currentResult = try! event.element!.wait()
465465
print(currentResult)
466466
}.disposed(by: db.disposeBag)
@@ -521,10 +521,10 @@ class SubscriptionTests : XCTestCase {
521521
}
522522
}
523523
}
524-
""")
524+
""") as! ObservableEventStream<Future<GraphQLResult>>
525525

526526
var currentResult = GraphQLResult()
527-
let _ = subscription.subscribe { event in
527+
let _ = subscription.observable.subscribe { event in
528528
currentResult = try! event.element!.wait()
529529
}.disposed(by: db.disposeBag)
530530

@@ -597,10 +597,10 @@ class SubscriptionTests : XCTestCase {
597597
}
598598
}
599599
}
600-
""")
600+
""") as! ObservableEventStream<Future<GraphQLResult>>
601601

602602
var currentResult = GraphQLResult()
603-
let subscriber = subscription.subscribe { event in
603+
let subscriber = subscription.observable.subscribe { event in
604604
currentResult = try! event.element!.wait()
605605
}
606606

@@ -671,10 +671,10 @@ class SubscriptionTests : XCTestCase {
671671
}
672672
}
673673
}
674-
""")
674+
""") as! ObservableEventStream<Future<GraphQLResult>>
675675

676676
var currentResult = GraphQLResult()
677-
let _ = subscription.subscribe { event in
677+
let _ = subscription.observable.subscribe { event in
678678
currentResult = try! event.element!.wait()
679679
}.disposed(by: db.disposeBag)
680680

@@ -744,10 +744,10 @@ class SubscriptionTests : XCTestCase {
744744
}
745745
}
746746
}
747-
""")
747+
""") as! ObservableEventStream<Future<GraphQLResult>>
748748

749749
var currentResult = GraphQLResult()
750-
let _ = subscription.subscribe { event in
750+
let _ = subscription.observable.subscribe { event in
751751
currentResult = try! event.element!.wait()
752752
}.disposed(by: db.disposeBag)
753753

@@ -905,7 +905,7 @@ class EmailDb {
905905
func subscription (
906906
query:String,
907907
variableValues: [String: Map] = [:]
908-
) throws -> SubscriptionObservable {
908+
) throws -> SubscriptionEventStream {
909909
return try createSubscription(schema: defaultSchema(), query: query, variableValues: variableValues)
910910
}
911911
}
@@ -937,7 +937,7 @@ private func createSubscription(
937937
schema: GraphQLSchema,
938938
query: String,
939939
variableValues: [String: Map] = [:]
940-
) throws -> SubscriptionObservable {
940+
) throws -> SubscriptionEventStream {
941941
let document = try parse(source: query)
942942
let result = try subscribe(
943943
queryStrategy: SerialFieldExecutionStrategy(),
@@ -953,8 +953,8 @@ private func createSubscription(
953953
operationName: nil
954954
).wait()
955955

956-
if let observable = result.observable {
957-
return observable
956+
if let stream = result.stream {
957+
return stream
958958
} else {
959959
throw result.errors.first! // We may have more than one...
960960
}

0 commit comments

Comments
 (0)