Skip to content

Commit 72fccff

Browse files
author
Guilherme Souza
committed
trigger event
1 parent 4ef61bb commit 72fccff

File tree

5 files changed

+101
-30
lines changed

5 files changed

+101
-30
lines changed

Sources/Realtime/V2/CallbackManager.swift

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,19 @@ final class CallbackManager: Sendable {
154154
}
155155
}
156156

157+
func triggerSystem(message: RealtimeMessageV2) {
158+
let systemCallbacks = mutableState.callbacks.compactMap {
159+
if case .system(let callback) = $0 {
160+
return callback
161+
}
162+
return nil
163+
}
164+
165+
for systemCallback in systemCallbacks {
166+
systemCallback.callback(message)
167+
}
168+
}
169+
157170
func reset() {
158171
mutableState.setValue(MutableState())
159172
}

Sources/Realtime/V2/RealtimeChannelV2.swift

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
import ConcurrencyExtras
99
import Foundation
10-
import Helpers
1110
import HTTPTypes
11+
import Helpers
1212

1313
#if canImport(FoundationNetworking)
1414
import FoundationNetworking
@@ -59,7 +59,9 @@ extension Socket {
5959
addChannel: { [weak client] in client?.addChannel($0) },
6060
removeChannel: { [weak client] in await client?.removeChannel($0) },
6161
push: { [weak client] in await client?.push($0) },
62-
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
62+
httpSend: { [weak client] in
63+
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
64+
}
6365
)
6466
}
6567
}
@@ -185,7 +187,8 @@ public final class RealtimeChannelV2: Sendable {
185187
@available(
186188
*,
187189
deprecated,
188-
message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
190+
message:
191+
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
189192
)
190193
public func updateAuth(jwt: String?) async {
191194
logger?.debug("Updating auth token for channel \(topic)")
@@ -238,8 +241,8 @@ public final class RealtimeChannelV2: Sendable {
238241
event: event,
239242
payload: message,
240243
private: config.isPrivate
241-
),
242-
],
244+
)
245+
]
243246
]
244247
)
245248
)
@@ -295,20 +298,27 @@ public final class RealtimeChannelV2: Sendable {
295298

296299
func onMessage(_ message: RealtimeMessageV2) async {
297300
do {
298-
guard let eventType = message.eventType else {
301+
guard let eventType = message._eventType else {
299302
logger?.debug("Received message without event type: \(message)")
300303
return
301304
}
302305

303306
switch eventType {
304307
case .tokenExpired:
305-
logger?.debug(
306-
"Received token expired event. This should not happen, please report this warning."
307-
)
308+
// deprecated type
309+
break
308310

309311
case .system:
310-
logger?.debug("Subscribed to channel \(message.topic)")
311-
status = .subscribed
312+
if message.status == .ok {
313+
logger?.debug("Subscribed to channel \(message.topic)")
314+
status = .subscribed
315+
} else {
316+
logger?.debug(
317+
"Failed to subscribe to channel \(message.topic): \(message.payload)"
318+
)
319+
}
320+
321+
callbackManager.triggerSystem(message: message)
312322

313323
case .reply:
314324
guard
@@ -545,14 +555,24 @@ public final class RealtimeChannelV2: Sendable {
545555
}
546556
}
547557

548-
public func onSystem(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Subscription {
558+
/// Listen for `system` event.
559+
public func onSystem(
560+
callback: @escaping @Sendable (RealtimeMessageV2) -> Void
561+
) -> RealtimeSubscription {
549562
let id = callbackManager.addSystemCallback(callback: callback)
550-
return Subscription { [weak callbackManager, logger] in
563+
return RealtimeSubscription { [weak callbackManager, logger] in
551564
logger?.debug("Removing system callback with id: \(id)")
552565
callbackManager?.removeCallback(id: id)
553566
}
554567
}
555568

569+
/// Listen for `system` event.
570+
public func onSystem(
571+
callback: @escaping @Sendable () -> Void
572+
) -> RealtimeSubscription {
573+
self.onSystem { _ in callback() }
574+
}
575+
556576
@discardableResult
557577
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
558578
let push = mutableState.withValue {

Sources/Realtime/V2/RealtimeMessageV2.swift

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,22 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
2323
self.payload = payload
2424
}
2525

26-
var status: PushStatus? {
26+
/// Status for the received message if any.
27+
public var status: PushStatus? {
2728
payload["status"]
2829
.flatMap(\.stringValue)
2930
.flatMap(PushStatus.init(rawValue:))
3031
}
3132

32-
public var eventType: EventType? {
33+
@available(
34+
*, deprecated,
35+
message: "Access to event type will be removed, please inspect raw event value instead."
36+
)
37+
public var eventType: EventType? { _eventType }
38+
39+
var _eventType: EventType? {
3340
switch event {
34-
case ChannelEvent.system where status == .ok: .system
41+
case ChannelEvent.system: .system
3542
case ChannelEvent.postgresChanges:
3643
.postgresChanges
3744
case ChannelEvent.broadcast:
@@ -44,9 +51,6 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
4451
.presenceDiff
4552
case ChannelEvent.presenceState:
4653
.presenceState
47-
case ChannelEvent.system
48-
where payload["message"]?.stringValue?.contains("access token has expired") == true:
49-
.tokenExpired
5054
case ChannelEvent.reply:
5155
.reply
5256
default:
@@ -62,6 +66,11 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
6266
case error
6367
case presenceDiff
6468
case presenceState
69+
@available(
70+
*, deprecated,
71+
message:
72+
"tokenExpired gets returned as system, check payload for verifying if is a token expiration."
73+
)
6574
case tokenExpired
6675
case reply
6776
}

Tests/RealtimeTests/CallbackManagerTests.swift

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
import ConcurrencyExtras
99
import CustomDump
1010
import Helpers
11-
@testable import Realtime
1211
import XCTest
1312

13+
@testable import Realtime
14+
1415
final class CallbackManagerTests: XCTestCase {
1516
func testIntegration() {
1617
let callbackManager = CallbackManager()
@@ -52,13 +53,15 @@ final class CallbackManagerTests: XCTestCase {
5253
let callbackManager = CallbackManager()
5354
XCTAssertNoLeak(callbackManager)
5455

55-
let changes = [PostgresJoinConfig(
56-
event: .update,
57-
schema: "public",
58-
table: "users",
59-
filter: nil,
60-
id: 1
61-
)]
56+
let changes = [
57+
PostgresJoinConfig(
58+
event: .update,
59+
schema: "public",
60+
table: "users",
61+
filter: nil,
62+
id: 1
63+
)
64+
]
6265

6366
callbackManager.setServerChanges(changes: changes)
6467

@@ -118,7 +121,8 @@ final class CallbackManagerTests: XCTestCase {
118121
receivedActions.withValue { $0.append(action) }
119122
}
120123

121-
let deleteSpecificUserId = callbackManager
124+
let deleteSpecificUserId =
125+
callbackManager
122126
.addPostgresCallback(filter: deleteSpecificUserFilter) { action in
123127
receivedActions.withValue { $0.append(action) }
124128
}
@@ -215,6 +219,22 @@ final class CallbackManagerTests: XCTestCase {
215219
expectNoDifference(receivedAction.value?.joins, joins)
216220
expectNoDifference(receivedAction.value?.leaves, leaves)
217221
}
222+
223+
func testTriggerSystem() {
224+
let callbackManager = CallbackManager()
225+
226+
let receivedMessage = LockIsolated(RealtimeMessageV2?.none)
227+
callbackManager.addSystemCallback { message in
228+
receivedMessage.setValue(message)
229+
}
230+
231+
callbackManager.triggerSystem(
232+
message: RealtimeMessageV2(
233+
joinRef: nil, ref: nil, topic: "test", event: "system", payload: ["status": "ok"]))
234+
235+
XCTAssertEqual(receivedMessage.value?._eventType, .system)
236+
XCTAssertEqual(receivedMessage.value?.status, .ok)
237+
}
218238
}
219239

220240
extension XCTestCase {

Tests/RealtimeTests/RealtimeChannelTests.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
//
77

88
import InlineSnapshotTesting
9-
@testable import Realtime
109
import XCTest
1110
import XCTestDynamicOverlay
1211

12+
@testable import Realtime
13+
1314
final class RealtimeChannelTests: XCTestCase {
1415
let sut = RealtimeChannelV2(
1516
topic: "topic",
@@ -48,9 +49,13 @@ final class RealtimeChannelTests: XCTestCase {
4849

4950
sut.onPresenceChange { _ in }.store(in: &subscriptions)
5051

52+
sut.onSystem {
53+
}
54+
.store(in: &subscriptions)
55+
5156
assertInlineSnapshot(of: sut.callbackManager.callbacks, as: .dump) {
5257
"""
53-
7 elements
58+
8 elements
5459
▿ RealtimeCallback
5560
▿ postgres: PostgresCallback
5661
- callback: (Function)
@@ -112,6 +117,10 @@ final class RealtimeChannelTests: XCTestCase {
112117
▿ presence: PresenceCallback
113118
- callback: (Function)
114119
- id: 7
120+
▿ RealtimeCallback
121+
▿ system: SystemCallback
122+
- callback: (Function)
123+
- id: 8
115124
116125
"""
117126
}

0 commit comments

Comments
 (0)