Skip to content

Commit d4cc6be

Browse files
authored
feat: add change_seq to transcripts for ingestion support (#868)
* feat: add change_seq to transcripts for ingestion support Add a monotonically increasing change_seq column to the transcript table, backed by a PostgreSQL sequence and BEFORE INSERT OR UPDATE trigger. Every mutation gets a new sequence value, letting external ingesters checkpoint and never miss an update. * chore: regenerate frontend API types
1 parent cdd974b commit d4cc6be

File tree

5 files changed

+159
-11
lines changed

5 files changed

+159
-11
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""add_change_seq_to_transcript
2+
3+
Revision ID: 623af934249a
4+
Revises: 3aa20b96d963
5+
Create Date: 2026-02-19 18:53:12.315440
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "623af934249a"
16+
down_revision: Union[str, None] = "3aa20b96d963"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
# Sequence
23+
op.execute("CREATE SEQUENCE IF NOT EXISTS transcript_change_seq;")
24+
25+
# Column (nullable first for backfill)
26+
op.add_column("transcript", sa.Column("change_seq", sa.BigInteger(), nullable=True))
27+
28+
# Backfill existing rows with sequential values (ordered by created_at for determinism)
29+
op.execute("""
30+
UPDATE transcript SET change_seq = sub.seq FROM (
31+
SELECT id, nextval('transcript_change_seq') AS seq
32+
FROM transcript ORDER BY created_at ASC
33+
) sub WHERE transcript.id = sub.id;
34+
""")
35+
36+
# Now make NOT NULL
37+
op.alter_column("transcript", "change_seq", nullable=False)
38+
39+
# Default for any inserts between now and trigger creation
40+
op.alter_column(
41+
"transcript",
42+
"change_seq",
43+
server_default=sa.text("nextval('transcript_change_seq')"),
44+
)
45+
46+
# Trigger function
47+
op.execute("""
48+
CREATE OR REPLACE FUNCTION set_transcript_change_seq()
49+
RETURNS TRIGGER AS $$
50+
BEGIN
51+
NEW.change_seq := nextval('transcript_change_seq');
52+
RETURN NEW;
53+
END;
54+
$$ LANGUAGE plpgsql;
55+
""")
56+
57+
# Trigger (fires on every INSERT or UPDATE)
58+
op.execute("""
59+
CREATE TRIGGER trigger_transcript_change_seq
60+
BEFORE INSERT OR UPDATE ON transcript
61+
FOR EACH ROW
62+
EXECUTE FUNCTION set_transcript_change_seq();
63+
""")
64+
65+
# Index for efficient polling
66+
op.create_index("idx_transcript_change_seq", "transcript", ["change_seq"])
67+
68+
69+
def downgrade() -> None:
70+
op.execute("DROP TRIGGER IF EXISTS trigger_transcript_change_seq ON transcript;")
71+
op.execute("DROP FUNCTION IF EXISTS set_transcript_change_seq();")
72+
op.drop_index("idx_transcript_change_seq", table_name="transcript")
73+
op.drop_column("transcript", "change_seq")
74+
op.execute("DROP SEQUENCE IF EXISTS transcript_change_seq;")

server/reflector/db/search.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ class SearchResultDB(BaseModel):
151151
title: str | None = None
152152
source_kind: SourceKind
153153
room_id: str | None = None
154+
change_seq: int | None = None
154155
rank: float = Field(..., ge=0, le=1)
155156

156157

@@ -173,6 +174,7 @@ class SearchResult(BaseModel):
173174
total_match_count: NonNegativeInt = Field(
174175
default=0, description="Total number of matches found in the transcript"
175176
)
177+
change_seq: int | None = None
176178

177179
@field_serializer("created_at", when_used="json")
178180
def serialize_datetime(self, dt: datetime) -> str:
@@ -356,6 +358,7 @@ async def search_transcripts(
356358
transcripts.c.user_id,
357359
transcripts.c.room_id,
358360
transcripts.c.source_kind,
361+
transcripts.c.change_seq,
359362
transcripts.c.webvtt,
360363
transcripts.c.long_summary,
361364
sqlalchemy.case(

server/reflector/db/transcripts.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class SourceKind(enum.StrEnum):
3535
FILE = enum.auto()
3636

3737

38+
transcript_change_seq = sqlalchemy.Sequence("transcript_change_seq", metadata=metadata)
39+
3840
transcripts = sqlalchemy.Table(
3941
"transcript",
4042
metadata,
@@ -89,6 +91,12 @@ class SourceKind(enum.StrEnum):
8991
sqlalchemy.Column("webvtt", sqlalchemy.Text),
9092
# Hatchet workflow run ID for resumption of failed workflows
9193
sqlalchemy.Column("workflow_run_id", sqlalchemy.String),
94+
sqlalchemy.Column(
95+
"change_seq",
96+
sqlalchemy.BigInteger,
97+
transcript_change_seq,
98+
server_default=transcript_change_seq.next_value(),
99+
),
92100
sqlalchemy.Index("idx_transcript_recording_id", "recording_id"),
93101
sqlalchemy.Index("idx_transcript_user_id", "user_id"),
94102
sqlalchemy.Index("idx_transcript_created_at", "created_at"),
@@ -229,6 +237,7 @@ class Transcript(BaseModel):
229237
audio_deleted: bool | None = None
230238
webvtt: str | None = None
231239
workflow_run_id: str | None = None # Hatchet workflow run ID for resumption
240+
change_seq: int | None = None
232241

233242
@field_serializer("created_at", when_used="json")
234243
def serialize_datetime(self, dt: datetime) -> str:
@@ -381,6 +390,7 @@ async def get_all(
381390
source_kind: SourceKind | None = None,
382391
room_id: str | None = None,
383392
search_term: str | None = None,
393+
change_seq_from: int | None = None,
384394
return_query: bool = False,
385395
exclude_columns: list[str] = [
386396
"topics",
@@ -401,6 +411,7 @@ async def get_all(
401411
- `filter_recording`: filter out transcripts that are currently recording
402412
- `room_id`: filter transcripts by room ID
403413
- `search_term`: filter transcripts by search term
414+
- `change_seq_from`: filter transcripts with change_seq > this value
404415
"""
405416

406417
query = transcripts.select().join(
@@ -423,6 +434,9 @@ async def get_all(
423434
if search_term:
424435
query = query.where(transcripts.c.title.ilike(f"%{search_term}%"))
425436

437+
if change_seq_from is not None:
438+
query = query.where(transcripts.c.change_seq > change_seq_from)
439+
426440
# Exclude heavy JSON columns from list queries
427441
transcript_columns = [
428442
col for col in transcripts.c if col.name not in exclude_columns
@@ -436,9 +450,10 @@ async def get_all(
436450
)
437451

438452
if order_by is not None:
439-
field = getattr(transcripts.c, order_by[1:])
440453
if order_by.startswith("-"):
441-
field = field.desc()
454+
field = getattr(transcripts.c, order_by[1:]).desc()
455+
else:
456+
field = getattr(transcripts.c, order_by)
442457
query = query.order_by(field)
443458

444459
if filter_empty:

server/reflector/views/transcripts.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def serialize_datetime(self, dt: datetime) -> str:
111111
room_id: str | None = None
112112
room_name: str | None = None
113113
audio_deleted: bool | None = None
114+
change_seq: int | None = None
114115

115116

116117
class TranscriptParticipantWithEmail(TranscriptParticipant):
@@ -266,20 +267,31 @@ async def transcripts_list(
266267
source_kind: SourceKind | None = None,
267268
room_id: str | None = None,
268269
search_term: str | None = None,
270+
change_seq_from: int | None = None,
271+
sort_by: Literal["created_at", "change_seq"] | None = None,
269272
):
270273
if not user and not settings.PUBLIC_MODE:
271274
raise HTTPException(status_code=401, detail="Not authenticated")
272275

273276
user_id = user["sub"] if user else None
274277

278+
# Default behavior preserved: sort_by=None → "-created_at"
279+
if sort_by == "change_seq":
280+
order_by = "change_seq" # ASC (ascending for checkpoint-based polling)
281+
elif sort_by == "created_at":
282+
order_by = "-created_at" # DESC (newest first, same as current default)
283+
else:
284+
order_by = "-created_at" # default, backward compatible
285+
275286
return await apaginate(
276287
get_database(),
277288
await transcripts_controller.get_all(
278289
user_id=user_id,
279290
source_kind=SourceKind(source_kind) if source_kind else None,
280291
room_id=room_id,
281292
search_term=search_term,
282-
order_by="-created_at",
293+
order_by=order_by,
294+
change_seq_from=change_seq_from,
283295
return_query=True,
284296
),
285297
)
@@ -512,6 +524,7 @@ async def transcript_get(
512524
"room_id": transcript.room_id,
513525
"room_name": room_name,
514526
"audio_deleted": transcript.audio_deleted,
527+
"change_seq": transcript.change_seq,
515528
"participants": participants,
516529
}
517530

www/app/reflector-api.d.ts

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,8 @@ export interface components {
10321032
room_name?: string | null;
10331033
/** Audio Deleted */
10341034
audio_deleted?: boolean | null;
1035+
/** Change Seq */
1036+
change_seq?: number | null;
10351037
};
10361038
/** GetTranscriptSegmentTopic */
10371039
GetTranscriptSegmentTopic: {
@@ -1178,6 +1180,8 @@ export interface components {
11781180
room_name?: string | null;
11791181
/** Audio Deleted */
11801182
audio_deleted?: boolean | null;
1183+
/** Change Seq */
1184+
change_seq?: number | null;
11811185
/** Participants */
11821186
participants:
11831187
| components["schemas"]["TranscriptParticipantWithEmail"][]
@@ -1241,6 +1245,8 @@ export interface components {
12411245
room_name?: string | null;
12421246
/** Audio Deleted */
12431247
audio_deleted?: boolean | null;
1248+
/** Change Seq */
1249+
change_seq?: number | null;
12441250
/** Participants */
12451251
participants:
12461252
| components["schemas"]["TranscriptParticipantWithEmail"][]
@@ -1305,6 +1311,8 @@ export interface components {
13051311
room_name?: string | null;
13061312
/** Audio Deleted */
13071313
audio_deleted?: boolean | null;
1314+
/** Change Seq */
1315+
change_seq?: number | null;
13081316
/** Participants */
13091317
participants:
13101318
| components["schemas"]["TranscriptParticipantWithEmail"][]
@@ -1376,6 +1384,8 @@ export interface components {
13761384
room_name?: string | null;
13771385
/** Audio Deleted */
13781386
audio_deleted?: boolean | null;
1387+
/** Change Seq */
1388+
change_seq?: number | null;
13791389
/** Participants */
13801390
participants:
13811391
| components["schemas"]["TranscriptParticipantWithEmail"][]
@@ -1449,6 +1459,8 @@ export interface components {
14491459
room_name?: string | null;
14501460
/** Audio Deleted */
14511461
audio_deleted?: boolean | null;
1462+
/** Change Seq */
1463+
change_seq?: number | null;
14521464
/** Participants */
14531465
participants:
14541466
| components["schemas"]["TranscriptParticipantWithEmail"][]
@@ -1834,6 +1846,8 @@ export interface components {
18341846
* @default 0
18351847
*/
18361848
total_match_count: number;
1849+
/** Change Seq */
1850+
change_seq?: number | null;
18371851
};
18381852
/**
18391853
* SourceKind
@@ -2146,34 +2160,61 @@ export interface components {
21462160
};
21472161
/** UserTranscriptCreatedData */
21482162
UserTranscriptCreatedData: {
2149-
/** Id */
2163+
/**
2164+
* Id
2165+
* @description A non-empty string
2166+
*/
21502167
id: string;
21512168
};
21522169
/** UserTranscriptDeletedData */
21532170
UserTranscriptDeletedData: {
2154-
/** Id */
2171+
/**
2172+
* Id
2173+
* @description A non-empty string
2174+
*/
21552175
id: string;
21562176
};
21572177
/** UserTranscriptDurationData */
21582178
UserTranscriptDurationData: {
2159-
/** Id */
2179+
/**
2180+
* Id
2181+
* @description A non-empty string
2182+
*/
21602183
id: string;
21612184
/** Duration */
21622185
duration: number;
21632186
};
21642187
/** UserTranscriptFinalTitleData */
21652188
UserTranscriptFinalTitleData: {
2166-
/** Id */
2189+
/**
2190+
* Id
2191+
* @description A non-empty string
2192+
*/
21672193
id: string;
2168-
/** Title */
2194+
/**
2195+
* Title
2196+
* @description A non-empty string
2197+
*/
21692198
title: string;
21702199
};
21712200
/** UserTranscriptStatusData */
21722201
UserTranscriptStatusData: {
2173-
/** Id */
2202+
/**
2203+
* Id
2204+
* @description A non-empty string
2205+
*/
21742206
id: string;
2175-
/** Value */
2176-
value: string;
2207+
/**
2208+
* Value
2209+
* @enum {string}
2210+
*/
2211+
value:
2212+
| "idle"
2213+
| "uploaded"
2214+
| "recording"
2215+
| "processing"
2216+
| "error"
2217+
| "ended";
21772218
};
21782219
/** UserWsTranscriptCreated */
21792220
UserWsTranscriptCreated: {
@@ -2926,6 +2967,8 @@ export interface operations {
29262967
source_kind?: components["schemas"]["SourceKind"] | null;
29272968
room_id?: string | null;
29282969
search_term?: string | null;
2970+
change_seq_from?: number | null;
2971+
sort_by?: ("created_at" | "change_seq") | null;
29292972
/** @description Page number */
29302973
page?: number;
29312974
/** @description Page size */

0 commit comments

Comments
 (0)