Skip to content

Commit 4eed711

Browse files
authored
Python Fixes (#493)
- Fix an issue where DBOS would not create a system database with special characters in its name - Fix typing for async scheduled and Kafka workflows - Improve error messages for unserializable workflow inputs and outputs and step outputs - Fix an issue where DBOS does not reconnect after losing its connection to Kafka
1 parent 5b2b5ff commit 4eed711

File tree

10 files changed

+186
-45
lines changed

10 files changed

+186
-45
lines changed

dbos/_dbos_config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,7 @@ def configure_db_engine_parameters(
444444

445445
# Configure user database engine parameters
446446
app_engine_kwargs: dict[str, Any] = {
447+
"connect_args": {"application_name": "dbos_transact"},
447448
"pool_timeout": 30,
448449
"max_overflow": 0,
449450
"pool_size": 20,

dbos/_kafka.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import re
22
import threading
3-
from typing import TYPE_CHECKING, Any, Callable, NoReturn
3+
from typing import TYPE_CHECKING, Any, Callable, Coroutine, NoReturn
44

55
from confluent_kafka import Consumer, KafkaError, KafkaException
66

@@ -15,7 +15,9 @@
1515
from ._logger import dbos_logger
1616
from ._registrations import get_dbos_func_name
1717

18-
_KafkaConsumerWorkflow = Callable[[KafkaMessage], None]
18+
_KafkaConsumerWorkflow = (
19+
Callable[[KafkaMessage], None] | Callable[[KafkaMessage], Coroutine[Any, Any, None]]
20+
)
1921

2022
_kafka_queue: Queue
2123
_in_order_kafka_queues: dict[str, Queue] = {}
@@ -37,8 +39,8 @@ def _kafka_consumer_loop(
3739
in_order: bool,
3840
) -> None:
3941

40-
def on_error(err: KafkaError) -> NoReturn:
41-
raise KafkaException(err)
42+
def on_error(err: KafkaError) -> None:
43+
dbos_logger.error(f"Exception in Kafka consumer: {err}")
4244

4345
config["error_cb"] = on_error
4446
if "auto.offset.reset" not in config:

dbos/_scheduler.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import threading
33
import traceback
44
from datetime import datetime, timezone
5-
from typing import TYPE_CHECKING, Callable
5+
from typing import TYPE_CHECKING, Any, Callable, Coroutine
66

77
from ._logger import dbos_logger
88
from ._queue import Queue
@@ -14,7 +14,10 @@
1414
from ._croniter import croniter # type: ignore
1515
from ._registrations import get_dbos_func_name
1616

17-
ScheduledWorkflow = Callable[[datetime, datetime], None]
17+
ScheduledWorkflow = (
18+
Callable[[datetime, datetime], None]
19+
| Callable[[datetime, datetime], Coroutine[Any, Any, None]]
20+
)
1821

1922

2023
def scheduler_loop(

dbos/_serialization.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ def deserialize(cls, serialized_data: str) -> Any:
2525
class DefaultSerializer(Serializer):
2626

2727
def serialize(self, data: Any) -> str:
28-
pickled_data: bytes = pickle.dumps(data)
29-
encoded_data: str = base64.b64encode(pickled_data).decode("utf-8")
30-
return encoded_data
28+
try:
29+
pickled_data: bytes = pickle.dumps(data)
30+
encoded_data: str = base64.b64encode(pickled_data).decode("utf-8")
31+
return encoded_data
32+
except Exception as e:
33+
dbos_logger.error(f"Error serializing object: {data}", exc_info=e)
34+
raise
3135

3236
def deserialize(cls, serialized_data: str) -> Any:
3337
pickled_data: bytes = base64.b64decode(serialized_data)

dbos/_sys_db_postgres.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def run_migrations(self) -> None:
4141
parameters={"db_name": sysdb_name},
4242
).scalar():
4343
dbos_logger.info(f"Creating system database {sysdb_name}")
44-
conn.execute(sa.text(f"CREATE DATABASE {sysdb_name}"))
44+
conn.execute(sa.text(f'CREATE DATABASE "{sysdb_name}"'))
4545
engine.dispose()
4646
else:
4747
# If we were provided an engine, validate it can connect

tests/test_config.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -209,15 +209,15 @@ def test_process_config_full():
209209
"max_overflow": 0,
210210
"pool_size": 20,
211211
"pool_pre_ping": True,
212-
"connect_args": {"connect_timeout": 1},
212+
"connect_args": {"connect_timeout": 1, "application_name": "dbos_transact"},
213213
}
214214
assert configFile["database"]["sys_db_engine_kwargs"] == {
215215
"key": "value",
216216
"pool_timeout": 30,
217217
"max_overflow": 0,
218218
"pool_size": 27,
219219
"pool_pre_ping": True,
220-
"connect_args": {"connect_timeout": 1},
220+
"connect_args": {"connect_timeout": 1, "application_name": "dbos_transact"},
221221
}
222222
assert configFile["runtimeConfig"]["start"] == ["python3 main.py"]
223223
assert configFile["runtimeConfig"]["admin_port"] == 8001
@@ -255,15 +255,15 @@ def test_process_config_system_database():
255255
"max_overflow": 0,
256256
"pool_size": 20,
257257
"pool_pre_ping": True,
258-
"connect_args": {"connect_timeout": 1},
258+
"connect_args": {"connect_timeout": 1, "application_name": "dbos_transact"},
259259
}
260260
assert configFile["database"]["sys_db_engine_kwargs"] == {
261261
"key": "value",
262262
"pool_timeout": 30,
263263
"max_overflow": 0,
264264
"pool_size": 27,
265265
"pool_pre_ping": True,
266-
"connect_args": {"connect_timeout": 1},
266+
"connect_args": {"connect_timeout": 1, "application_name": "dbos_transact"},
267267
}
268268

269269

@@ -397,14 +397,14 @@ def test_configure_db_engine_parameters_defaults():
397397
"max_overflow": 0,
398398
"pool_size": 20,
399399
"pool_pre_ping": True,
400-
"connect_args": {"connect_timeout": 10},
400+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
401401
}
402402
assert data["sys_db_engine_kwargs"] == {
403403
"pool_timeout": 30,
404404
"max_overflow": 0,
405405
"pool_size": 20,
406406
"pool_pre_ping": True,
407-
"connect_args": {"connect_timeout": 10},
407+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
408408
}
409409

