|
| 1 | +# Subscriptions |
| 2 | + |
| 3 | +Implementing Pub/Sub using valkey-swift |
| 4 | + |
| 5 | +## Overview |
| 6 | + |
| 7 | +Valkey provides publish and subscribe (Pub/Sub) messaging support using the `PUBLISH`, `SUBSCRIBE` and `UNSUBSCRIBE` commands. It has the concept of a channel that a client can both publish to and subscribe to. The server sends any messages published to a channel to clients subscribed to that channel. Valkey channels are not persisted, for instance if a message is published to a channel that has no subscribers, that message is lost. |
| 8 | + |
| 9 | +### Publishing |
| 10 | + |
| 11 | +Valkey has one function for publishing to a channel ``ValkeyClientProtocol/publish(channel:message:)``. As a member function of ``ValkeyClientProtocol``, it is available from the types that conform to it which include ``ValkeyConnection``, ``ValkeyClient`` and ``ValkeyClusterClient``. |
| 12 | + |
| 13 | +```swift |
| 14 | +try await valkeyClient.publish(channel: "channel1", message: "Hello, World!") |
| 15 | +``` |
| 16 | + |
| 17 | +### Subscribing |
| 18 | + |
| 19 | +Use ``ValkeyConnection/subscribe(to:isolation:process:)-(String...,_,_)`` to subscribe to a single or multiple channels and receive every message published to the channel via an AsyncSequence. When you exit the closure provided, the Valkey client sends the relevant `UNSUBSCRIBE` messages. |
| 20 | + |
| 21 | +```swift |
| 22 | +try await valkeyClient.withConnection { connection in |
| 23 | + try await connection.subscribe(to: ["channel1", "channel2"]) { subscription in |
| 24 | + for try await item in subscription { |
| 25 | + // a subscription item includes the channel the message was published on |
| 26 | + // as well as the message |
| 27 | + print(item.channel) |
| 28 | + print(item.message) |
| 29 | + } |
| 30 | + } |
| 31 | +} |
| 32 | +``` |
| 33 | + |
| 34 | +Valkey-swift uses the RESP3 protocol, which allows for commands to be run on the same connection as subsciption. This allows you to call `SET` with the same connection as the subscription, as the next example illustrates: |
| 35 | + |
| 36 | +```swift |
| 37 | +try await connection.subscribe(to: ["channel1"]) { subscription in |
| 38 | + for try await entry in subscription { |
| 39 | + try await connection.set("channel1/last", value: entry.message) |
| 40 | + } |
| 41 | +} |
| 42 | +``` |
| 43 | + |
| 44 | +### Patterns |
| 45 | + |
| 46 | +Valkey allows you to use glob style patterns to subscribe to a range of channels. These are available with the function ``ValkeyConnection/psubscribe(to:isolation:process:)-([String],_,_)``. This is formatted in a similar manner to normal subscriptions. |
| 47 | + |
| 48 | +```swift |
| 49 | +try await connection.subscribe(to: ["channel*"]) { subscription in |
| 50 | + for try await entry in subscription { |
| 51 | + let channel = "\(entry.channel)/last" |
| 52 | + try await connection.set(channel, value: entry.message) |
| 53 | + } |
| 54 | +} |
| 55 | +``` |
| 56 | + |
| 57 | +The code above receives all messages sent to channels prefixed with the string "channel". |
| 58 | + |
| 59 | +More can be found out about Valkey pub/sub in the [Valkey documentation](https://valkey.io/topics/pubsub/). |
| 60 | + |
| 61 | +### Support for Client Side Caching |
| 62 | + |
| 63 | +Client side caching is a way to improve performance of a service using Valkey. Caching the values of specific keys locally avoids putting pressure on your Valkey server unnecessarily. For a client side cache to work, you need to know when to invalidate the local data. Valkey provides two different ways to do this, both using pub/sub: |
| 64 | + |
| 65 | +1) The server remembers the keys a connection has accessed and publishes events to the invalidation channel whenever those keys are modified. |
| 66 | +2) Broadcasting, where the server doesn't keep a record of what keys have been accessed. The client provides a prefix of the keys they are interested in and the server sends events to the invalidation channel whenever any key that matches that prefix is modified. |
| 67 | + |
| 68 | +#### Enabling Tracking |
| 69 | + |
| 70 | +Connections start without invalidation tracking enabled. To enable receiving invalidation events on a connection, call ``ValkeyConnection/clientTracking(status:clientId:prefixes:bcast:optin:optout:noloop:)``. For example: |
| 71 | + |
| 72 | +```swift |
| 73 | +try await connection.clientTracking(status: .on) |
| 74 | +``` |
| 75 | + |
| 76 | +This tells the server to use the first invalidation method where it remembers the keys accessed by a connection. If you would like to track changes to all keys with a prefix, use: |
| 77 | + |
| 78 | +```swift |
| 79 | +try await connection.clientTracking( |
| 80 | + status: .on, |
| 81 | + prefixes: ["object:"], |
| 82 | + bcast: true |
| 83 | +) |
| 84 | +``` |
| 85 | + |
| 86 | +#### Subscribing to Invalidation Events |
| 87 | + |
| 88 | +Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated. |
| 89 | + |
| 90 | +```swift |
| 91 | +try await connection.subscribeKeyInvalidations { keys in |
| 92 | + for try await key in keys { |
| 93 | + myCache.invalidate(key) |
| 94 | + } |
| 95 | +} |
| 96 | +``` |
| 97 | + |
| 98 | +#### Redirecting Invalidation Events |
| 99 | + |
| 100 | +With RESP3 it is possible to perform data operations and receive the invalidation events on the same connection, but Valkey client tracking also allows you to redirect invalidation events to another connection. Given that the Valkey client uses a persistent connection pool, it is preferable to use a single connection for receiving invalidation messages to implement a system wide cache. |
| 101 | + |
| 102 | +For this to work you need to know the id of the connection that is subscribed to the key invalidation events. Get the connection id using ``ValkeyConnection/clientId()`` and use it when you set up tracking. |
| 103 | + |
| 104 | +```swift |
| 105 | +try await connection.clientTracking(status: .on, clientId: id) |
| 106 | +``` |
| 107 | + |
| 108 | +More can be found out about Valkey client side caching in the [Valkey documentation](https://valkey.io/topics/client-side-caching/). |
0 commit comments