Skip to content

Commit 6eef27e

Browse files
Added file-based binary search for locating entries
1 parent 1565750 commit 6eef27e

File tree

6 files changed

+444
-54
lines changed

6 files changed

+444
-54
lines changed

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndex.swift

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//
88

99
import Foundation
10+
import Bytes
1011

1112
typealias DatastoreIndexIdentifier = TypedIdentifier<DiskPersistence<ReadOnly>.Datastore.Index>
1213

@@ -130,4 +131,338 @@ extension DiskPersistence.Datastore.Index {
130131
return pages
131132
}
132133
}
134+
135+
/// Return the index of the page where a proposed entry would reside, whether it exists or not.
136+
///
137+
/// This page would have at least one entry with which to anchor itself to. For instance, if a page is missing any anchorable information (i.e. its header is on a previous page), it won't be returned, instead opting for a page before or after it.
138+
///
139+
/// This means that if a page is returned, and the first complete entry appears mid-way on the page, but a new entry were to be positioned before it, the caller can assume it would reside _after_ any incomplete entries, but _before_ the first complete one.
140+
///
141+
/// If the returned page contains the start of an entry which would be located before the proposed entry, it is up to the caller to scan forward until that entry is finished and insert the proposed entry after that point.
142+
///
143+
/// ### Examples
144+
///
145+
/// Below are some examples of how this algorithm is expected to perform.
146+
///
147+
/// `5` in `[0, 1, 2]`:
148+
/// ```
149+
/// [0, 1, 2]
150+
/// 0 + 3/2 -> 1.5 -> 1
151+
/// 1 <= 5 ✓
152+
/// [1, 2]
153+
/// 1 + 2/2 -> 2
154+
/// 2 <= 5 ✓
155+
/// [2]
156+
/// ```
157+
///
158+
/// `2` in `[0, 1, 2]`:
159+
/// ```
160+
/// [0, 1, 2]
161+
/// 0 + 3/2 -> 1.5 -> 1
162+
/// 1 <= 2 ✓
163+
/// [1, 2]
164+
/// 1 + 2/2 -> 2
165+
/// 2 <= 2 ✓
166+
/// [2]
167+
/// ```
168+
///
169+
/// `1.1` in `[0, 1, 2]`:
170+
/// ```
171+
/// [0, 1, 2]
172+
/// 0 + 3/2 -> 1.5 -> 1
173+
/// 1 <= 1.1 ✓
174+
/// [1, 2]
175+
/// 1 + 2/2 -> 2
176+
/// 2 <= 1.1 ×
177+
/// [1]
178+
/// ```
179+
///
180+
/// `1` in `[0, 1, 2]`:
181+
/// ```
182+
/// [0, 1, 2]
183+
/// 0 + 3/2 -> 1.5 -> 1
184+
/// 1 <= 1 ✓
185+
/// [1, 2]
186+
/// 1 + 2/2 -> 2
187+
/// 2 <= 1 ×
188+
/// [1]
189+
/// ```
190+
///
191+
/// `0.5` in `[0, 1, 2]`:
192+
/// ```
193+
/// [0, 1, 2]
194+
/// 0 + 3/2 -> 1.5 -> 1
195+
/// 1 <= 0.5 ×
196+
/// [0]
197+
/// ```
198+
///
199+
/// `0` in `[0, 1, 2]`:
200+
/// ```
201+
/// [0, 1, 2]
202+
/// 0 + 3/2 -> 1.5 -> 1
203+
/// 1 <= 0 ×
204+
/// [0]
205+
/// ```
206+
///
207+
/// `-1` in `[0, 1, 2]`:
208+
/// ```
209+
/// [0, 1, 2]
210+
/// 0 + 3/2 -> 1.5 -> 1
211+
/// 1 <= -1 ×
212+
/// [0]
213+
/// ```
214+
///
215+
/// `6` in `[0, 1, 2, 3, 4, 5]`:
216+
/// ```
217+
/// [0, 1, 2, 3, 4, 5]
218+
/// ^
219+
/// 3 <= 6 ✓
220+
/// [3, 4, 5]
221+
/// ^
222+
/// 4 <= 6 ✓
223+
/// [4, 5]
224+
/// ^
225+
/// 5 <= 6 ✓
226+
/// [5]
227+
/// ```
228+
///
229+
/// `3.5` in `[0, 1, 2, 3, 4, 5]`:
230+
/// ```
231+
/// [0, 1, 2, 3, 4, 5]
232+
/// ^
233+
/// 3 <= 3.5 ✓
234+
/// [3, 4, 5]
235+
/// ^
236+
/// 4 <= 3.5 ×
237+
/// [3]
238+
/// ```
239+
///
240+
/// `2.1` in `[0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2]`:
241+
/// ```
242+
/// [0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2]
243+
/// >-----------^
244+
/// 2 <= 2.1 ✓
245+
/// [2, 2]
246+
/// >×
247+
/// [2] // Caller should scan forward at this point
248+
/// ```
249+
///
250+
/// `1.1` in `[0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2]`:
251+
/// ```
252+
/// [0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2]
253+
/// >-----------^
254+
/// 2 <= 1.1 ×
255+
/// [0, 1, 1, 1, 1, 1, 1, 1, 1]
256+
/// >------------×
257+
/// [0, 1, 1, 1]
258+
/// >---×
259+
/// [0, 1]
260+
/// ^--^--^--^--^--^--^--^ // Scanning will stop after enough header data for the entry is acquired, usually after a single page or two.
261+
/// 1 <= 1.1 ✓
262+
/// [1]
263+
/// ```
264+
///
265+
/// `0.1` in `[0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2]`:
266+
/// ```
267+
/// [0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2]
268+
/// >-----------^
269+
/// 2 <= 0.1 ×
270+
/// [0, 1, 1, 1, 1, 1, 1, 1, 1]
271+
/// >------------×
272+
/// [0, 1, 1, 1]
273+
/// >---×
274+
/// [0, 1]
275+
/// ^--^--^--^--^--^--^--^
276+
/// 1 <= 0.1 ×
277+
/// [0]
278+
/// ```
279+
/// - Parameters:
280+
/// - proposedEntry: The entry to use in comparison with other persisted entries.
281+
/// - pages: A collection of pages to check against.
282+
/// - comparator: A comparator to determine order and equality between the proposed entry and a persisted one.
283+
/// - Returns: The index within the pages collection where the entry would reside.
284+
func pageIndex<T>(
    for proposedEntry: T,
    in pages: [DiskPersistence.Datastore.Page],
    comparator: (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> Int? {
    /// Binary-search `pages` for the page whose first anchorable entry is ordered at or before `proposedEntry`.
    ///
    /// - Returns: An index into `pages`, or `nil` when `pages` is empty.
    /// - Throws: `DiskPersistenceError.invalidPageFormat` when an entry was started but could not be decoded before the pages ran out, plus anything thrown while reading blocks, decoding an entry, or by `comparator`.
    var slice = pages[...]
    
    /// With no pages at all, there is nothing to anchor the proposed entry to.
    guard !slice.isEmpty
    else { return nil }
    
    /// Loosely based off of https://stackoverflow.com/questions/26678362/how-do-i-insert-an-element-at-the-correct-position-into-a-sorted-array-in-swift/70645571#70645571
    /// Continue the process until we have a slice with a single page in it.
    while slice.count > 1 {
        /// Grab the middle index of our slice. We keep the original and a mutable variant that can scan ahead for ranges of pages.
        let originalMiddle = slice.index(slice.startIndex, offsetBy: slice.count/2)
        var middle = originalMiddle
        
        var bytesForFirstEntry: Bytes?
        var firstEntryOfPage: DatastorePageEntry?
        
        /// Start checking the page at the middle index, continuing to scan until we build up enough of an entry to compare to.
        /// Note that this intentionally walks `pages` rather than `slice`, since an entry that starts inside the slice may finish on a page past its end.
        pageIterator: for page in pages[middle...] {
            let blocks = try await page.blocks
            
            /// Scan the page block-by-block, continuing to scan until we build up enough of an entry to compare to.
            for try await block in blocks {
                switch block {
                case .complete(let bytes):
                    /// We have a complete entry, let's use it and stop scanning.
                    firstEntryOfPage = try DatastorePageEntry(bytes: bytes, isPartial: false)
                    break pageIterator
                case .head(let bytes):
                    /// We are starting an entry, but will need to go to the next page.
                    bytesForFirstEntry = bytes
                case .slice(let bytes):
                    /// Without a preceding head, this belongs to an entry that started on an earlier page, so skip it.
                    guard bytesForFirstEntry != nil else { continue }
                    /// Otherwise, it continues the entry we are accumulating.
                    bytesForFirstEntry?.append(contentsOf: bytes)
                case .tail(let bytes):
                    /// Without a preceding head, this belongs to an entry that started on an earlier page, so skip it.
                    guard bytesForFirstEntry != nil else { continue }
                    /// Otherwise, it finishes the entry we are accumulating, so decode it and stop.
                    bytesForFirstEntry?.append(contentsOf: bytes)
                    if let completedBytes = bytesForFirstEntry {
                        firstEntryOfPage = try DatastorePageEntry(bytes: completedBytes, isPartial: false)
                    }
                    break pageIterator
                }
                
                /// If we have some bytes, attempt to decode them into an entry; a failure here just means we need more bytes.
                /// NOTE(review): the bytes may be an incomplete entry at this point, yet `isPartial` is passed as `false` — confirm whether this should be `true`.
                if let bytesForFirstEntry {
                    firstEntryOfPage = try? DatastorePageEntry(bytes: bytesForFirstEntry, isPartial: false)
                }
                
                /// If we have an entry, stop scanning as we can go ahead and operate on it.
                if firstEntryOfPage != nil { break pageIterator }
            }
            
            /// If we had to advance a page and didn't yet start accumulating data, move our middle since it would be pointless to check that page again if the proposed entry was ordered after the persisted one we found.
            if bytesForFirstEntry == nil {
                middle = slice.index(middle, offsetBy: 1)
                /// If we've gone past the slice, stop here.
                guard middle < slice.endIndex
                else { break }
            }
        }
        
        guard let firstEntryOfPage else {
            /// If an entry was started but never became decodable, the pages themselves are malformed.
            guard bytesForFirstEntry == nil
            else { throw DiskPersistenceError.invalidPageFormat }
            
            /// If we didn't encounter a single start sequence, a real one must be located before this point, so don't bother checking _any_ of the pages we scanned through a second time.
            /// This must key off of the decoded entry rather than the accumulated bytes: a `.complete` block yields an entry without ever populating `bytesForFirstEntry`.
            slice = slice[..<originalMiddle]
            continue
        }
        
        if try comparator(proposedEntry, firstEntryOfPage) == .ascending {
            /// If the proposed entry is strictly before the first of the page, repeat the search prior to this page.
            slice = slice[..<middle]
        } else {
            /// If the proposed entry is equal to the first of the page, or comes after it, use the later half to repeat the search.
            slice = slice[middle...]
        }
    }
    
    return slice.startIndex
}
372+
373+
/// Locate the persisted entry matching `proposedEntry` across all of this index's ordered pages.
///
/// This is a convenience that loads `orderedPages` and forwards to `entry(for:in:comparator:)`.
/// - Parameters:
///   - proposedEntry: The value to compare against persisted entries.
///   - comparator: A comparator to determine order and equality between the proposed entry and a persisted one.
/// - Returns: A cursor describing the blocks the entry occupies, along with the decoded entry itself.
func entry<T>(
    for proposedEntry: T,
    comparator: (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> (
    cursor: DiskPersistence.InstanceCursor,
    entry: DatastorePageEntry
) {
    let allPages = try await orderedPages
    return try await entry(for: proposedEntry, in: allPages, comparator: comparator)
}
382+
383+
/// Locate the persisted entry matching `proposedEntry` within the given pages.
///
/// The search starts at the page reported by `pageIndex(for:in:comparator:)` and walks forward block-by-block, reassembling entries and comparing each completed one against `proposedEntry`.
/// - Parameters:
///   - proposedEntry: The value to compare against persisted entries.
///   - pages: The collection of pages to search.
///   - comparator: A comparator to determine order and equality between the proposed entry and a persisted one.
/// - Returns: A cursor made of the blocks the matching entry occupies, along with the decoded entry.
/// - Throws: `DatastoreInterfaceError.instanceNotFound` when there are no pages, when an entry ordered after the proposed one is reached, or when the pages run out without a match.
func entry<T>(
    for proposedEntry: T,
    in pages: [DiskPersistence.Datastore.Page],
    comparator: (_ lhs: T, _ rhs: DatastorePageEntry) throws -> SortOrder
) async throws -> (
    cursor: DiskPersistence.InstanceCursor,
    entry: DatastorePageEntry
) {
    /// Find the page the entry should reside on.
    guard let firstPageIndex = try await pageIndex(for: proposedEntry, in: pages, comparator: comparator)
    else { throw DatastoreInterfaceError.instanceNotFound }
    
    /// State for the entry currently being reassembled.
    var pendingBytes: Bytes?
    var pendingIsComplete = false
    var pendingBlocks: [DiskPersistence.CursorBlock] = []
    
    for currentPageIndex in firstPageIndex..<pages.endIndex {
        let currentPage = pages[currentPageIndex]
        let blocks = try await currentPage.blocks
        var nextBlockIndex = 0
        
        for try await block in blocks {
            /// Claim this block's position up front so that skipped blocks are still counted.
            let currentBlockIndex = nextBlockIndex
            nextBlockIndex += 1
            
            switch block {
            case .complete(let bytes):
                /// A whole entry lives in this single block.
                pendingBytes = bytes
                pendingIsComplete = true
            case .head(let bytes):
                /// An entry starts here and continues in later blocks.
                pendingBytes = bytes
            case .slice(let bytes):
                /// A continuation with nothing started belongs to an earlier entry; skip it.
                if pendingBytes == nil { continue }
                pendingBytes?.append(contentsOf: bytes)
            case .tail(let bytes):
                /// An ending with nothing started belongs to an earlier entry; skip it.
                if pendingBytes == nil { continue }
                pendingBytes?.append(contentsOf: bytes)
                pendingIsComplete = true
            }
            
            /// Remember every block that contributes to the entry in progress.
            let cursorBlock = DiskPersistence.CursorBlock(
                pageIndex: currentPageIndex,
                page: currentPage,
                blockIndex: currentBlockIndex
            )
            pendingBlocks.append(cursorBlock)
            
            /// Keep accumulating until the entry in progress has all of its bytes.
            guard pendingIsComplete, let entryBytes = pendingBytes
            else { continue }
            
            let candidate = try DatastorePageEntry(bytes: entryBytes, isPartial: false)
            
            switch try comparator(proposedEntry, candidate) {
            case .ascending:
                /// We must have passed the entry, which could only happen if it didn't exist.
                throw DatastoreInterfaceError.instanceNotFound
            case .equal:
                /// We found the entry, so return it.
                let cursor = DiskPersistence.InstanceCursor(
                    persistence: datastore.snapshot.persistence,
                    datastore: datastore,
                    index: self,
                    blocks: pendingBlocks
                )
                return (cursor: cursor, entry: candidate)
            case .descending:
                /// The proposed entry is ordered later, so move on to the next entry.
                break
            }
            
            /// Reset the accumulation state before starting on the next entry.
            pendingBlocks = []
            pendingBytes = nil
            pendingIsComplete = false
        }
    }
    
    /// If we got this far, we didn't encounter the entry, and must have passed every entry along the way.
    throw DatastoreInterfaceError.instanceNotFound
}
133468
}

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastorePageEntry.swift

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,46 @@ import Bytes
1313
/// A single entry read from, or destined for, a datastore page: a list of headers followed by content.
struct DatastorePageEntry: Hashable {
1414
/// The header byte sequences of the entry, in the order they appear in the serialized form.
var headers: [Bytes]
1515
/// The bytes that follow the header section.
var content: Bytes
16+
/// Flag supplied by whoever builds the entry; defaults to `false`.
/// NOTE(review): presumably marks an entry decoded before all of its bytes were available — confirm against callers.
var isPartial: Bool = false
1617
}
1718

19+
// MARK: - Decoding
20+
21+
extension DatastorePageEntry {
    /// Decode an entry from its serialized bytes.
    ///
    /// The header section is a sequence of `<decimal size> <header bytes>\n` records. It ends at the first record position that holds a newline, or when the bytes run out. Whatever remains becomes the content.
    /// - Parameters:
    ///   - bytes: The serialized entry.
    ///   - isPartial: Stored on the entry as-is; it does not influence parsing.
    /// - Throws: `DiskPersistenceError.invalidEntryFormat` when a header size is not a decimal number in `1...8*1024`, plus anything thrown by the byte iterator when a header or its trailing newline cannot be read.
    init(bytes: Bytes, isPartial: Bool) throws {
        let newlineByte = "\n".utf8Bytes[0]
        let spaceByte = " ".utf8Bytes[0]
        
        var cursor = bytes.makeIterator()
        var parsedHeaders: [Bytes] = []
        
        /// Each pass reads one header record; a newline in the leading position closes the header section.
        while let leadingByte = cursor.next(), leadingByte != newlineByte {
            /// Gather the size field, which runs up to (and consumes) the next space.
            var sizeField = [leadingByte]
            while let sizeByte = cursor.next(), sizeByte != spaceByte {
                sizeField.append(sizeByte)
            }
            
            /// Interpret the size field as a decimal number within the accepted bounds.
            guard
                let headerSize = Int(String(utf8Bytes: sizeField)),
                headerSize > 0,
                headerSize <= 8*1024
            else { throw DiskPersistenceError.invalidEntryFormat }
            
            /// Read exactly that many bytes as the header.
            let header = try cursor.next(Bytes.self, count: headerSize)
            parsedHeaders.append(header)
            
            /// Every header record must be terminated by a newline.
            try cursor.check(utf8: "\n")
        }
        
        /// Everything left over is the content.
        let remainingBytes = cursor.next(Bytes.self, max: bytes.count)
        
        self.headers = parsedHeaders
        self.content = remainingBytes
        self.isPartial = isPartial
    }
}
1856

1957
// MARK: - Encoding
2058

0 commit comments

Comments
 (0)