Skip to content

Commit 0c5e369

Browse files
committed
handle close and clear
1 parent 2eaca05 commit 0c5e369

File tree

5 files changed

+38
-8
lines changed

5 files changed

+38
-8
lines changed

Sources/OpenSeaSwift/Internal/Phoenix/PhoenixInternalEvent.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import Foundation
55

66
struct PhoenixInternalEvent: PhoenixEvent {
77
let kind: PhoenixInternalEventKind
8-
let status: Status
8+
let status: Status?
99
}
1010

1111
// MARK: PhoenixEventCodable
1212
extension PhoenixInternalEvent: PhoenixEventCodable {
1313
init(from decoder: Decoder, kind: PhoenixInternalEventKind) throws {
1414
let container = try decoder.container(keyedBy: CodingKeys.self)
15-
let status = try container.decode(Status.self, forKey: .status)
15+
let status = try container.decodeIfPresent(Status.self, forKey: .status)
1616
self.init(kind: kind, status: status)
1717
}
1818

Sources/OpenSeaSwift/Internal/Phoenix/PhoenixInternalEventKind.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ enum PhoenixInternalEventKind: String, Codable {
88
case heartbeat = "heartbeat"
99
case reply = "phx_reply"
1010
case leave = "phx_leave"
11+
case close = "phx_close"
1112
}

Sources/OpenSeaSwift/Internal/Phoenix/PhoenixReplySubscriptionRef.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import Foundation
55

66
struct PhoenixReplySubscriptionRef: Hashable {
7-
let joinRef: String?
7+
// We do not support joining the same topic multiple times
8+
// let joinRef: String?
89
let ref: String?
910
let topic: String
1011
}

Sources/OpenSeaSwift/Internal/Phoenix/PhoenixSocketService.swift

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ private extension PhoenixSocketServiceImpl {
8989
func clear() {
9090
heartbeatTask?.cancel()
9191
heartbeatTask = nil
92+
topicSubscriptions.removeAll()
93+
replySubscriptions.removeAll()
9294
}
9395

9496
func handle(rawMessage: String) {
@@ -102,7 +104,7 @@ private extension PhoenixSocketServiceImpl {
102104
try handleMessage(messageData: data)
103105
}
104106
} catch {
105-
print("Invalid message: \(rawMessage)")
107+
print("Invalid message: \(rawMessage) (\(error))")
106108
}
107109
}
108110

@@ -113,6 +115,8 @@ private extension PhoenixSocketServiceImpl {
113115
let ref = replySubscriptionRef(forMessage: phoenixMessage)
114116
guard let subscription = replySubscription(forRef: ref) else { return /* No subscription */ }
115117
subscription.resume(message: phoenixMessage)
118+
case .close:
119+
clearTopicSubscription(topic: phoenixMessage.topic)
116120
default:
117121
break
118122
}
@@ -140,8 +144,8 @@ private extension PhoenixSocketServiceImpl {
140144
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
141145
subscription.sink { message in
142146
switch message.event.status {
143-
case .ok: continuation.resume()
144-
case .error: continuation.resume(throwing: PhoenixSocketError.replyError)
147+
case .ok, .none: continuation.resume()
148+
case .error: continuation.resume(throwing: PhoenixSocketError.replyError)
145149
}
146150
}
147151
}
@@ -156,7 +160,7 @@ private extension PhoenixSocketServiceImpl {
156160
// MARK: Subscription
157161
private extension PhoenixSocketServiceImpl {
158162
func replySubscriptionRef<Event>(forMessage message: PhoenixMessage<Event>) -> PhoenixReplySubscriptionRef {
159-
.init(joinRef: message.joinRef, ref: message.ref, topic: message.topic)
163+
.init(ref: message.ref, topic: message.topic)
160164
}
161165

162166
func replySubscription(forRef ref: PhoenixReplySubscriptionRef) -> PhoenixReplySubscription? {

Sources/OpenSeaSwift/Stream/Events/StreamEventPayload.swift

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,31 @@ public extension StreamEvent {
2727
}
2828
}
2929

30-
protocol StreamEventPayload {
30+
extension StreamEvent.Payload: StreamEventPayload {
31+
public var item: StreamItem { subPayload.item }
32+
public var collection: StreamCollection { subPayload.collection }
33+
34+
private var subPayload: StreamEventPayload {
35+
switch self {
36+
case let .itemMetadataUpdated(itemMetadataUpdated):
37+
return itemMetadataUpdated
38+
case let .itemListed(itemListed):
39+
return itemListed
40+
case let .itemSold(itemSold):
41+
return itemSold
42+
case let .itemTransferred(itemTransferred):
43+
return itemTransferred
44+
case let .itemCancelled(itemCancelled):
45+
return itemCancelled
46+
case let .itemReceivedOffer(itemReceivedOffer):
47+
return itemReceivedOffer
48+
case let .itemReceivedBid(itemReceivedBid):
49+
return itemReceivedBid
50+
}
51+
}
52+
}
53+
54+
public protocol StreamEventPayload {
3155
var item: StreamItem { get }
3256
var collection: StreamCollection { get }
3357
}

0 commit comments

Comments
 (0)