Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit 82cdccf

Browse files
committed
refactoring deferred commits
1 parent ff45075 commit 82cdccf

File tree

21 files changed

+162
-118
lines changed

21 files changed

+162
-118
lines changed

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ make them catch up with others even if they are HOT, see [chain](chain/README.md
1313
✅ Parallel persistence, there is a long-running write thread spawned for each entity column (no blocking) \
1414
✅ Querying and ranging by secondary index \
1515
✅ Optional dictionaries for low cardinality fields or for building unique values (addresses) \
16-
✅ Sharding of columns which parallelizes their indexing (high quantity/volume columns) \
16+
✅ Sharding of columns which parallelizes their indexing (high quantity/volume columns) :
1717
```rust
1818
#[column(shards = 4)]
1919
#[column(index, shards = 4)]
@@ -71,6 +71,17 @@ to avoid benchmarking our SSD by random-access writes, ie. to rapidly reduce wri
7171
- sort all data in batches before writing it to reduce tree building overhead
7272
- solved by parallelizing writes to all columns into long-running batching threads
7373

74+
### Why Macros?
75+
76+
1. Rust's type system is not as expressive as e.g. Haskell's or Scala's for performance reasons
77+
2. Rust's macro system is powerful and straightforward to use
78+
79+
So, I find model driven development with code generation a great fit for Rust. It performs very well unless we generate 50k lines of code
80+
which would be the case of deeply nested entities with many indexes and dictionaries.
81+
82+
The core idea is about deriving R/W entity methods and nested entity definition `println!("{:#?}", Block::definition()?);` from struct annotations.
83+
Definition holds all the entity meta information, and it is used to create rich R/W transaction contexts that are used by derived entity R/W methods.
84+
7485
### Development
7586

7687
To use redbit in your project:

chains/btc/src/hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub(crate) fn write_from_input_refs_using_hash(tx_context: &TransactionWriteTxCo
44
let ids_router = tx_context.inputs.input_id.acquire_router();
55
let ptrs_router = tx_context.inputs.input_utxo_pointer_by_id.acquire_router();
66
let tx_hashes = input_refs.iter().map(|(ir, _)| ir.tx_hash).collect::<Vec<_>>();
7-
tx_context.transaction_hash_index.router.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
7+
tx_context.transaction_hash_index.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
88
let mut ids = Vec::with_capacity(out.len());
99
let mut pointers = Vec::with_capacity(out.len());
1010
for (index, tx_pointer_buf_opt) in out.into_iter() {

chains/cardano/src/hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub(crate) fn write_from_input_refs_using_hash(tx_context: &TransactionWriteTxCo
44
let ids_router = tx_context.inputs.input_id.acquire_router();
55
let ptrs_router = tx_context.inputs.input_utxo_pointer_by_id.acquire_router();
66
let tx_hashes = input_refs.iter().map(|(ir, _)| ir.tx_hash).collect::<Vec<_>>();
7-
tx_context.transaction_hash_index.router.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
7+
tx_context.transaction_hash_index.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
88
let mut ids = Vec::with_capacity(out.len());
99
let mut pointers = Vec::with_capacity(out.len());
1010
for (index, tx_pointer_buf_opt) in out.into_iter() {

chains/demo/src/hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub(crate) fn write_from_input_refs_using_hash(tx_context: &TransactionWriteTxCo
44
let ids_router = tx_context.inputs.input_id.acquire_router();
55
let ptrs_router = tx_context.inputs.input_utxo_pointer_by_id.acquire_router();
66
let tx_hashes = input_refs.iter().map(|(ir, _)| ir.tx_hash).collect::<Vec<_>>();
7-
tx_context.transaction_hash_index.router.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
7+
tx_context.transaction_hash_index.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
88
let mut ids = Vec::with_capacity(out.len());
99
let mut pointers = Vec::with_capacity(out.len());
1010
for (index, tx_pointer_buf_opt) in out.into_iter() {

chains/ergo/src/hook.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub(crate) fn write_from_input_refs_using_utxos(tx_context: &TransactionWriteTxC
44
let ids_router = tx_context.inputs.input_id.acquire_router();
55
let ptrs_router = tx_context.inputs.input_utxo_pointer_by_id.acquire_router();
66
let tx_hashes = input_refs.iter().map(|(box_id, _)| *box_id).collect::<Vec<_>>();
7-
tx_context.utxos.utxo_box_id_index.router.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
7+
tx_context.utxos.utxo_box_id_index.query_and_write(tx_hashes, is_last, Arc::new(move |last_shards, out| {
88
let mut ids = Vec::with_capacity(out.len());
99
let mut pointers = Vec::with_capacity(out.len());
1010
for (index, tx_pointer_buf_opt) in out.into_iter() {

macros/src/column/delete.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,34 @@ use quote::quote;
33

44
pub fn delete_statement(table_var: &Ident) -> TokenStream {
55
quote! {
6-
removed.push(tx_context.#table_var.router.delete_kv(pk)?);
6+
removed.push(tx_context.#table_var.delete_kv(pk)?);
77
}
88
}
99

1010
pub fn delete_index_statement(index_table_var: &Ident) -> TokenStream {
1111
quote! {
12-
removed.push(tx_context.#index_table_var.router.delete_kv(pk)?);
12+
removed.push(tx_context.#index_table_var.delete_kv(pk)?);
1313
}
1414
}
1515

1616
pub fn delete_many_index_statement(index_table: &Ident) -> TokenStream {
1717
quote! {
1818
for pk in pks.iter() {
19-
removed.push(tx_context.#index_table.router.delete_kv(*pk)?);
19+
removed.push(tx_context.#index_table.delete_kv(*pk)?);
2020
}
2121
}
2222
}
2323

2424
pub fn delete_dict_statement(dict_table_var: &Ident) -> TokenStream {
2525
quote! {
26-
removed.push(tx_context.#dict_table_var.router.delete_kv(pk)?);
26+
removed.push(tx_context.#dict_table_var.delete_kv(pk)?);
2727
}
2828
}
2929

3030
pub fn delete_many_dict_statement(dict_table_var: &Ident) -> TokenStream {
3131
quote! {
3232
for pk in pks.iter() {
33-
removed.push(tx_context.#dict_table_var.router.delete_kv(*pk)?);
33+
removed.push(tx_context.#dict_table_var.delete_kv(*pk)?);
3434
}
3535
}
3636
}

macros/src/entity/context.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ pub struct TxContextItem {
4747
pub write_definition: TokenStream,
4848
pub write_begin: TokenStream,
4949
pub async_flush: Option<TokenStream>,
50-
pub deferred_flush: Option<TokenStream>,
5150
pub write_shutdown: TokenStream,
5251
pub read_definition: TokenStream,
5352
}
@@ -60,6 +59,7 @@ pub fn def_tx_context(entity_def: &EntityDef, tx_contexts: &[TxContextItem]) ->
6059
let write_entity_tx_context_ty = &entity_def.write_ctx_type;
6160
let tx_context_name = format_ident!("{}", TX_CONTEXT);
6261
quote! {
62+
#[derive(Debug)]
6363
pub struct #entity_tx_context_ty {
6464
#(#definitions),*
6565
}
@@ -84,7 +84,6 @@ pub fn write_tx_context(entity_def: &EntityDef, tx_contexts: &[TxContextItem]) -
8484
let begins: Vec<TokenStream> = tx_contexts.iter().map(|item| item.write_begin.clone()).collect();
8585
let shutdowns: Vec<TokenStream> = tx_contexts.iter().map(|item| item.write_shutdown.clone()).collect();
8686
let async_flushes: Vec<TokenStream> = tx_contexts.iter().flat_map(|item| item.async_flush.clone()).collect();
87-
let deferred_flushes: Vec<TokenStream> = tx_contexts.iter().flat_map(|item| item.deferred_flush.clone()).collect();
8887
let write_tx_context_name = tx_context_name(TxType::Write);
8988
let write_tx_context_ty = &entity_def.write_ctx_type;
9089
let entity_tx_context_ty = &entity_def.ctx_type;
@@ -114,11 +113,6 @@ pub fn write_tx_context(entity_def: &EntityDef, tx_contexts: &[TxContextItem]) -
114113
#( futures.extend(#async_flushes); )*
115114
Ok(futures)
116115
}
117-
fn commit_ctx_deferred(&self) -> Result<Vec<FlushFuture>, AppError> {
118-
let mut futures: Vec<FlushFuture> = Vec::new();
119-
#( futures.extend(#deferred_flushes); )*
120-
Ok(futures)
121-
}
122116
}
123117
}
124118
}
@@ -164,7 +158,8 @@ pub fn tx_context_plain_item(def: &PlainTableDef) -> TxContextItem {
164158
let key_ty = &def.key_type;
165159
let val_ty: Type = def.value_type.clone().unwrap_or_else(|| syn::parse_str::<Type>("()").unwrap());
166160
let table_def = &def.underlying.definition;
167-
let shards = def.column_props.shards;
161+
let shards= def.column_props.shards;
162+
let root_pk= def.root_pk;
168163

169164
let definition =
170165
quote! {
@@ -183,18 +178,13 @@ pub fn tx_context_plain_item(def: &PlainTableDef) -> TxContextItem {
183178

184179
let def_constructor = quote! {
185180
#var_ident: RedbitTableDefinition::new(
181+
#root_pk,
186182
Partitioning::by_key(#shards),
187183
PlainFactory::new(#name_lit, #table_def),
188184
)
189185
};
190186
let write_begin = quote! { self.#var_ident.begin_async(durability)? };
191-
let async_flush =
192-
if def.root_pk {
193-
Some(quote! { self.#var_ident.flush_two_phased()? })
194-
} else {
195-
Some(quote! { self.#var_ident.flush_async()? })
196-
};
197-
let deferred_flush = Some(quote! { self.#var_ident.flush_deferred()? });
187+
let async_flush = Some(quote! { self.#var_ident.flush_async()? });
198188
let write_shutdown = quote! { self.#var_ident.shutdown_async()? };
199189

200190
TxContextItem {
@@ -204,7 +194,6 @@ pub fn tx_context_plain_item(def: &PlainTableDef) -> TxContextItem {
204194
write_definition,
205195
write_begin,
206196
async_flush,
207-
deferred_flush,
208197
write_shutdown,
209198
read_definition,
210199
}
@@ -237,13 +226,13 @@ pub fn tx_context_index_item(defs: &IndexTableDefs) -> TxContextItem {
237226

238227
let def_constructor = quote! {
239228
#var_ident: RedbitTableDefinition::new(
229+
false,
240230
Partitioning::by_value(#shards),
241231
IndexFactory::new(#name_lit, #lru_cache, #pk_by_index, #index_by_pk),
242232
)
243233
};
244234
let write_begin = quote! { self.#var_ident.begin_async(durability)? };
245235
let async_flush = Some(quote! { self.#var_ident.flush_async()? });
246-
let deferred_flush = Some(quote! { self.#var_ident.flush_deferred()? });
247236
let write_shutdown = quote! { self.#var_ident.shutdown_async()? };
248237

249238
TxContextItem {
@@ -253,7 +242,6 @@ pub fn tx_context_index_item(defs: &IndexTableDefs) -> TxContextItem {
253242
write_definition,
254243
write_begin,
255244
async_flush,
256-
deferred_flush,
257245
write_shutdown,
258246
read_definition,
259247
}
@@ -288,13 +276,13 @@ pub fn tx_context_dict_item(defs: &DictTableDefs) -> TxContextItem {
288276

289277
let def_constructor = quote! {
290278
#var_ident: RedbitTableDefinition::new(
279+
false,
291280
Partitioning::by_value(#shards),
292281
DictFactory::new(#name_lit, #lru_cache, #dict_pk_to_ids, #value_by_dict, #value_to_dict, #dict_pk_by_pk),
293282
)
294283
};
295284
let write_begin = quote! { self.#var_ident.begin_async(durability)? };
296285
let async_flush = Some(quote! { self.#var_ident.flush_async()? });
297-
let deferred_flush = Some(quote! { self.#var_ident.flush_deferred()? });
298286
let write_shutdown = quote! { self.#var_ident.shutdown_async()? };
299287

300288
TxContextItem {
@@ -304,7 +292,6 @@ pub fn tx_context_dict_item(defs: &DictTableDefs) -> TxContextItem {
304292
write_definition,
305293
write_begin,
306294
async_flush,
307-
deferred_flush,
308295
write_shutdown,
309296
read_definition,
310297
}
@@ -329,6 +316,23 @@ pub fn begin_write_fn_def(entity_def: &EntityDef) -> FunctionDef {
329316

330317
}
331318

319+
pub fn definition(entity_def: &EntityDef) -> FunctionDef {
320+
let tx_context_ty = &entity_def.ctx_type;
321+
let fn_name = format_ident!("definition");
322+
let fn_stream = quote! {
323+
pub fn #fn_name() -> Result<#tx_context_ty, AppError> {
324+
#tx_context_ty::definition()
325+
}
326+
};
327+
328+
FunctionDef {
329+
fn_stream,
330+
endpoint: None,
331+
test_stream: None,
332+
bench_stream: None,
333+
}
334+
}
335+
332336
pub fn new_write_fn_def(entity_def: &EntityDef) -> FunctionDef {
333337
let tx_context_ty = &entity_def.ctx_type;
334338
let write_tx_context_ty = &entity_def.write_ctx_type;

macros/src/entity/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub fn new(item_struct: &ItemStruct) -> Result<(KeyDef, Vec<FieldDef>, TokenStre
6767
store::persist_def(&entity_def, &store_statements),
6868
store::store_many_def(&entity_def, &store_statements),
6969
store::store_def(&entity_def, &store_statements),
70+
context::definition(&entity_def),
7071
context::begin_write_fn_def(&entity_def),
7172
context::new_write_fn_def(&entity_def),
7273
context::begin_read_fn_def(&entity_def),

macros/src/pk/delete.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ use quote::quote;
33

44
pub fn delete_statement(table_var: &Ident) -> TokenStream {
55
quote! {
6-
removed.push(tx_context.#table_var.router.delete_kv(pk)?);
6+
removed.push(tx_context.#table_var.delete_kv(pk)?);
77
}
88
}
99

1010
pub fn delete_many_statement(table_var: &Ident) -> TokenStream {
1111
quote! {
1212
for pk in pks.iter() {
13-
removed.push(tx_context.#table_var.router.delete_kv(*pk)?);
13+
removed.push(tx_context.#table_var.delete_kv(*pk)?);
1414
}
1515
}
1616
}

macros/src/pk/pk_range.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub fn fn_def(entity_def: &EntityDef, table_var: &Ident) -> FunctionDef {
1111

1212
let fn_stream = quote! {
1313
fn #fn_name(tx_context: &#write_ctx_type, from: #pk_type, until: #pk_type) -> Result<Vec<#pk_type>, AppError> {
14-
let entries = tx_context.#table_var.router.range(from, until)?;
14+
let entries = tx_context.#table_var.range(from, until)?;
1515
let mut results = Vec::new();
1616
for (key, _) in entries {
1717
let pointer: #pk_type = key.as_value();

0 commit comments

Comments
 (0)