Skip to content

Commit 80e6518

Browse files
authored
Subscription message is now a ByteBuffer (#118)
1 parent c96054c commit 80e6518

File tree

4 files changed

+35
-24
lines changed

4 files changed

+35
-24
lines changed

Sources/Valkey/Subscriptions/PushToken.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ struct PushToken: RESPTokenDecodable {
3030
enum TokenType: CustomStringConvertible {
3131
case subscribe(subscriptionCount: Int)
3232
case unsubscribe(subscriptionCount: Int)
33-
case message(channel: String, message: String)
33+
case message(channel: String, message: ByteBuffer)
3434
case invalidate(keys: [ValkeyKey])
3535

3636
var description: String {
@@ -76,7 +76,7 @@ struct PushToken: RESPTokenDecodable {
7676
}
7777
let channel = try String(fromRESP: arrayIterator.next()!)
7878
self.value = .channel(channel)
79-
self.type = try TokenType.message(channel: channel, message: String(fromRESP: arrayIterator.next()!))
79+
self.type = try TokenType.message(channel: channel, message: ByteBuffer(fromRESP: arrayIterator.next()!))
8080

8181
case Self.psubscribeString:
8282
guard respArray.count == 3 else {
@@ -99,7 +99,7 @@ struct PushToken: RESPTokenDecodable {
9999
self.value = .pattern(try String(fromRESP: arrayIterator.next()!))
100100
self.type = try TokenType.message(
101101
channel: String(fromRESP: arrayIterator.next()!),
102-
message: String(fromRESP: arrayIterator.next()!)
102+
message: ByteBuffer(fromRESP: arrayIterator.next()!)
103103
)
104104

105105
case Self.ssubscribeString:
@@ -122,7 +122,7 @@ struct PushToken: RESPTokenDecodable {
122122
}
123123
let channel = try String(fromRESP: arrayIterator.next()!)
124124
self.value = .shardChannel(channel)
125-
self.type = try TokenType.message(channel: channel, message: String(fromRESP: arrayIterator.next()!))
125+
self.type = try TokenType.message(channel: channel, message: ByteBuffer(fromRESP: arrayIterator.next()!))
126126

127127
case Self.invalidateString:
128128
guard respArray.count == 2 else {

Sources/Valkey/Subscriptions/ValkeySubscription.swift

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,23 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import NIOCore
16+
1517
/// Message received from subscription
1618
public struct ValkeySubscriptionMessage: Sendable, Equatable {
1719
public let channel: String
18-
public let message: String
20+
public let message: ByteBuffer
1921

20-
package init(channel: String, message: String) {
22+
package init(channel: String, message: ByteBuffer) {
2123
self.channel = channel
2224
self.message = message
2325
}
26+
27+
/// helper function used by tests
28+
package init(channel: String, message: String) {
29+
self.channel = channel
30+
self.message = ByteBuffer(string: message)
31+
}
2432
}
2533

2634
/// Sequence of messages from Valkey subscription

Sources/Valkey/Subscriptions/ValkeySubscriptions.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Logging
16+
import NIOCore
1617
import Synchronization
1718

1819
/// Container for all subscriptions on one connection
@@ -87,7 +88,7 @@ struct ValkeySubscriptions {
8788
case .forwardMessage(let subscriptions):
8889
for subscription in subscriptions {
8990
for key in keys {
90-
subscription.sendMessage(.init(channel: Self.invalidateChannel, message: String(valkeyKey: key)))
91+
subscription.sendMessage(.init(channel: Self.invalidateChannel, message: ByteBuffer(valkeyKey: key)))
9192
}
9293
}
9394
case .doNothing, .none:

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,8 @@ struct GeneratedCommands {
417417
try await connection.subscribe(to: "testSubscriptions") { subscription in
418418
cont.finish()
419419
var iterator = subscription.makeAsyncIterator()
420-
await #expect(throws: Never.self) { try await iterator.next()?.message == "hello" }
421-
await #expect(throws: Never.self) { try await iterator.next()?.message == "goodbye" }
420+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
421+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "goodbye" }
422422
}
423423
#expect(await connection.isSubscriptionsEmpty())
424424
}
@@ -450,14 +450,14 @@ struct GeneratedCommands {
450450
try await connection.subscribe(to: "testDoubleSubscription") { stream2 in
451451
var iterator2 = stream2.makeAsyncIterator()
452452
cont.yield()
453-
await #expect(throws: Never.self) { try await iterator.next()?.message == "hello" }
454-
await #expect(throws: Never.self) { try await iterator2.next()?.message == "hello" }
453+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
454+
await #expect(throws: Never.self) { try await iterator2.next().map { String(buffer: $0.message) } == "hello" }
455455
// ensure we only see the message once, by waiting for second message.
456-
await #expect(throws: Never.self) { try await iterator.next()?.message == "world" }
457-
await #expect(throws: Never.self) { try await iterator2.next()?.message == "world" }
456+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "world" }
457+
await #expect(throws: Never.self) { try await iterator2.next().map { String(buffer: $0.message) } == "world" }
458458
}
459459
cont.yield()
460-
await #expect(throws: Never.self) { try await iterator.next()?.message == "!" }
460+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "!" }
461461
}
462462
#expect(await connection.isSubscriptionsEmpty())
463463
}
@@ -489,8 +489,8 @@ struct GeneratedCommands {
489489
var iterator = stream.makeAsyncIterator()
490490
var iterator2 = stream2.makeAsyncIterator()
491491
cont.finish()
492-
await #expect(throws: Never.self) { try await iterator.next()?.message == "hello" }
493-
await #expect(throws: Never.self) { try await iterator2.next()?.message == "goodbye" }
492+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
493+
await #expect(throws: Never.self) { try await iterator2.next().map { String(buffer: $0.message) } == "goodbye" }
494494
}
495495
}
496496
#expect(await connection.isSubscriptionsEmpty())
@@ -520,9 +520,9 @@ struct GeneratedCommands {
520520
try await connection.subscribe(to: "multi1", "multi2", "multi3") { stream in
521521
var iterator = stream.makeAsyncIterator()
522522
cont.yield()
523-
await #expect(throws: Never.self) { try await iterator.next()?.message == "1" }
524-
await #expect(throws: Never.self) { try await iterator.next()?.message == "2" }
525-
await #expect(throws: Never.self) { try await iterator.next()?.message == "3" }
523+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "1" }
524+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "2" }
525+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "3" }
526526
}
527527
#expect(await connection.isSubscriptionsEmpty())
528528
}
@@ -584,7 +584,9 @@ struct GeneratedCommands {
584584
var iterator = stream.makeAsyncIterator()
585585
var iterator2 = stream2.makeAsyncIterator()
586586
cont.finish()
587-
try #expect(await iterator.next() == .init(channel: "PatternChannelSubscriptions1", message: "hello"))
587+
try #expect(
588+
await iterator.next() == .init(channel: "PatternChannelSubscriptions1", message: "hello")
589+
)
588590
try #expect(await iterator2.next() == .init(channel: "PatternChannelSubscriptions1", message: "hello"))
589591
try #expect(await iterator2.next() == .init(channel: "PatternChannelSubscriptions2", message: "goodbye"))
590592
}
@@ -645,15 +647,15 @@ struct GeneratedCommands {
645647
try await connection.subscribe(to: "testSubscriptions") { subscription in
646648
cont.finish()
647649
var iterator = subscription.makeAsyncIterator()
648-
await #expect(throws: Never.self) { try await iterator.next()?.message == "hello" }
650+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
649651
// test we can send commands on subscription connection
650652
try await withKey(connection: connection) { key in
651653
_ = try await connection.set(key: key, value: "Hello")
652654
let response = try await connection.get(key: key)
653655
#expect(response.map { String(buffer: $0) } == "Hello")
654656
}
655657

656-
await #expect(throws: Never.self) { try await iterator.next()?.message == "goodbye" }
658+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "goodbye" }
657659
}
658660
#expect(await connection.isSubscriptionsEmpty())
659661
}
@@ -715,10 +717,10 @@ struct GeneratedCommands {
715717
var iterator = subscription.makeAsyncIterator()
716718
var value = try await iterator.next()
717719
#expect(value?.channel == "__keyspace@0__:\(key)")
718-
#expect(value?.message == "set")
720+
#expect(value.map { String(buffer: $0.message) } == "set")
719721
value = try await iterator.next()
720722
#expect(value?.channel == "__keyspace@0__:\(key)")
721-
#expect(value?.message == "incrby")
723+
#expect(value.map { String(buffer: $0.message) } == "incrby")
722724
}
723725
}
724726
}

0 commit comments

Comments
 (0)