Skip to content

Commit ef5c850

Browse files
committed
final 0.5.10 fixes
1 parent 15ed000 commit ef5c850

File tree

6 files changed

+152
-50
lines changed

6 files changed

+152
-50
lines changed

justfile

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -387,26 +387,18 @@ customer-topic-generator *args: (generator "customer" args)
387387

388388
customer-duckdb-delta: (duckdb "\"select * from delta_scan('s3://lake/tansu.customer');\"")
389389

390-
broker-memory profile="profiling":
391-
cargo build --profile {{ profile }} --bin tansu
392-
./target/{{ replace(profile, "dev", "debug") }}/tansu broker --storage-engine=memory:// 2>&1 | tee broker.log
390+
broker-memory profile="profiling": (build profile "dynostore") (tansu-broker profile "--storage-engine=memory://")
393391

394-
broker-null profile="profiling":
395-
cargo build --profile {{ profile }} --bin tansu
396-
./target/{{ replace(profile, "dev", "debug") }}/tansu --storage-engine=null://sink 2>&1 | tee broker.log
392+
broker-null profile="profiling": (build profile "default") (tansu-broker profile "--storage-engine=null://")
397393

398394
clean-tansu-db:
399395
rm -f tansu.db*
400396

401-
broker-sqlite profile="profiling": clean-tansu-db
402-
cargo build --profile {{ profile }} --features libsql --bin tansu
403-
./target/{{ replace(profile, "dev", "debug") }}/tansu broker --storage-engine=sqlite://tansu.db 2>&1 | tee broker.log
397+
broker-sqlite profile="profiling": clean-tansu-db (build profile "libsql") (tansu-broker profile "--storage-engine=sqlite://tansu.db")
404398

405399
broker-s3 profile="profiling": (build profile "dynostore") docker-compose-down minio-up minio-ready-local minio-local-alias minio-tansu-bucket (tansu-broker profile "--storage-engine=s3://tansu/")
406400

407-
broker-postgres profile="profiling":
408-
cargo build --profile {{ profile }} --features postgres --bin tansu
409-
./target/{{ replace(profile, "dev", "debug") }}/tansu broker --storage-engine=postgres://pmorgan@localhost 2>&1 | tee broker.log
401+
broker-postgres profile="profiling": (build profile "postgres") docker-compose-down db-up (tansu-broker profile "--storage-engine=postgres://pmorgan@localhost")
410402

411403
samply-null profile="profiling":
412404
cargo build --profile {{ profile }} --bin tansu

tansu-sans-io/src/record/deflated.rs

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
1+
// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -177,6 +177,71 @@ impl TryFrom<Vec<u8>> for Batch {
177177
}
178178
}
179179

180+
impl TryFrom<Bytes> for Batch {
181+
type Error = Error;
182+
fn try_from(mut encoded: Bytes) -> result::Result<Self, Self::Error> {
183+
let base_offset = encoded.try_get_i64()?;
184+
let batch_length = encoded.try_get_i32()?;
185+
186+
let partition_leader_epoch = encoded.try_get_i32()?;
187+
let magic = encoded.try_get_i8()?;
188+
let crc = encoded.try_get_u32()?;
189+
190+
let crc_data_size = usize::try_from(batch_length).map(|batch_length| {
191+
batch_length
192+
- size_of_val(&partition_leader_epoch)
193+
- size_of_val(&magic)
194+
- size_of_val(&crc)
195+
})?;
196+
197+
let crc_data = &encoded[..crc_data_size];
198+
199+
let computed = {
200+
let mut digest = crc_fast::Digest::new(crc_fast::CrcAlgorithm::Crc32Iscsi);
201+
digest.update(crc_data);
202+
203+
digest.finalize() as u32
204+
};
205+
206+
if computed != crc {
207+
error!(crc, computed);
208+
}
209+
210+
let attributes = encoded.try_get_i16()?;
211+
let last_offset_delta = encoded.try_get_i32()?;
212+
let base_timestamp = encoded.try_get_i64()?;
213+
let max_timestamp = encoded.try_get_i64()?;
214+
let producer_id = encoded.try_get_i64()?;
215+
let producer_epoch = encoded.try_get_i16()?;
216+
let base_sequence = encoded.try_get_i32()?;
217+
let record_count = encoded.try_get_u32()?;
218+
219+
let record_data_size =
220+
usize::try_from(batch_length).map(|batch_length| batch_length - FIXED_BATCH_LENGTH)?;
221+
222+
let record_data = encoded.slice(..record_data_size);
223+
224+
let batch = Batch {
225+
base_offset,
226+
batch_length,
227+
partition_leader_epoch,
228+
magic,
229+
crc,
230+
attributes,
231+
last_offset_delta,
232+
base_timestamp,
233+
max_timestamp,
234+
producer_id,
235+
producer_epoch,
236+
base_sequence,
237+
record_count,
238+
record_data,
239+
};
240+
241+
Ok(batch)
242+
}
243+
}
244+
180245
impl TryFrom<&[u8]> for Batch {
181246
type Error = Error;
182247

tansu-storage/src/dynostore.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
1+
// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -48,7 +48,6 @@ use tansu_sans_io::{
4848
AddPartitionsToTxnPartitionResult, AddPartitionsToTxnTopicResult,
4949
},
5050
create_topics_request::{CreatableTopic, CreatableTopicConfig},
51-
de::BatchDecoder,
5251
delete_groups_response::DeletableGroupResult,
5352
delete_records_request::DeleteRecordsTopic,
5453
delete_records_response::DeleteRecordsTopicResult,
@@ -402,8 +401,7 @@ impl DynoStore {
402401
}
403402

