Skip to content

Commit d38d800

Browse files
authored
support multiple column family (#246)
1 parent a54168a commit d38d800

File tree

4 files changed

+185
-60
lines changed

4 files changed

+185
-60
lines changed

Database/Sources/Database/Options.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@ public struct Options: ~Copyable, Sendable {
2222
rocksdb_options_set_create_if_missing(ptr.value, createIfMissing ? 1 : 0)
2323
}
2424

25-
public func setLevelCompactionDynamicLevelBytes(levelCompactionDynamicLevelBytes: Bool) {
25+
public func setLevelCompactionDynamicLevelBytes(_ levelCompactionDynamicLevelBytes: Bool) {
2626
rocksdb_options_set_level_compaction_dynamic_level_bytes(ptr.value, levelCompactionDynamicLevelBytes ? 1 : 0)
2727
}
28+
29+
public func setCreateIfMissingColumnFamilies(_ createIfMissingColumnFamilies: Bool) {
30+
rocksdb_options_set_create_missing_column_families(ptr.value, createIfMissingColumnFamilies ? 1 : 0)
31+
}
2832
}
2933

3034
public struct WriteOptions: ~Copyable, Sendable {

Database/Sources/Database/RocksDB.swift

Lines changed: 81 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ import Foundation
22
import rocksdb
33
import Utils
44

5-
public final class RocksDB: Sendable {
5+
public protocol ColumnFamilyKey: Sendable, CaseIterable, Hashable, RawRepresentable<UInt8> {}
6+
7+
public final class RocksDB<CFKey: ColumnFamilyKey>: Sendable {
68
public enum BatchOperation {
7-
case delete(key: Data)
8-
case put(key: Data, value: Data)
9+
case delete(column: CFKey, key: Data)
10+
case put(column: CFKey, key: Data, value: Data)
911
}
1012

1113
public enum Error: Swift.Error {
@@ -17,12 +19,12 @@ public final class RocksDB: Sendable {
1719
case noData
1820
}
1921

20-
private let dbOptions: Options
2122
private let writeOptions: WriteOptions
2223
private let readOptions: ReadOptions
23-
private let db: SendableOpaquePointer
24+
private let db: SafePointer
25+
private let cfHandles: [SendableOpaquePointer]
2426

25-
public init(path: URL) throws(Error) {
27+
public init(path: URL) throws {
2628
let dbOptions = Options()
2729

2830
// TODO: starting from options here
@@ -32,22 +34,50 @@ public final class RocksDB: Sendable {
3234
dbOptions.increaseParallelism(cpus: cpus)
3335
dbOptions.optimizeLevelStyleCompaction(memtableMemoryBudget: 512 * 1024 * 1024) // 512 MB
3436
dbOptions.setCreateIfMissing(true)
37+
dbOptions.setCreateIfMissingColumnFamilies(true)
38+
39+
let cfOptions = Options()
40+
cfOptions.setLevelCompactionDynamicLevelBytes(true)
41+
42+
var names = CFKey.allCases.map { "\($0)" }
43+
// ensure always have a default column family
44+
if !names.contains("default") {
45+
names.insert("default", at: 0)
46+
}
47+
var cfOptionsList = names.map { _ in cfOptions.value as OpaquePointer? }
48+
49+
var outHandles = [OpaquePointer?](repeating: nil, count: names.count)
3550

3651
// open DB
37-
db = try Self.call { err, _ in
38-
rocksdb_open(dbOptions.value, path.path, &err).asSendable
39-
} onErr: { message throws(Error) in
40-
throw Error.openFailed(message: message)
52+
let dbPtr = try FFIUtils.withCString(names) { cnames in
53+
var cnames = cnames
54+
return try Self.call { err, _ in
55+
rocksdb_open_column_families(
56+
dbOptions.value,
57+
path.path,
58+
Int32(names.count),
59+
&cnames,
60+
&cfOptionsList,
61+
&outHandles,
62+
&err
63+
)
64+
} onErr: { message throws in
65+
throw Error.openFailed(message: message)
66+
}
4167
}
4268

43-
self.dbOptions = dbOptions
69+
db = SafePointer(ptr: dbPtr!, free: rocksdb_close)
70+
71+
cfHandles = outHandles.map { $0!.asSendable }
4472

4573
writeOptions = WriteOptions()
4674
readOptions = ReadOptions()
4775
}
4876

4977
deinit {
50-
rocksdb_close(db.value)
78+
for handle in cfHandles {
79+
rocksdb_column_family_handle_destroy(handle.value)
80+
}
5181
}
5282
}
5383

@@ -57,8 +87,8 @@ extension RocksDB {
5787
private static func call<R>(
5888
_ data: [Data],
5989
fn: (inout UnsafeMutablePointer<Int8>?, [(ptr: UnsafeRawPointer, count: Int)]) -> R,
60-
onErr: (String) throws(Error) -> Void
61-
) throws(Error) -> R {
90+
onErr: (String) throws -> Void
91+
) throws -> R {
6292
var err: UnsafeMutablePointer<Int8>?
6393
defer {
6494
free(err)
@@ -97,54 +127,71 @@ extension RocksDB {
97127
private static func call<R>(
98128
_ data: Data...,
99129
fn: (inout UnsafeMutablePointer<Int8>?, [(ptr: UnsafeRawPointer, count: Int)]) -> R,
100-
onErr: (String) throws(Error) -> Void
101-
) throws(Error) -> R {
130+
onErr: (String) throws -> Void
131+
) throws -> R {
102132
try call(data, fn: fn, onErr: onErr)
103133
}
104134

105135
private static func call<R>(
106136
_ data: Data...,
107137
fn: ([(ptr: UnsafeRawPointer, count: Int)]) -> R
108-
) throws(Error) -> R {
138+
) throws -> R {
109139
try call(data) { _, ptrs in
110140
fn(ptrs)
111-
} onErr: { _ throws(Error) in
141+
} onErr: { _ throws in
112142
// do nothing as it should never be called
113143
}
114144
}
145+
146+
private func getHandle(column: CFKey) -> OpaquePointer {
147+
cfHandles[Int(column.rawValue)].value
148+
}
115149
}
116150

117151
// MARK: - public methods
118152

119153
extension RocksDB {
120-
public func put(key: Data, value: Data) throws(Error) {
154+
public func put(column: CFKey, key: Data, value: Data) throws {
155+
let handle = getHandle(column: column)
121156
try Self.call(key, value) { err, ptrs in
122157
let key = ptrs[0]
123158
let value = ptrs[1]
124-
rocksdb_put(db.value, writeOptions.value, key.ptr, key.count, value.ptr, value.count, &err)
125-
} onErr: { message throws(Error) in
159+
rocksdb_put_cf(
160+
db.value,
161+
writeOptions.value,
162+
handle,
163+
key.ptr,
164+
key.count,
165+
value.ptr,
166+
value.count,
167+
&err
168+
)
169+
} onErr: { message throws in
126170
throw Error.putFailed(message: message)
127171
}
128172
}
129173

130-
public func get(key: Data) throws -> Data? {
174+
public func get(column: CFKey, key: Data) throws -> Data? {
131175
var len = 0
176+
let handle = getHandle(column: column)
132177

133178
let ret = try Self.call(key) { err, ptrs in
134179
let key = ptrs[0]
135-
return rocksdb_get(db.value, readOptions.value, key.ptr, key.count, &len, &err)
136-
} onErr: { message throws(Error) in
180+
return rocksdb_get_cf(db.value, readOptions.value, handle, key.ptr, key.count, &len, &err)
181+
} onErr: { message throws in
137182
throw Error.getFailed(message: message)
138183
}
139184

140185
return ret.map { Data(bytesNoCopy: $0, count: len, deallocator: .free) }
141186
}
142187

143-
public func delete(key: Data) throws {
188+
public func delete(column: CFKey, key: Data) throws {
189+
let handle = getHandle(column: column)
190+
144191
try Self.call(key) { err, ptrs in
145192
let key = ptrs[0]
146-
rocksdb_delete(db.value, writeOptions.value, key.ptr, key.count, &err)
147-
} onErr: { message throws(Error) in
193+
rocksdb_delete_cf(db.value, writeOptions.value, handle, key.ptr, key.count, &err)
194+
} onErr: { message throws in
148195
throw Error.deleteFailed(message: message)
149196
}
150197
}
@@ -155,25 +202,27 @@ extension RocksDB {
155202

156203
for operation in operations {
157204
switch operation {
158-
case let .delete(key):
205+
case let .delete(column, key):
206+
let handle = getHandle(column: column)
159207
try Self.call(key) { ptrs in
160208
let key = ptrs[0]
161-
rocksdb_writebatch_delete(writeBatch, key.ptr, key.count)
209+
rocksdb_writebatch_delete_cf(writeBatch, handle, key.ptr, key.count)
162210
}
163211

164-
case let .put(key, value):
212+
case let .put(column, key, value):
213+
let handle = getHandle(column: column)
165214
try Self.call(key, value) { ptrs in
166215
let key = ptrs[0]
167216
let value = ptrs[1]
168217

169-
rocksdb_writebatch_put(writeBatch, key.ptr, key.count, value.ptr, value.count)
218+
rocksdb_writebatch_put_cf(writeBatch, handle, key.ptr, key.count, value.ptr, value.count)
170219
}
171220
}
172221
}
173222

174223
try Self.call { err, _ in
175224
rocksdb_write(db.value, writeOptions.value, writeBatch, &err)
176-
} onErr: { message throws(Error) in
225+
} onErr: { message throws in
177226
throw Error.batchFailed(message: message)
178227
}
179228
}

Database/Tests/DatabaseTests/RocksDBTests.swift

Lines changed: 79 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ import Testing
66
@testable import Database
77

88
extension String {
9-
var data: Data {
10-
Data(utf8)
11-
}
9+
var data: Data { Data(utf8) }
10+
}
11+
12+
enum Columns: UInt8, Sendable, ColumnFamilyKey {
13+
case col1
14+
case col2
15+
case col3
1216
}
1317

1418
final class RocksDBTests {
@@ -17,49 +21,100 @@ final class RocksDBTests {
1721
return tmpDir.appendingPathComponent("\(UUID().uuidString)")
1822
}()
1923

20-
var rocksDB: RocksDB!
24+
var rocksDB: RocksDB<Columns>!
2125

2226
init() throws {
2327
rocksDB = try RocksDB(path: path)
2428
}
2529

2630
deinit {
27-
rocksDB = nil // close it first
28-
// then delete the files
31+
rocksDB = nil
2932
try! FileManager.default.removeItem(at: path)
3033
}
3134

3235
@Test func basicOperations() throws {
33-
#expect(try rocksDB.get(key: "123".data) == nil)
36+
#expect(try rocksDB.get(column: .col1, key: "123".data) == nil)
3437

35-
try rocksDB.put(key: "123".data, value: "qwe".data)
36-
try rocksDB.put(key: "234".data, value: "asd".data)
38+
try rocksDB.put(column: .col1, key: "123".data, value: "qwe".data)
39+
try rocksDB.put(column: .col1, key: "234".data, value: "asd".data)
3740

38-
#expect(try rocksDB.get(key: "123".data) == "qwe".data)
39-
#expect(try rocksDB.get(key: "234".data) == "asd".data)
41+
#expect(try rocksDB.get(column: .col1, key: "123".data) == "qwe".data)
42+
#expect(try rocksDB.get(column: .col1, key: "234".data) == "asd".data)
4043

41-
try rocksDB.delete(key: "123".data)
44+
try rocksDB.delete(column: .col1, key: "123".data)
4245

43-
#expect(try rocksDB.get(key: "123".data) == nil)
46+
#expect(try rocksDB.get(column: .col1, key: "123".data) == nil)
4447

45-
try rocksDB.put(key: "234".data, value: "asdfg".data)
48+
try rocksDB.put(column: .col1, key: "234".data, value: "asdfg".data)
4649

47-
#expect(try rocksDB.get(key: "234".data) == "asdfg".data)
50+
#expect(try rocksDB.get(column: .col1, key: "234".data) == "asdfg".data)
4851
}
4952

5053
@Test func testBatchOperations() throws {
51-
try rocksDB.put(key: "123".data, value: "qwe".data)
54+
try rocksDB.put(column: .col1, key: "123".data, value: "qwe".data)
55+
56+
try rocksDB.batch(operations: [
57+
.delete(column: .col1, key: "123".data),
58+
.put(column: .col1, key: "234".data, value: "wer".data),
59+
.put(column: .col1, key: "345".data, value: "ert".data),
60+
.delete(column: .col1, key: "234".data),
61+
.put(column: .col1, key: "345".data, value: "ertert".data),
62+
])
63+
64+
#expect(try rocksDB.get(column: .col1, key: "123".data) == nil)
65+
#expect(try rocksDB.get(column: .col1, key: "234".data) == nil)
66+
#expect(try rocksDB.get(column: .col1, key: "345".data) == "ertert".data)
67+
}
68+
69+
@Test func testMultipleColumnFamilies() throws {
70+
// Test operations across different column families
71+
try rocksDB.put(column: .col1, key: "key1".data, value: "value1".data)
72+
try rocksDB.put(column: .col2, key: "key1".data, value: "value2".data)
73+
try rocksDB.put(column: .col3, key: "key1".data, value: "value3".data)
74+
75+
#expect(try rocksDB.get(column: .col1, key: "key1".data) == "value1".data)
76+
#expect(try rocksDB.get(column: .col2, key: "key1".data) == "value2".data)
77+
#expect(try rocksDB.get(column: .col3, key: "key1".data) == "value3".data)
78+
}
79+
80+
@Test func testLargeValues() throws {
81+
// Test handling of large values
82+
let largeValue = Data((0 ..< 1_000_000).map { UInt8($0 % 256) })
83+
try rocksDB.put(column: .col1, key: "large".data, value: largeValue)
5284

85+
let retrieved = try rocksDB.get(column: .col1, key: "large".data)
86+
#expect(retrieved == largeValue)
87+
}
88+
89+
@Test func testBatchOperationsAcrossColumns() throws {
90+
// Test batch operations across different column families
5391
try rocksDB.batch(operations: [
54-
.delete(key: "123".data),
55-
.put(key: "234".data, value: "wer".data),
56-
.put(key: "345".data, value: "ert".data),
57-
.delete(key: "234".data),
58-
.put(key: "345".data, value: "ertert".data),
92+
.put(column: .col1, key: "batch1".data, value: "value1".data),
93+
.put(column: .col2, key: "batch2".data, value: "value2".data),
94+
.put(column: .col3, key: "batch3".data, value: "value3".data),
5995
])
6096

61-
#expect(try rocksDB.get(key: "123".data) == nil)
62-
#expect(try rocksDB.get(key: "234".data) == nil)
63-
#expect(try rocksDB.get(key: "345".data) == "ertert".data)
97+
#expect(try rocksDB.get(column: .col1, key: "batch1".data) == "value1".data)
98+
#expect(try rocksDB.get(column: .col2, key: "batch2".data) == "value2".data)
99+
#expect(try rocksDB.get(column: .col3, key: "batch3".data) == "value3".data)
100+
}
101+
102+
@Test func testEmptyValues() throws {
103+
// Test handling of empty values
104+
try rocksDB.put(column: .col1, key: "empty".data, value: Data())
105+
106+
let retrieved = try rocksDB.get(column: .col1, key: "empty".data)
107+
#expect(retrieved?.isEmpty == true)
108+
}
109+
110+
@Test func testErrorConditions() throws {
111+
// Test invalid operations
112+
let invalidDB = try? RocksDB<Columns>(path: URL(fileURLWithPath: "/nonexistent/path"))
113+
#expect(invalidDB == nil)
114+
115+
// Test deleting non-existent key
116+
try rocksDB.delete(column: .col1, key: "nonexistent".data)
117+
let value = try rocksDB.get(column: .col1, key: "nonexistent".data)
118+
#expect(value == nil)
64119
}
65120
}

0 commit comments

Comments
 (0)