Skip to content

Commit ad005c6

Browse files
committed
better support of null storage (profile) engine
1 parent 90c68cf commit ad005c6

File tree

11 files changed

+305
-202
lines changed

11 files changed

+305
-202
lines changed

Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ url = { version = "2.5.4", features = ["serde"] }
159159
uuid = { version = "1.13.2", features = ["serde", "v4", "v7"] }
160160
zstd = "0.13.2"
161161

162-
[profile.release]
163-
codegen-units = 1
164-
lto = "thin"
162+
# [profile.release]
163+
# codegen-units = 1
164+
# lto = "thin"
165+
166+
[profile.profiling]
167+
inherits = "release"
168+
debug = true

justfile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ build-examples: (cargo-build "--examples")
1717

1818
release: (cargo-build "--release" "--bin" "tansu" "--no-default-features" "--features" "delta,dynostore,iceberg,libsql,parquet,postgres")
1919

20+
release-sqlite: (cargo-build "--release" "--bin" "tansu" "--no-default-features" "--features" "libsql")
21+
2022
test: test-workspace test-doc
2123

2224
test-workspace:
@@ -381,3 +383,7 @@ customer-topic-create *args: (topic-create "customer" "--partitions=1" "--confi
381383
customer-topic-generator *args: (generator "customer" args)
382384

383385
customer-duckdb-delta: (duckdb "\"select * from delta_scan('s3://lake/tansu.customer');\"")
386+
387+
profile-null:
388+
cargo build --profile profiling
389+
RUST_LOG=warn samply record ./target/profiling/tansu --storage-engine=null://sink

tansu-service/src/api.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use tansu_sans_io::{
1919
ApiKey, ApiVersionsRequest, ApiVersionsResponse, Body, ErrorCode, Frame, Header,
2020
RootMessageMeta, api_versions_response::ApiVersion,
2121
};
22-
use tracing::debug;
2322

2423
use crate::Error;
2524

@@ -167,8 +166,6 @@ where
167166
type Error = E;
168167

169168
async fn serve(&self, ctx: Context<State>, req: Frame) -> Result<Self::Response, Self::Error> {
170-
debug!(?req);
171-
172169
let api_key = req.api_key()?;
173170

174171
if let Some(service) = self.routes.get(&api_key) {

tansu-service/src/frame.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ where
159159
type Error = S::Error;
160160

161161
async fn serve(&self, ctx: Context<State>, req: Frame) -> Result<Self::Response, Self::Error> {
162-
debug!(?req);
163162
let correlation_id = req.correlation_id()?;
164163

165164
let req = Q::try_from(req.body).map_err(Into::into)?;
@@ -216,7 +215,7 @@ where
216215
type Error = S::Error;
217216

218217
async fn serve(&self, ctx: Context<State>, req: Bytes) -> Result<Self::Response, Self::Error> {
219-
let req = Frame::request_from_bytes(req).inspect(|req| debug!(?req))?;
218+
let req = Frame::request_from_bytes(req)?;
220219
let api_key = req.api_key()?;
221220
let api_version = req.api_version()?;
222221
let correlation_id = req.correlation_id()?;

tansu-service/src/stream.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ pub struct TcpBytesService<S, State> {
235235
_state: PhantomData<State>,
236236
}
237237

238+
impl<S, State> TcpBytesService<S, State> {
239+
fn elapsed_millis(&self, start: SystemTime) -> u64 {
240+
start
241+
.elapsed()
242+
.map_or(0, |duration| duration.as_millis() as u64)
243+
}
244+
}
245+
238246
impl<S, State> Service<TcpContext, TcpStream> for TcpBytesService<S, State>
239247
where
240248
S: Service<State, Bytes, Response = Bytes>,
@@ -298,7 +306,6 @@ where
298306
.read_exact(&mut request[4..])
299307
.await
300308
.inspect_err(|err| error!(?err))?;
301-
debug!(?request);
302309

303310
REQUEST_SIZE.record(request.len() as u64, &attributes);
304311

@@ -313,15 +320,11 @@ where
313320
.inspect(|response| {
314321
RESPONSE_SIZE.record(response.len() as u64, &attributes);
315322

316-
REQUEST_DURATION.record(
317-
request_start
318-
.elapsed()
319-
.map_or(0, |duration| duration.as_millis() as u64),
320-
&attributes,
321-
);
322-
})?;
323+
let elapsed_millis = self.elapsed_millis(request_start);
324+
debug!(elapsed_millis);
323325

324-
debug!(response = ?&response[..]);
326+
REQUEST_DURATION.record(elapsed_millis, &attributes);
327+
})?;
325328

326329
req.write_all(&response)
327330
.await

tansu-storage/src/lib.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,13 +1700,23 @@ impl Builder<i32, String, Url, Url> {
17001700
message: self.storage.to_string(),
17011701
}),
17021702

1703+
"null" => Ok(StorageContainer::Null(null::Engine::new(
1704+
self.cluster_id.clone(),
1705+
self.node_id,
1706+
self.advertised_listener.clone(),
1707+
))),
1708+
17031709
#[cfg(not(any(
17041710
feature = "dynostore",
17051711
feature = "libsql",
17061712
feature = "postgres",
17071713
feature = "turso"
17081714
)))]
1709-
_storage => Ok(StorageContainer::Null(null::Engine)),
1715+
_storage => Ok(StorageContainer::Null(null::Engine::new(
1716+
self.cluster_id.clone(),
1717+
self.node_id,
1718+
self.advertised_listener.clone(),
1719+
))),
17101720

