Skip to content

Commit 15ab2e3

Browse files
feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform Daily.co has been battle-tested and is ready to be the default. Whereby remains available for rooms that explicitly set it. * feat: enforce Hatchet for all multitrack processing Remove use_celery option from rooms - multitrack (Daily) recordings now always use Hatchet workflows. Celery remains for single-track (Whereby) file processing only. - Remove use_celery column from room table - Simplify dispatch logic to always use Hatchet for multitracks - Update tests to mock Hatchet instead of Celery * fix: update whereby test to patch Hatchet instead of removed Celery import --------- Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
1 parent 1ce1c7a commit 15ab2e3

File tree

7 files changed

+188
-232
lines changed

7 files changed

+188
-232
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""drop_use_celery_column
2+
3+
Revision ID: 3aa20b96d963
4+
Revises: e69f08ead8ea
5+
Create Date: 2026-02-05 10:12:44.065279
6+
7+
"""
8+
9+
from typing import Sequence, Union
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision: str = "3aa20b96d963"
16+
down_revision: Union[str, None] = "e69f08ead8ea"
17+
branch_labels: Union[str, Sequence[str], None] = None
18+
depends_on: Union[str, Sequence[str], None] = None
19+
20+
21+
def upgrade() -> None:
22+
with op.batch_alter_table("room", schema=None) as batch_op:
23+
batch_op.drop_column("use_celery")
24+
25+
26+
def downgrade() -> None:
27+
with op.batch_alter_table("room", schema=None) as batch_op:
28+
batch_op.add_column(
29+
sa.Column(
30+
"use_celery",
31+
sa.Boolean(),
32+
server_default=sa.text("false"),
33+
nullable=False,
34+
)
35+
)

server/reflector/db/rooms.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,6 @@
5757
sqlalchemy.String,
5858
nullable=False,
5959
),
60-
sqlalchemy.Column(
61-
"use_celery",
62-
sqlalchemy.Boolean,
63-
nullable=False,
64-
server_default=false(),
65-
),
6660
sqlalchemy.Column(
6761
"skip_consent",
6862
sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
9791
ics_last_sync: datetime | None = None
9892
ics_last_etag: str | None = None
9993
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
100-
use_celery: bool = False
10194
skip_consent: bool = False
10295

10396

server/reflector/services/transcript_process.py

Lines changed: 77 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@
1515
from hatchet_sdk.clients.rest.models import V1TaskStatus
1616

1717
from reflector.db.recordings import recordings_controller
18-
from reflector.db.rooms import rooms_controller
1918
from reflector.db.transcripts import Transcript, transcripts_controller
2019
from reflector.hatchet.client import HatchetClientManager
2120
from reflector.logger import logger
2221
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
23-
from reflector.pipelines.main_multitrack_pipeline import (
24-
task_pipeline_multitrack_process,
25-
)
2622
from reflector.utils.string import NonEmptyString
2723

2824

@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
181177
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
182178
"""
183179
if isinstance(config, MultitrackProcessingConfig):
184-
use_celery = False
185-
if config.room_id:
186-
room = await rooms_controller.get_by_id(config.room_id)
187-
use_celery = room.use_celery if room else False
188-
189-
use_hatchet = not use_celery
190-
191-
if use_celery:
192-
logger.info(
193-
"Room uses legacy Celery processing",
194-
room_id=config.room_id,
195-
transcript_id=config.transcript_id,
180+
# Multitrack processing always uses Hatchet (no Celery fallback)
181+
# First check if we can replay (outside transaction since it's read-only)
182+
transcript = await transcripts_controller.get_by_id(config.transcript_id)
183+
if transcript and transcript.workflow_run_id and not force:
184+
can_replay = await HatchetClientManager.can_replay(
185+
transcript.workflow_run_id
196186
)
197-
198-
if use_hatchet:
199-
# First check if we can replay (outside transaction since it's read-only)
200-
transcript = await transcripts_controller.get_by_id(config.transcript_id)
201-
if transcript and transcript.workflow_run_id and not force:
202-
can_replay = await HatchetClientManager.can_replay(
203-
transcript.workflow_run_id
187+
if can_replay:
188+
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
189+
logger.info(
190+
"Replaying Hatchet workflow",
191+
workflow_id=transcript.workflow_run_id,
204192
)
205-
if can_replay:
206-
await HatchetClientManager.replay_workflow(
207-
transcript.workflow_run_id
208-
)
209-
logger.info(
210-
"Replaying Hatchet workflow",
211-
workflow_id=transcript.workflow_run_id,
212-
)
213-
return None
214-
else:
215-
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
216-
# Log and proceed to start new workflow
217-
try:
218-
status = await HatchetClientManager.get_workflow_run_status(
219-
transcript.workflow_run_id
220-
)
221-
logger.info(
222-
"Old workflow not replayable, starting new",
223-
old_workflow_id=transcript.workflow_run_id,
224-
old_status=status.value,
225-
)
226-
except NotFoundException:
227-
# Workflow deleted from Hatchet but ID still in DB
228-
logger.info(
229-
"Old workflow not found in Hatchet, starting new",
230-
old_workflow_id=transcript.workflow_run_id,
231-
)
232-
233-
# Force: cancel old workflow if exists
234-
if force and transcript and transcript.workflow_run_id:
193+
return None
194+
else:
195+
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
196+
# Log and proceed to start new workflow
235197
try:
236-
await HatchetClientManager.cancel_workflow(
198+
status = await HatchetClientManager.get_workflow_run_status(
237199
transcript.workflow_run_id
238200
)
239201
logger.info(
240-
"Cancelled old workflow (--force)",
241-
workflow_id=transcript.workflow_run_id,
202+
"Old workflow not replayable, starting new",
203+
old_workflow_id=transcript.workflow_run_id,
204+
old_status=status.value,
242205
)
243206
except NotFoundException:
207+
# Workflow deleted from Hatchet but ID still in DB
244208
logger.info(
245-
"Old workflow already deleted (--force)",
246-
workflow_id=transcript.workflow_run_id,
209+
"Old workflow not found in Hatchet, starting new",
210+
old_workflow_id=transcript.workflow_run_id,
247211
)
248-
await transcripts_controller.update(
249-
transcript, {"workflow_run_id": None}
250-
)
251212

252-
# Re-fetch and check for concurrent dispatch (optimistic approach).
253-
# No database lock - worst case is duplicate dispatch, but Hatchet
254-
# workflows are idempotent so this is acceptable.
255-
transcript = await transcripts_controller.get_by_id(config.transcript_id)
256-
if transcript and transcript.workflow_run_id:
257-
# Another process started a workflow between validation and now
258-
try:
259-
status = await HatchetClientManager.get_workflow_run_status(
260-
transcript.workflow_run_id
213+
# Force: cancel old workflow if exists
214+
if force and transcript and transcript.workflow_run_id:
215+
try:
216+
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
217+
logger.info(
218+
"Cancelled old workflow (--force)",
219+
workflow_id=transcript.workflow_run_id,
220+
)
221+
except NotFoundException:
222+
logger.info(
223+
"Old workflow already deleted (--force)",
224+
workflow_id=transcript.workflow_run_id,
225+
)
226+
await transcripts_controller.update(transcript, {"workflow_run_id": None})
227+
228+
# Re-fetch and check for concurrent dispatch (optimistic approach).
229+
# No database lock - worst case is duplicate dispatch, but Hatchet
230+
# workflows are idempotent so this is acceptable.
231+
transcript = await transcripts_controller.get_by_id(config.transcript_id)
232+
if transcript and transcript.workflow_run_id:
233+
# Another process started a workflow between validation and now
234+
try:
235+
status = await HatchetClientManager.get_workflow_run_status(
236+
transcript.workflow_run_id
237+
)
238+
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
239+
logger.info(
240+
"Concurrent workflow detected, skipping dispatch",
241+
workflow_id=transcript.workflow_run_id,
261242
)
262-
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
263-
logger.info(
264-
"Concurrent workflow detected, skipping dispatch",
265-
workflow_id=transcript.workflow_run_id,
266-
)
267-
return None
268-
except ApiException:
269-
# Workflow might be gone (404) or API issue - proceed with new workflow
270-
pass
271-
272-
workflow_id = await HatchetClientManager.start_workflow(
273-
workflow_name="DiarizationPipeline",
274-
input_data={
275-
"recording_id": config.recording_id,
276-
"tracks": [{"s3_key": k} for k in config.track_keys],
277-
"bucket_name": config.bucket_name,
278-
"transcript_id": config.transcript_id,
279-
"room_id": config.room_id,
280-
},
281-
additional_metadata={
282-
"transcript_id": config.transcript_id,
283-
"recording_id": config.recording_id,
284-
"daily_recording_id": config.recording_id,
285-
},
286-
)
243+
return None
244+
except ApiException:
245+
# Workflow might be gone (404) or API issue - proceed with new workflow
246+
pass
247+
248+
workflow_id = await HatchetClientManager.start_workflow(
249+
workflow_name="DiarizationPipeline",
250+
input_data={
251+
"recording_id": config.recording_id,
252+
"tracks": [{"s3_key": k} for k in config.track_keys],
253+
"bucket_name": config.bucket_name,
254+
"transcript_id": config.transcript_id,
255+
"room_id": config.room_id,
256+
},
257+
additional_metadata={
258+
"transcript_id": config.transcript_id,
259+
"recording_id": config.recording_id,
260+
"daily_recording_id": config.recording_id,
261+
},
262+
)
287263

288-
if transcript:
289-
await transcripts_controller.update(
290-
transcript, {"workflow_run_id": workflow_id}
291-
)
264+
if transcript:
265+
await transcripts_controller.update(
266+
transcript, {"workflow_run_id": workflow_id}
267+
)
292268

293-
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
294-
return None
269+
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
270+
return None
295271

296-
# Celery pipeline (durable workflows disabled)
297-
return task_pipeline_multitrack_process.delay(
298-
transcript_id=config.transcript_id,
299-
bucket_name=config.bucket_name,
300-
track_keys=config.track_keys,
301-
)
302272
elif isinstance(config, FileProcessingConfig):
303273
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
304274
else:

server/reflector/settings.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from pydantic.types import PositiveInt
22
from pydantic_settings import BaseSettings, SettingsConfigDict
33

4-
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
4+
from reflector.schemas.platform import DAILY_PLATFORM, Platform
55
from reflector.utils.string import NonEmptyString
66

77

@@ -155,7 +155,7 @@ class Settings(BaseSettings):
155155
None # Webhook UUID for this environment. Not used by production code
156156
)
157157
# Platform Configuration
158-
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
158+
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
159159

160160
# Zulip integration
161161
ZULIP_REALM: str | None = None

0 commit comments

Comments
 (0)