Skip to content

Commit bdc06fb

Browse files
Added PythonUDF helpers (#60)
* Add api for function signatures * Add get_function_info function * Add get_function_info and show/function_info endpoint * Add prefix / suffix for function names * added pythonudf helpers * fixed bug * added new env vars * added udf suffix * fixed pre commit checks --------- Co-authored-by: Kevin Smith <[email protected]>
1 parent 34a9644 commit bdc06fb

File tree

5 files changed

+101
-1
lines changed

5 files changed

+101
-1
lines changed

singlestoredb/apps/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from ._cloud_functions import run_function_app # noqa: F401
22
from ._dashboards import run_dashboard_app # noqa: F401
3+
from ._python_udfs import run_udf_app # noqa: F401

singlestoredb/apps/_config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ class AppConfig:
88
listen_port: int
99
base_url: str
1010
base_path: str
11+
notebook_server_id: str
1112
app_token: Optional[str]
1213
user_token: Optional[str]
1314
running_interactively: bool
1415
is_gateway_enabled: bool
16+
is_local_dev: bool
1517

1618
@staticmethod
1719
def _read_variable(name: str) -> str:
@@ -28,6 +30,8 @@ def from_env(cls) -> 'AppConfig':
2830
port = cls._read_variable('SINGLESTOREDB_APP_LISTEN_PORT')
2931
base_url = cls._read_variable('SINGLESTOREDB_APP_BASE_URL')
3032
base_path = cls._read_variable('SINGLESTOREDB_APP_BASE_PATH')
33+
notebook_server_id = cls._read_variable('SINGLESTOREDB_NOTEBOOK_SERVER_ID')
34+
is_local_dev_env_var = cls._read_variable('SINGLESTOREDB_IS_LOCAL_DEV')
3135

3236
workload_type = os.environ.get('SINGLESTOREDB_WORKLOAD_TYPE')
3337
running_interactively = workload_type == 'InteractiveNotebook'
@@ -49,10 +53,12 @@ def from_env(cls) -> 'AppConfig':
4953
listen_port=int(port),
5054
base_url=base_url,
5155
base_path=base_path,
56+
notebook_server_id=notebook_server_id,
5257
app_token=app_token,
5358
user_token=user_token,
5459
running_interactively=running_interactively,
5560
is_gateway_enabled=is_gateway_enabled,
61+
is_local_dev=is_local_dev_env_var == 'true',
5662
)
5763

5864
@property

singlestoredb/apps/_connection_info.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from dataclasses import dataclass
2+
from typing import Any
3+
from typing import Dict
24
from typing import Optional
35

46

@@ -8,3 +10,9 @@ class ConnectionInfo:
810

911
# Only present in interactive mode
1012
token: Optional[str]
13+
14+
15+
@dataclass
16+
class UdfConnectionInfo:
17+
url: str
18+
functions: Dict[str, Any]

singlestoredb/apps/_python_udfs.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import asyncio
2+
import os
3+
import typing
4+
5+
from ..functions.ext.asgi import Application
6+
from ._config import AppConfig
7+
from ._connection_info import UdfConnectionInfo
8+
from ._process import kill_process_by_port
9+
10+
if typing.TYPE_CHECKING:
11+
from ._uvicorn_util import AwaitableUvicornServer
12+
13+
# Keep track of currently running server
14+
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None
15+
16+
17+
async def run_udf_app(
18+
replace_existing: bool,
19+
log_level: str = 'error',
20+
kill_existing_app_server: bool = True,
21+
) -> UdfConnectionInfo:
22+
global _running_server
23+
from ._uvicorn_util import AwaitableUvicornServer
24+
25+
try:
26+
import uvicorn
27+
except ImportError:
28+
raise ImportError('package uvicorn is required to run python udfs')
29+
30+
app_config = AppConfig.from_env()
31+
32+
if kill_existing_app_server:
33+
# Shutdown the server gracefully if it was started by us.
34+
# Since the uvicorn server doesn't start a new subprocess
35+
# killing the process would result in kernel dying.
36+
if _running_server is not None:
37+
await _running_server.shutdown()
38+
_running_server = None
39+
40+
# Kill if any other process is occupying the port
41+
kill_process_by_port(app_config.listen_port)
42+
43+
base_url = generate_base_url(app_config)
44+
45+
udf_suffix = ''
46+
if app_config.running_interactively:
47+
udf_suffix = '_test'
48+
app = Application(url=base_url, app_mode='managed', name_suffix=udf_suffix)
49+
50+
config = uvicorn.Config(
51+
app,
52+
host='0.0.0.0',
53+
port=app_config.listen_port,
54+
log_level=log_level,
55+
)
56+
_running_server = AwaitableUvicornServer(config)
57+
58+
# Register the functions
59+
app.register_functions(replace=replace_existing)
60+
61+
asyncio.create_task(_running_server.serve())
62+
await _running_server.wait_for_startup()
63+
64+
print(f'Python UDF registered at {base_url}')
65+
66+
return UdfConnectionInfo(base_url, app.get_function_info())
67+
68+
69+
def generate_base_url(app_config: AppConfig) -> str:
70+
if not app_config.is_gateway_enabled:
71+
raise RuntimeError('Python UDFs are not available if Nova Gateway is not enabled')
72+
73+
if not app_config.running_interactively:
74+
return app_config.base_url
75+
76+
# generate python udf endpoint for interactive notebooks
77+
gateway_url = os.environ.get('SINGLESTOREDB_NOVA_GATEWAY_ENDPOINT')
78+
if app_config.is_local_dev:
79+
gateway_url = os.environ.get('SINGLESTOREDB_NOVA_GATEWAY_DEV_ENDPOINT')
80+
if gateway_url is None:
81+
raise RuntimeError(
82+
'Missing SINGLESTOREDB_NOVA_GATEWAY_DEV_ENDPOINT environment variable.',
83+
)
84+
85+
return f'{gateway_url}/pythonudfs/{app_config.notebook_server_id}/interactive/'

singlestoredb/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@
317317
'external_function.app_mode', 'string',
318318
functools.partial(
319319
check_str,
320-
valid_values=['remote', 'collocated'],
320+
valid_values=['remote', 'collocated', 'managed'],
321321
),
322322
'remote',
323323
'Specifies the mode of operation of the external function application.',

0 commit comments

Comments
 (0)