Skip to content

Commit 9af5502

Browse files
authored
Merge pull request #5374 from 1kuko3/feat/ably-link-unsubscribe
Support .unsubscribe in AblyLink
2 parents 5b26f34 + fe33846 commit 9af5502

File tree

2 files changed

+173
-39
lines changed

2 files changed

+173
-39
lines changed

javascript_client/src/subscriptions/AblyLink.ts

Lines changed: 125 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,26 @@
3636
// // Do something with `data` and/or `errors`
3737
// }})
3838
//
39-
import { ApolloLink, Observable, FetchResult, NextLink, Operation } from "@apollo/client/core"
40-
import { Realtime } from "ably"
39+
import {
40+
ApolloLink,
41+
Observable,
42+
FetchResult,
43+
NextLink,
44+
Operation,
45+
Observer
46+
} from "@apollo/client/core"
47+
import { Realtime, Types } from "ably"
4148

42-
type RequestResult = Observable<FetchResult<{ [key: string]: any; }, Record<string, any>, Record<string, any>>>
49+
type RequestResult = FetchResult<
50+
{ [key: string]: any },
51+
Record<string, any>,
52+
Record<string, any>
53+
>
54+
55+
type Subscription = {
56+
closed: boolean
57+
unsubscribe(): void
58+
}
4359