404403
fn decode(&self, encoded: Bytes) -> Result<deflated::Batch> {
405-
let decoder = BatchDecoder::new(encoded);
406-
deflated::Batch::deserialize(decoder).map_err(Into::into)
404+
deflated::Batch::try_from(encoded).map_err(Into::into)
407405
}
408406

409407
async fn get<V>(&self, location: &Path) -> Result<(V, Version)>
@@ -555,6 +553,7 @@ impl Storage for DynoStore {
555553
}
556554
}
557555

556+
#[instrument(skip_all, fields(topic = %topic.name))]
558557
async fn create_topic(&self, topic: CreatableTopic, _validate_only: bool) -> Result<Uuid> {
559558
match self
560559
.meta
@@ -564,6 +563,8 @@ impl Storage for DynoStore {
564563
}
565564

566565
let id = Uuid::now_v7();
566+
debug!(%id);
567+
567568
let td = TopicMetadata {
568569
id,
569570
topic: topic.clone(),

tansu-storage/src/dynostore/metadata.rs

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
1+
// Copyright ⓒ 2024-2026 Peter Morgan <peter.james.morgan@gmail.com>
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@ use std::{
1717
ops::DerefMut,
1818
slice::from_ref,
1919
sync::{Arc, LazyLock, Mutex},
20-
time::{Duration, SystemTime},
20+
time::Duration,
2121
};
2222

2323
use async_trait::async_trait;
@@ -37,7 +37,17 @@ use super::METER;
3737
#[derive(Clone, Debug, Eq, PartialEq)]
3838
struct CacheEntry {
3939
version: UpdateVersion,
40-
tagged_at: SystemTime,
40+
}
41+
42+
impl From<GetOptions> for CacheEntry {
43+
fn from(value: GetOptions) -> Self {
44+
Self {
45+
version: UpdateVersion {
46+
e_tag: value.if_none_match,
47+
version: None,
48+
},
49+
}
50+
}
4151
}
4252

4353
impl From<&PutResult> for CacheEntry {
@@ -49,7 +59,6 @@ impl From<&PutResult> for CacheEntry {
4959

5060
Self {
5161
version: UpdateVersion { e_tag, version },
52-
tagged_at: SystemTime::now(),
5362
}
5463
}
5564
}
@@ -63,7 +72,6 @@ impl From<&GetResult> for CacheEntry {
6372

6473
Self {
6574
version: UpdateVersion { e_tag, version },
66-
tagged_at: SystemTime::now(),
6775
}
6876
}
6977
}
@@ -142,14 +150,13 @@ impl<O> ObjectStore for Cache<O>
142150
where
143151
O: ObjectStore,
144152
{
153+
#[instrument(skip_all, fields(location = %location))]
145154
async fn put_opts(
146155
&self,
147156
location: &Path,
148157
payload: PutPayload,
149158
opts: PutOptions,
150159
) -> Result<PutResult, object_store::Error> {
151-
debug!(%location, ?opts);
152-
153160
let method = KeyValue::new("method", "put_opts");
154161

155162
REQUESTS.add(1, from_ref(&method));
@@ -174,6 +181,7 @@ where
174181
})
175182
}
176183

