Skip to content

Commit 25b46aa

Browse files
committed
[RocksDB] Allow for WriteBatch buffer reuse after commit
1 parent daa27da commit 25b46aa

File tree

3 files changed

+36
-17
lines changed

3 files changed

+36
-17
lines changed

crates/metadata-server/src/raft/storage/rocksdb.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -327,6 +327,7 @@ impl RocksDbStorage {
327327
write_batch,
328328
)
329329
.await
330+
.map(|_| ())
330331
.map_err(Into::into)
331332
}
332333

crates/partition-store/src/partition_store.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -971,6 +971,7 @@ impl Transaction for PartitionStoreTransaction<'_> {
971971
self.write_batch_with_index,
972972
)
973973
.await
974+
.map(|_| ())
974975
.map_err(|error| StorageError::Generic(error.into()))
975976
}
976977
}

crates/rocksdb/src/lib.rs

Lines changed: 34 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -155,6 +155,11 @@ impl RocksDb {
155155
self.db.cfs()
156156
}
157157

158+
/// Write a [`rocksdb::WriteBatch`] to the database.
159+
///
160+
/// The batch is consumed and returned on success, allowing callers to reuse
161+
/// it (e.g. by calling [`rocksdb::WriteBatch::clear()`]) without
162+
/// re-allocating. On error the batch is lost.
158163
#[tracing::instrument(skip_all, fields(db = %self.name()))]
159164
pub async fn write_batch(
160165
self: &Arc<Self>,
@@ -163,17 +168,22 @@ impl RocksDb {
163168
io_mode: IoMode,
164169
write_options: rocksdb::WriteOptions,
165170
write_batch: rocksdb::WriteBatch,
166-
) -> Result<(), RocksError> {
171+
) -> Result<rocksdb::WriteBatch, RocksError> {
167172
self.write_batch_internal(
168173
name,
169174
priority,
170175
io_mode,
171176
write_options,
172-
move |db, write_options| db.write_batch(&write_batch, write_options),
177+
write_batch,
178+
|db, write_options, batch| db.write_batch(batch, write_options),
173179
)
174180
.await
175181
}
176182

183+
/// Write a [`rocksdb::WriteBatchWithIndex`] to the database.
184+
///
185+
/// The batch is consumed and returned on success, allowing callers to reuse
186+
/// it without re-allocating. On error the batch is lost.
177187
#[tracing::instrument(skip_all, fields(db = %self.name()))]
178188
pub async fn write_batch_with_index(
179189
self: &Arc<Self>,
@@ -182,27 +192,32 @@ impl RocksDb {
182192
io_mode: IoMode,
183193
write_options: rocksdb::WriteOptions,
184194
write_batch: rocksdb::WriteBatchWithIndex,
185-
) -> Result<(), RocksError> {
195+
) -> Result<rocksdb::WriteBatchWithIndex, RocksError> {
186196
self.write_batch_internal(
187197
name,
188198
priority,
189199
io_mode,
190200
write_options,
191-
move |db, write_options| db.write_batch_with_index(&write_batch, write_options),
201+
write_batch,
202+
|db, write_options, batch| db.write_batch_with_index(batch, write_options),
192203
)
193204
.await
194205
}
195206

196-
async fn write_batch_internal<OP>(
207+
async fn write_batch_internal<B, OP>(
197208
self: &Arc<Self>,
198209
name: &'static str,
199210
priority: Priority,
200211
io_mode: IoMode,
201212
mut write_options: rocksdb::WriteOptions,
213+
batch: B,
202214
write_op: OP,
203-
) -> Result<(), RocksError>
215+
) -> Result<B, RocksError>
204216
where
205-
OP: Fn(&RocksAccess, &rocksdb::WriteOptions) -> Result<(), rocksdb::Error> + Send + 'static,
217+
B: Send + 'static,
218+
OP: Fn(&RocksAccess, &rocksdb::WriteOptions, &B) -> Result<(), rocksdb::Error>
219+
+ Send
220+
+ 'static,
206221
{
207222
// depending on the IoMode, we decide how to do the write.
208223
match io_mode {
@@ -212,14 +227,14 @@ impl RocksDb {
212227
"Blocking IO is allowed for write_batch, stall detection will not be used in this operation!"
213228
);
214229
write_options.set_no_slowdown(false);
215-
write_op(&self.db, &write_options)?;
230+
write_op(&self.db, &write_options, &batch)?;
216231
counter!(STORAGE_IO_OP,
217232
DISPOSITION => DISPOSITION_MAYBE_BLOCKING,
218233
OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
219234
PRIORITY => priority.as_static_str(),
220235
)
221236
.increment(1);
222-
return Ok(());
237+
return Ok(batch);
223238
}
224239
IoMode::AlwaysBackground => {
225240
// Operation will block, dispatch to background.
@@ -229,9 +244,10 @@ impl RocksDb {
229244
let task = StorageTask::default()
230245
.priority(priority)
231246
.kind(StorageTaskKind::WriteBatch)
232-
.op(move || {
247+
.op(move || -> Result<B, rocksdb::Error> {
233248
let _x = RocksDbPerfGuard::new(name);
234-
write_op(&db.db, &write_options)
249+
write_op(&db.db, &write_options, &batch)?;
250+
Ok(batch)
235251
})
236252
.build()
237253
.unwrap();
@@ -248,14 +264,14 @@ impl RocksDb {
248264
IoMode::OnlyIfNonBlocking => {
249265
let _x = RocksDbPerfGuard::new(name);
250266
write_options.set_no_slowdown(true);
251-
write_op(&self.db, &write_options)?;
267+
write_op(&self.db, &write_options, &batch)?;
252268
counter!(STORAGE_IO_OP,
253269
DISPOSITION => DISPOSITION_NON_BLOCKING,
254270
OP_TYPE => StorageTaskKind::WriteBatch.as_static_str(),
255271
PRIORITY => priority.as_static_str(),
256272
)
257273
.increment(1);
258-
return Ok(());
274+
return Ok(batch);
259275
}
260276
_ => {}
261277
}
@@ -265,7 +281,7 @@ impl RocksDb {
265281
write_options.set_no_slowdown(true);
266282

267283
let perf_guard = RocksDbPerfGuard::new(name);
268-
let result = write_op(&self.db, &write_options);
284+
let result = write_op(&self.db, &write_options, &batch);
269285
match result {
270286
Ok(_) => {
271287
counter!(STORAGE_IO_OP,
@@ -274,7 +290,7 @@ impl RocksDb {
274290
PRIORITY => priority.as_static_str(),
275291
)
276292
.increment(1);
277-
Ok(())
293+
Ok(batch)
278294
}
279295
Err(e) if is_retryable_error(e.kind()) => {
280296
counter!(STORAGE_IO_OP,
@@ -294,9 +310,10 @@ impl RocksDb {
294310
let task = StorageTask::default()
295311
.priority(priority)
296312
.kind(StorageTaskKind::WriteBatch)
297-
.op(move || {
313+
.op(move || -> Result<B, rocksdb::Error> {
298314
let _x = RocksDbPerfGuard::new(name);
299-
write_op(&db.db, &write_options)
315+
write_op(&db.db, &write_options, &batch)?;
316+
Ok(batch)
300317
})
301318
.build()
302319
.unwrap();

0 commit comments

Comments
 (0)