17111721
#[cfg(any(
17121722
feature = "dynostore",
@@ -1923,7 +1933,7 @@ impl Storage for StorageContainer {
19231933
})
19241934
}
19251935

1926-
#[instrument(skip(self), ret)]
1936+
#[instrument(skip_all, ret)]
19271937
async fn produce(
19281938
&self,
19291939
transaction_id: Option<&str>,

tansu-storage/src/lite.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ impl managed::Manager for ConnectionManager {
221221
type Type = Connection;
222222
type Error = Error;
223223

224-
#[instrument(ret)]
224+
#[instrument(skip_all, ret)]
225225
async fn create(&self) -> Result<Self::Type, Self::Error> {
226226
let start = SystemTime::now();
227227

@@ -239,6 +239,11 @@ impl managed::Manager for ConnectionManager {
239239
}
240240

241241
{
242+
_ = connection
243+
.execute("PRAGMA synchronous = normal", ())
244+
.await
245+
.inspect(|rows| debug!(rows))?;
246+
242247
let mut rows = connection.query("PRAGMA synchronous", ()).await?;
243248

244249
if let Some(row) = rows.next().await.inspect_err(|err| error!(?err))? {
@@ -272,13 +277,11 @@ impl managed::Manager for ConnectionManager {
272277
.inspect(|_| CONNECT_DURATION.record(elapsed_millis(start), &[]))
273278
}
274279

275-
#[instrument(ret)]
276280
async fn recycle(
277281
&self,
278-
obj: &mut Self::Type,
279-
metrics: &managed::Metrics,
282+
_obj: &mut Self::Type,
283+
_metrics: &managed::Metrics,
280284
) -> managed::RecycleResult<Self::Error> {
281-
debug!(?obj, ?metrics);
282285
Ok(())
283286
}
284287
}
@@ -543,13 +546,11 @@ impl Delegate {
543546

544547
async fn idempotent_message_check(
545548
&self,
546-
transaction_id: Option<&str>,
549+
_transaction_id: Option<&str>,
547550
topition: &Topition,
548551
deflated: &deflated::Batch,
549552
connection: &Connection,
550553
) -> Result<()> {
551-
debug!(transaction_id, ?deflated);
552-
553554
let mut rows = connection
554555
.query(
555556
&sql_lookup("producer_epoch_current_for_producer.sql")?,
@@ -663,8 +664,6 @@ impl Delegate {
663664
deflated: deflated::Batch,
664665
tx: &Transaction,
665666
) -> Result<i64> {
666-
debug!(cluster = ?self.cluster, ?transaction_id, ?topition, ?deflated);
667-
668667
let start = SystemTime::now();
669668

670669
let topic = topition.topic();
@@ -731,7 +730,7 @@ impl Delegate {
731730
let key = record.key.as_deref();
732731
let value = record.value.as_deref();
733732

734-
debug!(?delta, ?record, ?offset);
733+
debug!(?delta, ?offset);
735734

736735
_ = self
737736
.prepare_execute(
@@ -1349,7 +1348,7 @@ impl Storage for Engine {
13491348
})
13501349
}
13511350

1352-
#[instrument(ret)]
1351+
#[instrument(skip_all, ret)]
13531352
async fn produce(
13541353
&self,
13551354
transaction_id: Option<&str>,
@@ -2118,8 +2117,6 @@ impl Storage for Delegate {
21182117
) -> Result<i64> {
21192118
let start = SystemTime::now();
21202119

2121-
debug!(cluster = self.cluster, transaction_id, ?topition, ?deflated);
2122-
21232120
let tx = self.transaction().await.inspect(|_| {
21242121
debug!(after_produce_transaction = elapsed_millis(start));
21252122
})?;

0 commit comments

Comments
 (0)