184+
#[instrument(skip_all, fields(location = %location))]
177185
async fn put_multipart_opts(
178186
&self,
179187
location: &Path,
@@ -201,7 +209,7 @@ where
201209
})
202210
}
203211

204-
#[instrument(skip_all, fields(%location), ret)]
212+
#[instrument(skip_all, fields(%location, if_none_match = options.if_none_match), ret)]
205213
async fn get_opts(
206214
&self,
207215
location: &Path,
@@ -260,14 +268,14 @@ where
260268
}
261269

262270
self.object_store
263-
.get_opts(location, options)
271+
.get_opts(location, options.clone())
264272
.await
265273
.inspect(|get_result| {
266274
let e_tag = get_result.meta.e_tag.clone();
267275
let version = get_result.meta.version.clone();
268276

269277
if let Ok(mut guard) = self.entries.lock() {
270-
debug!(%location, e_tag, version);
278+
debug!(e_tag, version);
271279

272280
let replacement = CacheEntry::from(get_result);
273281

@@ -284,22 +292,48 @@ where
284292
Some(_) => "replace",
285293
};
286294

295+
debug!(outcome);
296+
287297
ENTRIES.add(1, &[method.clone(), KeyValue::new("outcome", outcome)]);
288298
}
289299
})
290300
.inspect_err(|error| {
291301
debug!(%location, ?error);
292302

293-
ERRORS.add(
294-
1,
295-
&[
296-
method,
297-
KeyValue::new("error", object_store_error_name(error)),
298-
],
299-
);
303+
if matches!(error, object_store::Error::NotModified { .. }) {
304+
if let Ok(mut guard) = self.entries.lock() {
305+
let replacement = CacheEntry::from(options);
306+
307+
let outcome = match guard
308+
.deref_mut()
309+
.insert_evict(location.to_owned(), replacement.clone(), true)
310+
.ok()
311+
.flatten()
312+
{
313+
None => "add",
314+
315+
Some(existing) if existing == replacement => "existing",
316+
317+
Some(_) => "replace",
318+
};
319+
320+
debug!(outcome);
321+
322+
ENTRIES.add(1, &[method.clone(), KeyValue::new("outcome", outcome)]);
323+
}
324+
} else {
325+
ERRORS.add(
326+
1,
327+
&[
328+
method,
329+
KeyValue::new("error", object_store_error_name(error)),
330+
],
331+
);
332+
}
300333
})
301334
}
302335

336+
#[instrument(skip_all, fields(location = %location))]
303337
async fn delete(&self, location: &Path) -> Result<(), object_store::Error> {
304338
debug!(%location);
305339

@@ -328,6 +362,7 @@ where
328362
})
329363
}
330364

365+
#[instrument(skip_all, fields(prefix))]
331366
fn list(
332367
&self,
333368
prefix: Option<&Path>,
@@ -337,6 +372,7 @@ where
337372
self.object_store.list(prefix)
338373
}
339374

375+
#[instrument(skip_all, fields(prefix))]
340376
async fn list_with_delimiter(
341377
&self,
342378
prefix: Option<&Path>,
@@ -359,6 +395,7 @@ where
359395
})
360396
}
361397

398+
#[instrument(skip_all, fields(from = %from, to = %to))]
362399
async fn copy(&self, from: &Path, to: &Path) -> Result<(), object_store::Error> {
363400
debug!(%from, %to);
364401
REQUESTS.add(1, &[KeyValue::new("method", "copy")]);
@@ -375,6 +412,7 @@ where
375412
})
376413
}
377414

415+
#[instrument(skip_all, fields(from = %from, to = %to))]
378416
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<(), object_store::Error> {
379417
debug!(%from, %to);
380418
REQUESTS.add(1, &[KeyValue::new("method", "copy_if_not_exists")]);

0 commit comments

Comments
 (0)