Skip to content

Commit b52c087

Browse files
Added events to create/update/delete operations that propagate to registered observers
1 parent 36d635f commit b52c087

File tree

6 files changed

+168
-10
lines changed

6 files changed

+168
-10
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ As this project matures towards its first beta, a number of features still need
5555
- Fleshing out historical edit metadata
5656
- Migrating entries
5757
- Ranged reads
58-
- Observations
5958

6059
The above list will be kept up to date during development and will likely see additions during that process.
6160

Sources/CodableDatastore/Datastore/Datastore.swift

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -335,15 +335,20 @@ extension Datastore where AccessMode == ReadWrite {
335335
/// Create any missing indexes or prime the datastore for writing.
336336
try await transaction.apply(descriptor: updatedDescriptor, for: self.key)
337337

338-
let existingEntry: (cursor: any InstanceCursorProtocol, instance: CodedType)? = try await {
338+
let existingEntry: (cursor: any InstanceCursorProtocol, instance: CodedType, versionData: Data, instanceData: Data)? = try await {
339339
do {
340340
let existingEntry = try await transaction.primaryIndexCursor(for: idenfifier, datastoreKey: self.key)
341341

342342
let existingVersion = try Version(existingEntry.versionData)
343343
let decoder = try await self.decoder(for: existingVersion)
344344
let existingInstance = try await decoder(existingEntry.instanceData)
345345

346-
return (cursor: existingEntry.cursor, instance: existingInstance)
346+
return (
347+
cursor: existingEntry.cursor,
348+
instance: existingInstance,
349+
versionData: existingEntry.versionData,
350+
instanceData: existingEntry.instanceData
351+
)
347352
} catch DatastoreInterfaceError.instanceNotFound {
348353
return nil
349354
} catch {
@@ -358,6 +363,34 @@ extension Datastore where AccessMode == ReadWrite {
358363
return try await transaction.primaryIndexCursor(inserting: idenfifier, datastoreKey: self.key)
359364
}()
360365

366+
if let existingEntry {
367+
try await transaction.emit(
368+
event: .updated(
369+
id: idenfifier,
370+
oldEntry: ObservationEntry(
371+
versionData: existingEntry.versionData,
372+
instanceData: existingEntry.instanceData
373+
),
374+
newEntry: ObservationEntry(
375+
versionData: versionData,
376+
instanceData: instanceData
377+
)
378+
),
379+
datastoreKey: self.key
380+
)
381+
} else {
382+
try await transaction.emit(
383+
event: .created(
384+
id: idenfifier,
385+
newEntry: ObservationEntry(
386+
versionData: versionData,
387+
instanceData: instanceData
388+
)
389+
),
390+
datastoreKey: self.key
391+
)
392+
}
393+
361394
/// Persist the entry in the primary index
362395
try await transaction.persistPrimaryIndexEntry(
363396
versionData: versionData,
@@ -516,10 +549,11 @@ extension Datastore where AccessMode == ReadWrite {
516549
try await persist(instance, to: instance[keyPath: keypath])
517550
}
518551

519-
public func delete(_ idenfifier: IdentifierType) async throws {
552+
@discardableResult
553+
public func delete(_ idenfifier: IdentifierType) async throws -> CodedType {
520554
try await warmupIfNeeded()
521555

522-
try await persistence._withTransaction(options: [.idempotent]) { transaction in
556+
return try await persistence._withTransaction(options: [.idempotent]) { transaction in
523557

524558
/// Get a cursor to the entry within the primary index.
525559
let existingEntry = try await transaction.primaryIndexCursor(for: idenfifier, datastoreKey: self.key)
@@ -532,6 +566,16 @@ extension Datastore where AccessMode == ReadWrite {
532566
let decoder = try await self.decoder(for: existingVersion)
533567
let existingInstance = try await decoder(existingEntry.instanceData)
534568

569+
try await transaction.emit(
570+
event: .deleted(
571+
id: idenfifier,
572+
oldEntry: ObservationEntry(
573+
versionData: existingEntry.versionData,
574+
instanceData: existingEntry.instanceData
575+
)
576+
),
577+
datastoreKey: self.key
578+
)
535579

536580
var queriedIndexes: Set<String> = []
537581

@@ -602,6 +646,8 @@ extension Datastore where AccessMode == ReadWrite {
602646
datastoreKey: self.key
603647
)
604648
}
649+
650+
return existingInstance
605651
}
606652
}
607653

@@ -629,10 +675,8 @@ extension Datastore where CodedType: Identifiable, IdentifierType == CodedType.I
629675
try await self.load(instance.id)
630676
}
631677

632-
public func observe(_ instance: CodedType) -> AsyncStream<ObservedEvent<CodedType, IdentifierType>> {
633-
return AsyncStream<ObservedEvent<CodedType, IdentifierType>> { continuation in
634-
continuation.finish()
635-
}
678+
public func observe(_ instance: CodedType) async throws -> some TypedAsyncSequence<ObservedEvent<IdentifierType, CodedType>> {
679+
try await observe(instance.id)
636680
}
637681
}
638682

Sources/CodableDatastore/Persistence/DatastoreInterfaceProtocol.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,11 @@ public protocol DatastoreInterfaceProtocol {
247247
datastoreKey: DatastoreKey,
248248
bufferingPolicy limit: ObservationBufferingPolicy
249249
) async throws -> AsyncCompactMapSequence<AsyncStream<ObservedEvent<Data, ObservationEntry>>, ObservedEvent<IdentifierType, ObservationEntry>>
250+
251+
func emit<IdentifierType: Indexable>(
252+
event: ObservedEvent<IdentifierType, ObservationEntry>,
253+
datastoreKey: DatastoreKey
254+
) async throws
250255
}
251256

252257
/// A strategy that handles exhaustion of a buffer’s capacity.

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/PersistenceDatastore.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,14 @@ extension DiskPersistence.Datastore {
219219
private func unregisterObserver(for id: ObserverID) {
220220
observers.removeValue(forKey: id)
221221
}
222+
223+
func emit(
224+
_ event: ObservedEvent<Data, ObservationEntry>
225+
) {
226+
for (_, observer) in observers {
227+
observer.yield(event)
228+
}
229+
}
230+
231+
var hasObservers: Bool { !observers.isEmpty }
222232
}

Sources/CodableDatastore/Persistence/Disk Persistence/Transaction/Transaction.swift

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ extension DiskPersistence {
2929
var deletedIndexes: Set<Datastore.Index> = []
3030
var deletedPages: Set<Datastore.Page> = []
3131

32-
// TODO: entryMutations, so we can send events to observers once the whole thing is finished.
32+
var entryMutations: [(DatastoreKey, ObservedEvent<Data, ObservationEntry>)] = []
33+
var observerCache: [DatastoreKey : Bool] = [:]
3334

3435
var isActive = false
3536

@@ -83,6 +84,7 @@ extension DiskPersistence {
8384

8485
func apply(
8586
rootObjects: [DatastoreKey : Datastore.RootObject],
87+
entryMutations: [(DatastoreKey, ObservedEvent<Data, ObservationEntry>)],
8688
createdRootObjects: Set<Datastore.RootObject>,
8789
createdIndexes: Set<Datastore.Index>,
8890
createdPages: Set<Datastore.Page>,
@@ -96,6 +98,8 @@ extension DiskPersistence {
9698
self.rootObjects[key] = value
9799
}
98100

101+
self.entryMutations.append(contentsOf: entryMutations)
102+
99103
/// We only want to persist the new objects that we didn't also create in this transaction, so if we deleted any objects that we previously just created, remove any references to them as they will only cause bloat once we persist to disk.
100104
let transientRootObjects = self.createdRootObjects.intersection(deletedRootObjects)
101105
let deletedRootObjects = deletedRootObjects.subtracting(transientRootObjects)
@@ -120,6 +124,7 @@ extension DiskPersistence {
120124
if let parent {
121125
try await parent.apply(
122126
rootObjects: rootObjects,
127+
entryMutations: entryMutations,
123128
createdRootObjects: createdRootObjects,
124129
createdIndexes: createdIndexes,
125130
createdPages: createdPages,
@@ -143,6 +148,18 @@ extension DiskPersistence {
143148
}
144149

145150
try await persistence.persist(roots: rootObjects)
151+
152+
var datastores: [DatastoreKey : Datastore] = [:]
153+
for (datastoreKey, event) in entryMutations {
154+
let datastore: Datastore
155+
if let cachedDatastore = datastores[datastoreKey] {
156+
datastore = cachedDatastore
157+
} else {
158+
datastore = try await persistence.persistenceDatastore(for: datastoreKey).0
159+
datastores[datastoreKey] = datastore
160+
}
161+
await datastore.emit(event)
162+
}
146163
}
147164

148165
static func makeTransaction<T>(
@@ -221,6 +238,23 @@ extension DiskPersistence {
221238
return rootObject
222239
}
223240

241+
func hasObservers(for datastoreKey: DatastoreKey) async throws -> Bool {
242+
if let hasObservers = observerCache[datastoreKey] {
243+
return hasObservers
244+
}
245+
246+
if let parent = parent {
247+
let hasObservers = try await parent.hasObservers(for: datastoreKey)
248+
observerCache[datastoreKey] = hasObservers
249+
return hasObservers
250+
}
251+
252+
let (datastore, _) = try await persistence.persistenceDatastore(for: datastoreKey)
253+
let hasObservers = await datastore.hasObservers
254+
observerCache[datastoreKey] = hasObservers
255+
return hasObservers
256+
}
257+
224258
func cursor(for cursor: any CursorProtocol) throws -> Cursor {
225259
guard cursor.persistence as? DiskPersistence === persistence
226260
else { throw DatastoreInterfaceError.unknownCursor }
@@ -862,6 +896,18 @@ extension DiskPersistence.Transaction {
862896
}
863897
}
864898
}
899+
900+
func emit<IdentifierType: Indexable>(
901+
event: ObservedEvent<IdentifierType, ObservationEntry>,
902+
datastoreKey: DatastoreKey
903+
) async throws {
904+
try checkIsActive()
905+
906+
guard try await hasObservers(for: datastoreKey) else { return }
907+
908+
let id = try JSONEncoder.shared.encode(event.id)
909+
entryMutations.append((datastoreKey, event.with(id: id)))
910+
}
865911
}
866912

867913
// MARK: - Helper Types

Tests/CodableDatastoreTests/DiskPersistenceDatastoreTests.swift

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,60 @@ final class DiskPersistenceDatastoreTests: XCTestCase {
8282
XCTAssertEqual(count, 3)
8383
}
8484

85+
func testObservingEntries() async throws {
86+
enum Version: Int, CaseIterable {
87+
case zero
88+
}
89+
90+
struct TestStruct: Codable, Identifiable {
91+
var id: String
92+
var value: Int
93+
}
94+
95+
let persistence = try DiskPersistence(readWriteURL: temporaryStoreURL)
96+
97+
let datastore = Datastore.JSONStore(
98+
persistence: persistence,
99+
key: "test",
100+
version: Version.zero,
101+
migrations: [
102+
.zero: { data, decoder in
103+
try decoder.decode(TestStruct.self, from: data)
104+
}
105+
]
106+
)
107+
108+
let events = try await datastore.observe()
109+
110+
let observations = Task {
111+
var total = 0
112+
loop: for try await event in events {
113+
switch event {
114+
case .created(_, let entry):
115+
total += entry.value
116+
case .updated(_, _, let entry):
117+
total += entry.value
118+
case .deleted(_, let entry):
119+
total += entry.value
120+
break loop
121+
}
122+
}
123+
return total
124+
}
125+
126+
try await datastore.persist(TestStruct(id: "3", value: 3))
127+
try await datastore.persist(TestStruct(id: "1", value: 1))
128+
try await datastore.persist(TestStruct(id: "2", value: 2))
129+
try await datastore.persist(TestStruct(id: "1", value: 5))
130+
try await datastore.delete("2")
131+
try await datastore.persist(TestStruct(id: "1", value: 3))
132+
133+
let count = try await datastore.count
134+
XCTAssertEqual(count, 2)
135+
let total = try await observations.value
136+
XCTAssertEqual(total, 13)
137+
}
138+
85139
func testWritingManyEntries() async throws {
86140
enum Version: Int, CaseIterable {
87141
case zero

0 commit comments

Comments
 (0)