
Commit 3ac1c75

changed: made a separate product library AsyncDataLoader
1 parent b2ceab5 commit 3ac1c75

14 files changed: +1676 −619 lines

Package.resolved

Lines changed: 20 additions & 1 deletion (generated file; diff not rendered)

Package.swift

Lines changed: 10 additions & 0 deletions
@@ -8,19 +8,29 @@ let package = Package(
     platforms: [.macOS(.v12), .iOS(.v15), .tvOS(.v15), .watchOS(.v8)],
     products: [
         .library(name: "DataLoader", targets: ["DataLoader"]),
+        .library(name: "AsyncDataLoader", targets: ["AsyncDataLoader"]),
     ],
     dependencies: [
         .package(url: "https://github.com/apple/swift-algorithms.git", from: "1.0.0"),
         .package(url: "https://github.com/adam-fowler/async-collections", from: "0.0.1"),
+        .package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
     ],
     targets: [
         .target(
             name: "DataLoader",
+            dependencies: [
+                .product(name: "NIO", package: "swift-nio"),
+                .product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
+            ]
+        ),
+        .target(
+            name: "AsyncDataLoader",
             dependencies: [
                 .product(name: "Algorithms", package: "swift-algorithms"),
                 .product(name: "AsyncCollections", package: "async-collections"),
             ]
         ),
         .testTarget(name: "DataLoaderTests", dependencies: ["DataLoader"]),
+        .testTarget(name: "AsyncDataLoaderTests", dependencies: ["AsyncDataLoader"]),
     ]
 )
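
For downstream packages, pulling in the new product would look roughly like the sketch below. This is a hypothetical consumer manifest: the repository URL, version, package name, and target name are placeholders, not taken from this commit.

```swift
// swift-tools-version:5.7
// Hypothetical consumer Package.swift; URL, version, and names are placeholders.
import PackageDescription

let package = Package(
    name: "App",
    platforms: [.macOS(.v12), .iOS(.v15)],
    dependencies: [
        // Placeholder URL/version for the package that ships DataLoader and AsyncDataLoader.
        .package(url: "https://github.com/example/DataLoader.git", from: "1.0.0"),
    ],
    targets: [
        .target(
            name: "App",
            dependencies: [
                // The new product introduced in this commit:
                .product(name: "AsyncDataLoader", package: "DataLoader"),
            ]
        ),
    ]
)
```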
Lines changed: 218 additions & 0 deletions
@@ -0,0 +1,218 @@
import Algorithms
import AsyncCollections

public enum DataLoaderValue<T: Sendable>: Sendable {
    case success(T)
    case failure(Error)
}

public typealias BatchLoadFunction<Key: Hashable & Sendable, Value: Sendable> = @Sendable (_ keys: [Key]) async throws -> [DataLoaderValue<Value>]
private typealias LoaderQueue<Key: Hashable & Sendable, Value: Sendable> = [(key: Key, channel: Channel<Value, Error>)]

/// DataLoader creates a public API for loading data from a particular
/// data back-end with unique keys, such as the `id` column of a SQL table
/// or the document name in a MongoDB database, given a batch loading function.
///
/// Each DataLoader instance contains a unique memoized cache. Use caution
/// when used in long-lived applications or those which serve many users
/// with different access permissions, and consider creating a new instance
/// per data request.
public actor DataLoader<Key: Hashable & Sendable, Value: Sendable> {
    private let batchLoadFunction: BatchLoadFunction<Key, Value>
    private let options: DataLoaderOptions<Key, Value>

    private var cache = [Key: Channel<Value, Error>]()
    private var queue = LoaderQueue<Key, Value>()

    private var dispatchScheduled = false

    public init(
        options: DataLoaderOptions<Key, Value> = DataLoaderOptions(),
        batchLoadFunction: @escaping BatchLoadFunction<Key, Value>
    ) {
        self.options = options
        self.batchLoadFunction = batchLoadFunction
    }

    /// Loads a key, returning the value represented by that key.
    public func load(key: Key) async throws -> Value {
        let cacheKey = options.cacheKeyFunction?(key) ?? key

        if options.cachingEnabled, let cached = cache[cacheKey] {
            return try await cached.value
        }

        let channel = Channel<Value, Error>()

        if options.batchingEnabled {
            queue.append((key: key, channel: channel))

            if let executionPeriod = options.executionPeriod, !dispatchScheduled {
                Task.detached {
                    try await Task.sleep(nanoseconds: executionPeriod)
                    try await self.execute()
                }

                dispatchScheduled = true
            }
        } else {
            Task.detached {
                do {
                    let results = try await self.batchLoadFunction([key])

                    if results.isEmpty {
                        await channel.fail(DataLoaderError.noValueForKey("Did not return value for key: \(key)"))
                    } else {
                        let result = results[0]

                        switch result {
                        case let .success(value):
                            await channel.fulfill(value)
                        case let .failure(error):
                            await channel.fail(error)
                        }
                    }
                } catch {
                    await channel.fail(error)
                }
            }
        }

        if options.cachingEnabled {
            cache[cacheKey] = channel
        }

        return try await channel.value
    }

    /// Loads multiple keys, returning an array of values:
    ///
    /// ```swift
    /// async let aAndB = try myLoader.loadMany(keys: [ "a", "b" ])
    /// ```
    ///
    /// This is equivalent to the more verbose:
    ///
    /// ```swift
    /// async let aAndB = [
    ///     myLoader.load(key: "a"),
    ///     myLoader.load(key: "b")
    /// ]
    /// ```
    /// or
    /// ```swift
    /// async let a = myLoader.load(key: "a")
    /// async let b = myLoader.load(key: "b")
    /// ```
    public func loadMany(keys: [Key]) async throws -> [Value] {
        guard !keys.isEmpty else {
            return []
        }

        return try await keys.concurrentMap { try await self.load(key: $0) }
    }

    /// Clears the value at `key` from the cache, if it exists. Returns itself for
    /// method chaining.
    @discardableResult
    public func clear(key: Key) -> DataLoader<Key, Value> {
        let cacheKey = options.cacheKeyFunction?(key) ?? key

        cache.removeValue(forKey: cacheKey)

        return self
    }

    /// Clears the entire cache. To be used when some event results in unknown
    /// invalidations across this particular `DataLoader`. Returns itself for
    /// method chaining.
    @discardableResult
    public func clearAll() -> DataLoader<Key, Value> {
        cache.removeAll()

        return self
    }

    /// Adds the provided key and value to the cache. If the key already exists, no
    /// change is made. Returns itself for method chaining.
    @discardableResult
    public func prime(key: Key, value: Value) async throws -> DataLoader<Key, Value> {
        let cacheKey = options.cacheKeyFunction?(key) ?? key

        if cache[cacheKey] == nil {
            let channel = Channel<Value, Error>()

            Task.detached {
                await channel.fulfill(value)
            }

            cache[cacheKey] = channel
        }

        return self
    }

    public func execute() async throws {
        // Take the current loader queue, replacing it with an empty queue.
        let batch = queue

        queue = []

        if dispatchScheduled {
            dispatchScheduled = false
        }

        guard !batch.isEmpty else {
            return
        }

        // If a maxBatchSize was provided and the queue is longer, then segment the
        // queue into multiple batches, otherwise treat the queue as a single batch.
        if let maxBatchSize = options.maxBatchSize, maxBatchSize > 0, maxBatchSize < batch.count {
            try await batch.chunks(ofCount: maxBatchSize).asyncForEach { slicedBatch in
                try await self.executeBatch(batch: Array(slicedBatch))
            }
        } else {
            try await executeBatch(batch: batch)
        }
    }

    private func executeBatch(batch: LoaderQueue<Key, Value>) async throws {
        let keys = batch.map { $0.key }

        if keys.isEmpty {
            return
        }

        // Step through the values, resolving or rejecting each channel in the
        // loaded queue.
        do {
            let values = try await batchLoadFunction(keys)

            if values.count != keys.count {
                throw DataLoaderError.typeError("The function did not return an array of the same length as the array of keys. \nKeys count: \(keys.count)\nValues count: \(values.count)")
            }

            for entry in batch.enumerated() {
                let result = values[entry.offset]

                switch result {
                case let .failure(error):
                    await entry.element.channel.fail(error)
                case let .success(value):
                    await entry.element.channel.fulfill(value)
                }
            }
        } catch {
            await failedExecution(batch: batch, error: error)
        }
    }

    private func failedExecution(batch: LoaderQueue<Key, Value>, error: Error) async {
        for (key, channel) in batch {
            _ = clear(key: key)

            await channel.fail(error)
        }
    }
}
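
To show how the new actor-based API above fits together, here is a minimal usage sketch. The `User` type and the in-memory `database` dictionary are invented for illustration; only the `DataLoader`, `DataLoaderOptions`, `DataLoaderValue`, and `DataLoaderError` names come from the code in this commit.

```swift
import AsyncDataLoader

struct User: Sendable {
    let id: Int
    let name: String
}

func demo() async throws {
    // Stand-in backing store; a real batch function would hit a database or service.
    let database: [Int: User] = [
        1: User(id: 1, name: "Ada"),
        2: User(id: 2, name: "Grace"),
    ]

    // The batch load function receives every collected key at once and must
    // return one DataLoaderValue per key, in the same order as the keys.
    let userLoader = DataLoader<Int, User>(
        options: DataLoaderOptions(maxBatchSize: 50)
    ) { keys in
        keys.map { key -> DataLoaderValue<User> in
            if let user = database[key] {
                return .success(user)
            } else {
                return .failure(DataLoaderError.noValueForKey("No user for id \(key)"))
            }
        }
    }

    // Loads issued while the loader is still collecting its queue (2 ms by
    // default) are coalesced into a single batch call, and repeated keys are
    // served from the memoized cache.
    let ada = try await userLoader.load(key: 1)
    let both = try await userLoader.loadMany(keys: [1, 2])
    print(ada.name, both.map(\.name))
}
```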
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
public enum DataLoaderError: Error {
    case typeError(String)
    case noValueForKey(String)
}
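
Both cases ultimately surface from `load(key:)` and `loadMany(keys:)` through the channel, so callers can pattern-match on them. A small sketch; the always-failing loader is contrived purely for illustration:

```swift
import AsyncDataLoader

// A contrived loader whose batch function finds nothing, used only to show
// how the two error cases reach callers.
func handleLoadFailure() async {
    let loader = DataLoader<Int, String> { keys in
        keys.map { _ -> DataLoaderValue<String> in
            .failure(DataLoaderError.noValueForKey("not found"))
        }
    }

    do {
        _ = try await loader.load(key: 1)
    } catch DataLoaderError.noValueForKey(let message) {
        print("missing value:", message)
    } catch DataLoaderError.typeError(let message) {
        print("key/value count mismatch:", message)
    } catch {
        print("unexpected error:", error)
    }
}
```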
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
public struct DataLoaderOptions<Key: Hashable, Value>: Sendable {
    /// Default `true`. Set to `false` to disable batching, invoking
    /// `batchLoadFunction` with a single load key. This is
    /// equivalent to setting `maxBatchSize` to `1`.
    public let batchingEnabled: Bool

    /// Default `nil`. Limits the number of items that get passed in to the
    /// `batchLoadFunction`. May be set to `1` to disable batching.
    public let maxBatchSize: Int?

    /// Default `true`. Set to `false` to disable memoization caching, creating a
    /// new `Channel` and a new key in the `batchLoadFunction`
    /// for every load of the same key.
    public let cachingEnabled: Bool

    /// Default `2ms`. Defines the period of time that the DataLoader should
    /// wait and collect its queue before executing. Faster times result
    /// in smaller batches but quicker resolution; slower times result in larger
    /// batches but slower resolution.
    /// This is irrelevant if batching is disabled.
    public let executionPeriod: UInt64?

    /// Default `nil`. Produces a cache key for a given load key. Useful
    /// when objects are keys and two objects should be considered equivalent.
    public let cacheKeyFunction: (@Sendable (Key) -> Key)?

    public init(
        batchingEnabled: Bool = true,
        cachingEnabled: Bool = true,
        maxBatchSize: Int? = nil,
        executionPeriod: UInt64? = 2_000_000,
        cacheKeyFunction: (@Sendable (Key) -> Key)? = nil
    ) {
        self.batchingEnabled = batchingEnabled
        self.cachingEnabled = cachingEnabled
        self.executionPeriod = executionPeriod
        self.maxBatchSize = maxBatchSize
        self.cacheKeyFunction = cacheKeyFunction
    }
}
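
For illustration, a brief sketch of tuning these options; the concrete numbers and the string-lowercasing cache key are arbitrary choices, not recommendations from this commit:

```swift
import AsyncDataLoader

// Collect loads for up to 10 ms, cap each batch at 100 keys, and normalize
// string keys so "A" and "a" share one cache entry.
let tunedOptions = DataLoaderOptions<String, Int>(
    maxBatchSize: 100,
    executionPeriod: 10_000_000, // nanoseconds
    cacheKeyFunction: { $0.lowercased() }
)

// With executionPeriod set to nil, load(key:) never schedules a dispatch on
// its own (per the load implementation above); the caller is expected to
// invoke execute() on the loader manually.
let manualOptions = DataLoaderOptions<String, Int>(executionPeriod: nil)
```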
