Skip to content

Commit fd94e3c

Browse files
authored
Support Python 3.14 (#507)
Use uuidv7 by default in Python 3.14. Also fix an issue where local recovery is attempted while using a recovery service (Conductor or Cloud), which prints confusing "No workflows to recover" messages.
1 parent 876eafa commit fd94e3c

File tree

15 files changed

+780
-717
lines changed

15 files changed

+780
-717
lines changed

.github/workflows/unit-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
strategy:
2323
fail-fast: false
2424
matrix:
25-
python-version: ['3.10', '3.11', '3.12', '3.13']
25+
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']
2626
dbos-database: ['SQLITE', 'POSTGRES']
2727
services:
2828
# Postgres service container

dbos/_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import json
33
import time
4-
import uuid
54
from typing import (
65
TYPE_CHECKING,
76
Any,
@@ -20,6 +19,7 @@
2019
from dbos._app_db import ApplicationDatabase
2120
from dbos._context import MaxPriority, MinPriority
2221
from dbos._sys_db import SystemDatabase
22+
from dbos._utils import generate_uuid
2323

2424
if TYPE_CHECKING:
2525
from dbos._dbos import WorkflowHandle, WorkflowHandleAsync
@@ -188,7 +188,7 @@ def _enqueue(self, options: EnqueueOptions, *args: Any, **kwargs: Any) -> str:
188188
max_recovery_attempts = DEFAULT_MAX_RECOVERY_ATTEMPTS
189189
workflow_id = options.get("workflow_id")
190190
if workflow_id is None:
191-
workflow_id = str(uuid.uuid4())
191+
workflow_id = generate_uuid()
192192
workflow_timeout = options.get("workflow_timeout", None)
193193
enqueue_options_internal: EnqueueOptionsInternal = {
194194
"deduplication_id": options.get("deduplication_id"),
@@ -281,7 +281,7 @@ def send(
281281
topic: Optional[str] = None,
282282
idempotency_key: Optional[str] = None,
283283
) -> None:
284-
idempotency_key = idempotency_key if idempotency_key else str(uuid.uuid4())
284+
idempotency_key = idempotency_key if idempotency_key else generate_uuid()
285285
status: WorkflowStatusInternal = {
286286
"workflow_uuid": f"{destination_id}-{idempotency_key}",
287287
"status": WorkflowStatusString.SUCCESS.value,

dbos/_conductor/conductor.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import threading
33
import time
44
import traceback
5-
import uuid
65
from importlib.metadata import version
76
from typing import TYPE_CHECKING, Optional
87

@@ -11,7 +10,7 @@
1110
from websockets.sync.connection import Connection
1211

1312
from dbos._context import SetWorkflowID
14-
from dbos._utils import GlobalParams
13+
from dbos._utils import GlobalParams, generate_uuid
1514
from dbos._workflow_commands import (
1615
garbage_collect,
1716
get_workflow,
@@ -192,7 +191,7 @@ def run(self) -> None:
192191
fork_message = p.ForkWorkflowRequest.from_json(message)
193192
new_workflow_id = fork_message.body["new_workflow_id"]
194193
if new_workflow_id is None:
195-
new_workflow_id = str(uuid.uuid4())
194+
new_workflow_id = generate_uuid()
196195
workflow_id = fork_message.body["workflow_id"]
197196
start_step = fork_message.body["start_step"]
198197
app_version = fork_message.body["application_version"]

dbos/_context.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import json
44
import os
5-
import uuid
65
from contextlib import AbstractContextManager
76
from contextvars import ContextVar
87
from dataclasses import dataclass
@@ -15,7 +14,7 @@
1514

1615
from sqlalchemy.orm import Session
1716

18-
from dbos._utils import GlobalParams
17+
from dbos._utils import GlobalParams, generate_uuid
1918

2019
from ._logger import dbos_logger
2120
from ._tracer import dbos_tracer
@@ -151,7 +150,7 @@ def assign_workflow_id(self) -> str:
151150
self.logger.warning(
152151
f"Multiple workflows started in the same SetWorkflowID block. Only the first workflow is assigned the specified workflow ID; subsequent workflows will use a generated workflow ID."
153152
)
154-
wfid = str(uuid.uuid4())
153+
wfid = generate_uuid()
155154
return wfid
156155

157156
def start_workflow(

dbos/_dbos.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import sys
88
import threading
99
import time
10-
import uuid
1110
from concurrent.futures import ThreadPoolExecutor
1211
from logging import Logger
1312
from typing import (
@@ -33,7 +32,7 @@
3332
from dbos._debouncer import debouncer_workflow
3433
from dbos._serialization import DefaultSerializer, Serializer
3534
from dbos._sys_db import SystemDatabase, WorkflowStatus
36-
from dbos._utils import INTERNAL_QUEUE_NAME, GlobalParams
35+
from dbos._utils import INTERNAL_QUEUE_NAME, GlobalParams, generate_uuid
3736
from dbos._workflow_commands import fork_workflow, list_queued_workflows, list_workflows
3837

3938
from ._classproperty import classproperty
@@ -444,7 +443,7 @@ def _launch(self, *, debug_mode: bool = False) -> None:
444443
if GlobalParams.app_version == "":
445444
GlobalParams.app_version = self._registry.compute_app_version()
446445
if self.conductor_key is not None:
447-
GlobalParams.executor_id = str(uuid.uuid4())
446+
GlobalParams.executor_id = generate_uuid()
448447
dbos_logger.info(f"Executor ID: {GlobalParams.executor_id}")
449448
dbos_logger.info(f"Application version: {GlobalParams.app_version}")
450449
self._executor_field = ThreadPoolExecutor(max_workers=sys.maxsize)
@@ -496,20 +495,21 @@ def _launch(self, *, debug_mode: bool = False) -> None:
496495
except Exception as e:
497496
dbos_logger.warning(f"Failed to start admin server: {e}")
498497

499-
dbos_logger.debug("Retrieving local pending workflows for recovery")
500-
workflow_ids = self._sys_db.get_pending_workflows(
501-
GlobalParams.executor_id, GlobalParams.app_version
502-
)
503-
if (len(workflow_ids)) > 0:
504-
self.logger.info(
505-
f"Recovering {len(workflow_ids)} workflows from application version {GlobalParams.app_version}"
506-
)
507-
else:
508-
self.logger.info(
509-
f"No workflows to recover from application version {GlobalParams.app_version}"
498+
# Recover local workflows if not using a recovery service
499+
if not self.conductor_key and not GlobalParams.dbos_cloud:
500+
dbos_logger.debug("Retrieving local pending workflows for recovery")
501+
workflow_ids = self._sys_db.get_pending_workflows(
502+
GlobalParams.executor_id, GlobalParams.app_version
510503
)
511-
512-
self._executor.submit(startup_recovery_thread, self, workflow_ids)
504+
if (len(workflow_ids)) > 0:
505+
self.logger.info(
506+
f"Recovering {len(workflow_ids)} workflows from application version {GlobalParams.app_version}"
507+
)
508+
else:
509+
self.logger.info(
510+
f"No workflows to recover from application version {GlobalParams.app_version}"
511+
)
512+
self._executor.submit(startup_recovery_thread, self, workflow_ids)
513513

514514
# Listen to notifications
515515
dbos_logger.debug("Starting notifications listener thread")

dbos/_dbos_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ def load_config(
265265
]
266266

267267
data = cast(ConfigFile, data)
268-
return data # type: ignore
268+
return data
269269

270270

271271
def process_config(

dbos/_debouncer.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import math
33
import time
44
import types
5-
import uuid
65
from typing import (
76
TYPE_CHECKING,
87
Any,
@@ -39,7 +38,7 @@
3938
from dbos._queue import Queue
4039
from dbos._registrations import get_dbos_func_name
4140
from dbos._serialization import WorkflowInputs
42-
from dbos._utils import INTERNAL_QUEUE_NAME
41+
from dbos._utils import INTERNAL_QUEUE_NAME, generate_uuid
4342

4443
if TYPE_CHECKING:
4544
from dbos._dbos import WorkflowHandle, WorkflowHandleAsync
@@ -209,7 +208,7 @@ def debounce(
209208

210209
# Deterministically generate the user workflow ID and message ID
211210
def assign_debounce_ids() -> tuple[str, str]:
212-
return str(uuid.uuid4()), ctx.assign_workflow_id()
211+
return generate_uuid(), ctx.assign_workflow_id()
213212

214213
message_id, user_workflow_id = dbos._sys_db.call_function_as_step(
215214
assign_debounce_ids, "DBOS.assign_debounce_ids"
@@ -320,14 +319,14 @@ def debounce(
320319
"workflow_id": (
321320
self.workflow_options["workflow_id"]
322321
if self.workflow_options.get("workflow_id")
323-
else str(uuid.uuid4())
322+
else generate_uuid()
324323
),
325324
"app_version": self.workflow_options.get("app_version"),
326325
"deduplication_id": self.workflow_options.get("deduplication_id"),
327326
"priority": self.workflow_options.get("priority"),
328327
"workflow_timeout_sec": self.workflow_options.get("workflow_timeout"),
329328
}
330-
message_id = str(uuid.uuid4())
329+
message_id = generate_uuid()
331330
while True:
332331
try:
333332
# Attempt to enqueue a debouncer for this workflow.

dbos/_fastapi.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import uuid
21
from typing import Any, Callable, MutableMapping, cast
32

43
from fastapi import FastAPI
@@ -9,15 +8,15 @@
98
from . import DBOS
109
from ._context import EnterDBOSHandler, OperationType, SetWorkflowID, TracedAttributes
1110
from ._error import DBOSException
12-
from ._utils import request_id_header
11+
from ._utils import generate_uuid, request_id_header
1312

1413

1514
def _get_or_generate_request_id(request: FastAPIRequest) -> str:
1615
request_id = request.headers.get(request_id_header, None)
1716
if request_id is not None:
1817
return request_id
1918
else:
20-
return str(uuid.uuid4())
19+
return generate_uuid()
2120

2221

2322
async def _dbos_error_handler(request: FastAPIRequest, gexc: Exception) -> JSONResponse:

dbos/_flask.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
import uuid
21
from typing import Any
32
from urllib.parse import urlparse
43

54
from flask import Flask
65
from werkzeug.wrappers import Request as WRequest
76

87
from ._context import EnterDBOSHandler, OperationType, SetWorkflowID, TracedAttributes
9-
from ._utils import request_id_header
8+
from ._utils import generate_uuid, request_id_header
109

1110

1211
class FlaskMiddleware:
@@ -41,7 +40,7 @@ def _get_or_generate_request_id(request: WRequest) -> str:
4140
if request_id is not None:
4241
return request_id
4342
else:
44-
return str(uuid.uuid4())
43+
return generate_uuid()
4544

4645

4746
def setup_flask_middleware(app: Flask) -> None:

dbos/_utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import importlib.metadata
22
import os
3+
import sys
4+
import uuid
35

46
import psycopg
57
from sqlalchemy.exc import DBAPIError
@@ -12,6 +14,7 @@
1214
class GlobalParams:
1315
app_version: str = os.environ.get("DBOS__APPVERSION", "")
1416
executor_id: str = os.environ.get("DBOS__VMID", "local")
17+
dbos_cloud: bool = os.environ.get("DBOS__CLOUD") == "true"
1518
try:
1619
# Only works on Python >= 3.8
1720
dbos_version = importlib.metadata.version("dbos")
@@ -57,3 +60,10 @@ def retriable_sqlite_exception(e: Exception) -> bool:
5760
return True
5861
else:
5962
return False
63+
64+
65+
def generate_uuid() -> str:
66+
if sys.version_info >= (3, 14):
67+
return str(uuid.uuid7())
68+
else:
69+
return str(uuid.uuid4())

0 commit comments

Comments
 (0)