410410

@@ -419,14 +419,14 @@ def test_configure_db_engine_parameters_custom_sys_db_pool_sizes():
419419
"max_overflow": 0,
420420
"pool_size": 20,
421421
"pool_pre_ping": True,
422-
"connect_args": {"connect_timeout": 10},
422+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
423423
}
424424
assert data["sys_db_engine_kwargs"] == {
425425
"pool_timeout": 30,
426426
"max_overflow": 0,
427427
"pool_size": 35,
428428
"pool_pre_ping": True,
429-
"connect_args": {"connect_timeout": 10},
429+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
430430
}
431431

432432

@@ -440,7 +440,11 @@ def test_configure_db_engine_parameters_user_kwargs_override():
440440
"pool_pre_ping": True,
441441
"custom_param": "value",
442442
"pool_size": 50,
443-
"connect_args": {"connect_timeout": 30, "key": "value"},
443+
"connect_args": {
444+
"connect_timeout": 30,
445+
"key": "value",
446+
"application_name": "dbos_transact",
447+
},
444448
},
445449
}
446450

@@ -453,7 +457,11 @@ def test_configure_db_engine_parameters_user_kwargs_override():
453457
"pool_pre_ping": True,
454458
"custom_param": "value",
455459
"pool_size": 50,
456-
"connect_args": {"connect_timeout": 30, "key": "value"},
460+
"connect_args": {
461+
"connect_timeout": 30,
462+
"key": "value",
463+
"application_name": "dbos_transact",
464+
},
457465
}
458466

459467
# System engine kwargs should use system pool size but same user overrides
@@ -463,7 +471,11 @@ def test_configure_db_engine_parameters_user_kwargs_override():
463471
"pool_pre_ping": True,
464472
"custom_param": "value",
465473
"pool_size": 35,
466-
"connect_args": {"connect_timeout": 30, "key": "value"},
474+
"connect_args": {
475+
"connect_timeout": 30,
476+
"key": "value",
477+
"application_name": "dbos_transact",
478+
},
467479
}
468480

469481

@@ -487,7 +499,7 @@ def test_configure_db_engine_parameters_user_kwargs_and_db_url_connect_timeout()
487499
"pool_pre_ping": True,
488500
"custom_param": "value",
489501
"pool_size": 50,
490-
"connect_args": {"connect_timeout": 22},
502+
"connect_args": {"connect_timeout": 22, "application_name": "dbos_transact"},
491503
}
492504

493505
# System engine kwargs should use system pool size but same user overrides
@@ -497,7 +509,7 @@ def test_configure_db_engine_parameters_user_kwargs_and_db_url_connect_timeout()
497509
"pool_pre_ping": True,
498510
"custom_param": "value",
499511
"pool_size": 50,
500-
"connect_args": {"connect_timeout": 22},
512+
"connect_args": {"connect_timeout": 22, "application_name": "dbos_transact"},
501513
}
502514

503515

@@ -556,7 +568,7 @@ def test_configure_db_engine_parameters_user_kwargs_mixed_params():
556568
"pool_pre_ping": True,
557569
"custom_param": "value",
558570
"pool_size": 50,
559-
"connect_args": {"connect_timeout": 10},
571+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
560572
}
561573

