Skip to content

Commit 3765a57

Browse files
committed
add transaction_id to record
1 parent f2e7e69 commit 3765a57

File tree

4 files changed

+48
-1
lines changed

4 files changed

+48
-1
lines changed

etc/initdb.d/010-schema.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ create table if not exists record (
107107
topition int references topition (id) on delete cascade,
108108
offset_id bigint not null,
109109
primary key (topition, offset_id),
110+
transaction_id xid8 default pg_current_xact_id() not null,
110111
attributes smallint,
111112
producer_id bigint,
112113
producer_epoch smallint,

tansu-storage/src/pg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1710,7 +1710,7 @@ impl Storage for Postgres {
17101710
let records = self
17111711
.prepare_query(
17121712
&c,
1713-
"record_fetch.sql",
1713+
"record_fetch_pg.sql",
17141714
&[
17151715
&self.cluster,
17161716
&topition.topic(),

tansu-storage/src/sql/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ pub(crate) static SQL: LazyLock<Cache> = LazyLock::new(|| {
184184
include_sql!("record_delete_by_topic.sql"),
185185
),
186186
("record_fetch.sql", include_sql!("record_fetch.sql")),
187+
("record_fetch_pg.sql", include_sql!("record_fetch_pg.sql")),
187188
("record_insert.sql", include_sql!("record_insert.sql")),
188189
("register_broker.sql", include_sql!("register_broker.sql")),
189190
("topic_by_cluster.sql", include_sql!("topic_by_cluster.sql")),
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
-- -*- mode: sql; sql-product: postgres; -*-
2+
-- Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
3+
--
4+
-- Licensed under the Apache License, Version 2.0 (the "License");
5+
-- you may not use this file except in compliance with the License.
6+
-- You may obtain a copy of the License at
7+
--
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
--
10+
-- Unless required by applicable law or agreed to in writing, software
11+
-- distributed under the License is distributed on an "AS IS" BASIS,
12+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
-- See the License for the specific language governing permissions and
14+
-- limitations under the License.
15+
16+
-- prepare record_fetch (text, text, integer, integer, integer, integer) as
17+
with sized as (
18+
select
19+
20+
r.offset_id,
21+
r.attributes,
22+
r.timestamp,
23+
r.k,
24+
r.v,
25+
sum(coalesce(length(r.k), 0) + coalesce(length(r.v), 0)) over (order by r.offset_id) as bytes,
26+
r.producer_id,
27+
r.producer_epoch
28+
29+
from
30+
31+
cluster c
32+
join topic t on t.cluster = c.id
33+
join topition tp on tp.topic = t.id
34+
join record r on r.topition = tp.id
35+
36+
where
37+
38+
c.name = $1
39+
and t.name = $2
40+
and tp.partition = $3
41+
and r.offset_id >= $4
42+
and r.offset_id < $6
43+
and r.transaction_id < pg_snapshot_xmin(pg_current_snapshot()))
44+
45+
select * from sized where bytes < $5;

0 commit comments

Comments
 (0)