4460
class AblyLink extends ApolloLink {
4561
ably: Realtime
@@ -50,24 +66,82 @@ class AblyLink extends ApolloLink {
5066
this.ably = options.ably
5167
}
5268

53-
request(operation: Operation, forward: NextLink): RequestResult {
54-
return new Observable((observer) => {
69+
request(operation: Operation, forward: NextLink): Observable<RequestResult> {
70+
const subscribeObservable = new Observable<RequestResult>(_observer => {})
71+
72+
// Capture the super method
73+
const prevSubscribe = subscribeObservable.subscribe.bind(
74+
subscribeObservable
75+
)
76+
77+
// Override subscribe to return an `unsubscribe` object, see
78+
// https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/client.ts#L182-L212
79+
subscribeObservable.subscribe = (
80+
observerOrNext:
81+
| Observer<RequestResult>
82+
| ((value: RequestResult) => void),
83+
onError?: (error: any) => void,
84+
onComplete?: () => void
85+
): Subscription => {
86+
// Call super
87+
if (typeof observerOrNext == "function") {
88+
prevSubscribe(observerOrNext, onError, onComplete)
89+
} else {
90+
prevSubscribe(observerOrNext)
91+
}
92+
93+
const observer = getObserver(observerOrNext, onError, onComplete)
94+
let ablyChannel: Types.RealtimeChannelCallbacks | null = null
95+
let subscriptionChannelId: string | null = null
96+
5597
// Check the result of the operation
56-
forward(operation).subscribe({ next: (data) => {
57-
// If the operation has the subscription header, it's a subscription
58-
const subscriptionChannelConfig = this._getSubscriptionChannel(operation)
59-
if (subscriptionChannelConfig.channel) {
60-
// This will keep pushing to `.next`
61-
this._createSubscription(subscriptionChannelConfig, observer)
62-
}
63-
else {
64-
// This isn't a subscription,
65-
// So pass the data along and close the observer.
66-
observer.next(data)
67-
observer.complete()
98+
const resultObservable = forward(operation)
99+
// When the operation is done, try to get the subscription ID from the server
100+
const resultSubscription = resultObservable.subscribe({
101+
next: (data: any) => {
102+
// If the operation has the subscription header, it's a subscription
103+
const subscriptionChannelConfig = this._getSubscriptionChannel(
104+
operation
105+
)
106+
if (subscriptionChannelConfig.channel) {
107+
subscriptionChannelId = subscriptionChannelConfig.channel
108+
// This will keep pushing to `.next`
109+
ablyChannel = this._createSubscription(
110+
subscriptionChannelConfig,
111+
observer
112+
)
113+
} else {
114+
// This isn't a subscription,
115+
// So pass the data along and close the observer.
116+
if (data) {
117+
observer.next(data)
118+
}
119+
observer.complete()
120+
}
121+
},
122+
error: observer.error
123+
// complete: observer.complete Don't pass this because Apollo unsubscribes if you do
124+
})
125+
126+
// Return an object that will unsubscribe _if_ the query was a subscription.
127+
return {
128+
closed: false,
129+
unsubscribe: () => {
130+
if (ablyChannel && subscriptionChannelId) {
131+
const ablyClientId = this.ably.auth.clientId
132+
if (ablyClientId) {
133+
ablyChannel.presence.leave()
134+
} else {
135+
ablyChannel.presence.leaveClient("graphql-subscriber")
136+
}
137+
ablyChannel.unsubscribe()
138+
resultSubscription.unsubscribe()
139+
}
68140
}
69-
}})
70-
})
141+
}
142+
}
143+
144+
return subscribeObservable
71145
}
72146

73147
_getSubscriptionChannel(operation: Operation) {
@@ -79,10 +153,15 @@ class AblyLink extends ApolloLink {
79153
return { channel: subscriptionChannel, key: cipherKey }
80154
}
81155

82-
_createSubscription(subscriptionChannelConfig: { channel: string, key: string }, observer: { next: Function, complete: Function}) {
156+
_createSubscription(
157+
subscriptionChannelConfig: { channel: string; key: string },
158+
observer: { next: Function; complete: Function }
159+
) {
83160
const subscriptionChannel = subscriptionChannelConfig["channel"]
84161
const subscriptionKey = subscriptionChannelConfig["key"]
85-
const ablyOptions = subscriptionKey ? { cipher: { key: subscriptionKey } } : {}
162+
const ablyOptions = subscriptionKey
163+
? { cipher: { key: subscriptionKey } }
164+
: {}
86165
const ablyChannel = this.ably.channels.get(subscriptionChannel, ablyOptions)
87166
const ablyClientId = this.ably.auth.clientId
88167
// Register presence, so that we can detect empty channels and clean them up server-side
@@ -110,6 +189,31 @@ class AblyLink extends ApolloLink {
110189
observer.complete()
111190
}
112191
})
192+
return ablyChannel
193+
}
194+
}
195+
196+
// Turn `subscribe` arguments into an observer-like thing, see getObserver
197+
// https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/client.ts#L347-L361
198+
function getObserver<T>(
199+
observerOrNext: Function | Observer<T>,
200+
onError?: (e: Error) => void,
201+
onComplete?: () => void
202+
) {
203+
if (typeof observerOrNext === "function") {
204+
// Duck-type an observer
205+
return {
206+
next: (v: T) => observerOrNext(v),
207+
error: (e: Error) => onError && onError(e),
208+
complete: () => onComplete && onComplete()
209+
}
210+
} else {
211+
// Make an object that calls to the given object, with safety checks
212+
return {
213+
next: (v: T) => observerOrNext.next && observerOrNext.next(v),
214+
error: (e: Error) => observerOrNext.error && observerOrNext.error(e),
215+
complete: () => observerOrNext.complete && observerOrNext.complete()
216+
}
113217
}
114218
}
115219

javascript_client/src/subscriptions/__tests__/AblyLinkTest.ts

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ function createAbly() {
3030
},
3131
unsubscribe(){
3232
log.push(["unsubscribe", channelName])
33+
this._listeners.splice(0, this._listeners.length)
3334
}
3435
}
3536
},
@@ -72,20 +73,29 @@ function createOperation(options: { subscriptionId: string | null }) {
7273
}
7374
} as unknown) as Operation
7475
}
76+
77+
function createNextLink(log: any[]) {
78+
return (operation: any) => {
79+
log.push(["forward", operation.operationName])
80+
return {
81+
subscribe(info: any) {
82+
info.next()
83+
return {
84+
unsubscribe() {
85+
log.push(["request unsubscribed"])
86+
}
87+
}
88+
}
89+
} as any
90+
}
91+
}
92+
7593
describe("AblyLink", () => {
7694
test("delegates to Ably", () => {
7795
var mockAbly = createAbly()
7896
var log = (mockAbly as any).log
7997
var operation = createOperation({subscriptionId: "sub-1234"})
80-
81-
var nextLink = (operation: any) => {
82-
log.push(["forward", operation.operationName])
83-
return {
84-
subscribe(info: any) {
85-
info.next()
86-
}
87-
} as any
88-
}
98+
var nextLink = createNextLink(log)
8999

90100
var observable = new AblyLink({ ably: mockAbly}).request(operation, nextLink)
91101

@@ -111,15 +121,7 @@ describe("AblyLink", () => {
111121
var mockAbly = createAbly()
112122
var log = (mockAbly as any).log
113123
var operation = createOperation({subscriptionId: null})
114-
115-
var nextLink = (operation: any) => {
116-
log.push(["forward", operation.operationName])
117-
return {
118-
subscribe(info: any) {
119-
info.next()
120-
}
121-
} as any
122-
}
124+
var nextLink = createNextLink(log)
123125

124126
var observable = new AblyLink({ ably: mockAbly}).request(operation, nextLink)
125127

@@ -133,4 +135,32 @@ describe("AblyLink", () => {
133135

134136
expect(log).toEqual([["forward", "operationName"]])
135137
})
138+
139+
test("it can unsubscribe", () => {
140+
var mockAbly = createAbly()
141+
var log = (mockAbly as any).log
142+
var operation = createOperation({subscriptionId: "sub-1234"})
143+
var nextLink = createNextLink(log)
144+
145+
var observable = new AblyLink({ ably: mockAbly}).request(operation, nextLink)
146+
147+
var subscription = observable.subscribe(function(result: any) {
148+
log.push(["received", result])
149+
});
150+
151+
(mockAbly as any).__testTrigger("sub-1234", "update", { data: { result: { data: "data1" }, more: true} });
152+
subscription.unsubscribe();
153+
// This is not received:
154+
(mockAbly as any).__testTrigger("sub-1234", "update", { data: { result: { data: "data2" }, more: true} });
155+
156+
expect(log).toEqual([
157+
["forward", "operationName"],
158+
["subscribe", "sub-1234", "update"],
159+
["received", { data: "data1" }],
160+
["unsubscribe", "sub-1234"],
161+
["request unsubscribed"]
162+
])
163+
})
164+
165+
136166
})

0 commit comments

Comments
 (0)