562574
# System engine kwargs should use system pool size but same user overrides
@@ -566,7 +578,7 @@ def test_configure_db_engine_parameters_user_kwargs_mixed_params():
566578
"pool_pre_ping": True,
567579
"custom_param": "value",
568580
"pool_size": 50,
569-
"connect_args": {"connect_timeout": 10},
581+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
570582
}
571583

572584

@@ -581,14 +593,14 @@ def test_configure_db_engine_parameters_empty_user_kwargs():
581593
"max_overflow": 0,
582594
"pool_size": 20,
583595
"pool_pre_ping": True,
584-
"connect_args": {"connect_timeout": 10},
596+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
585597
}
586598
assert data["sys_db_engine_kwargs"] == {
587599
"pool_timeout": 30,
588600
"max_overflow": 0,
589601
"pool_size": 20,
590602
"pool_pre_ping": True,
591-
"connect_args": {"connect_timeout": 10},
603+
"connect_args": {"connect_timeout": 10, "application_name": "dbos_transact"},
592604
}
593605

594606

tests/test_dbos.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from dbos._schemas.system_database import SystemSchema
3838
from dbos._sys_db import GetWorkflowsInput
3939
from dbos._utils import GlobalParams
40+
from tests.conftest import using_sqlite
4041

4142

4243
def test_simple_workflow(dbos: DBOS) -> None:
@@ -1796,6 +1797,65 @@ def transaction() -> None:
17961797
assert s["function_name"] == step.__qualname__
17971798

17981799

1800+
def test_custom_database(
1801+
config: DBOSConfig, db_engine: sa.Engine, cleanup_test_databases: None
1802+
) -> None:
1803+
DBOS.destroy(destroy_registry=True)
1804+
assert config["system_database_url"]
1805+
custom_database = "F8nny_dAtaB@s3@[email protected]"
1806+
url = sa.make_url(config["system_database_url"])
1807+
url = url.set(database=custom_database)
1808+
config["system_database_url"] = url.render_as_string(hide_password=False)
1809+
# Destroy the database if it exists
1810+
if using_sqlite():
1811+
parsed_url = sa.make_url(config["system_database_url"])
1812+
db_path = parsed_url.database
1813+
assert db_path is not None
1814+
if os.path.exists(db_path):
1815+
os.remove(db_path)
1816+
else:
1817+
with db_engine.connect() as connection:
1818+
connection.execution_options(isolation_level="AUTOCOMMIT")
1819+
connection.execute(
1820+
sa.text(f'DROP DATABASE IF EXISTS "{custom_database}" WITH (FORCE)')
1821+
)
1822+
DBOS(config=config)
1823+
DBOS.launch()
1824+
1825+
key = "key"
1826+
val = "val"
1827+
1828+
@DBOS.transaction()
1829+
def transaction() -> None:
1830+
return
1831+
1832+
@DBOS.workflow()
1833+
def recv_workflow() -> Any:
1834+
transaction()
1835+
DBOS.set_event(key, val)
1836+
return DBOS.recv()
1837+
1838+
handle = DBOS.start_workflow(recv_workflow)
1839+
assert DBOS.get_event(handle.workflow_id, key) == val
1840+
DBOS.send(handle.workflow_id, val)
1841+
assert handle.get_result() == val
1842+
assert len(DBOS.list_workflows()) == 2
1843+
steps = DBOS.list_workflow_steps(handle.workflow_id)
1844+
assert len(steps) == 4
1845+
assert "transaction" in steps[0]["function_name"]
1846+
DBOS.destroy(destroy_registry=True)
1847+
1848+
# Test custom database with client
1849+
client = DBOSClient(
1850+
system_database_url=config["system_database_url"],
1851+
application_database_url=config["application_database_url"],
1852+
)
1853+
assert len(client.list_workflows()) == 2
1854+
steps = client.list_workflow_steps(handle.workflow_id)
1855+
assert len(steps) == 4
1856+
assert "transaction" in steps[0]["function_name"]
1857+
1858+
17991859
def test_custom_schema(
18001860
config: DBOSConfig, cleanup_test_databases: None, skip_with_sqlite: None
18011861
) -> None:

tests/test_failures.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import threading
22
import time
33
import uuid
4-
from typing import cast
4+
from typing import Any, Generator, cast
55

66
import pytest
77
import sqlalchemy as sa
@@ -553,3 +553,16 @@ def workflow() -> None:
553553

554554
with pytest.raises(DBOSWorkflowFunctionNotFoundError):
555555
DBOS._recover_pending_workflows()
556+
557+
558+
def test_nonserializable_return(dbos: DBOS) -> None:
559+
@DBOS.step()
560+
def step() -> Generator[str, Any, None]:
561+
yield "val"
562+
563+
@DBOS.workflow()
564+
def workflow() -> None:
565+
step()
566+
567+
with pytest.raises(TypeError):
568+
workflow()

0 commit comments

Comments
 (0)