Skip to content

Commit 8cb66bd

Browse files
authored
feat(app-namespace): support app namespace (#498)
1 parent f368a9a commit 8cb66bd

File tree

10 files changed

+85
-33
lines changed

10 files changed

+85
-33
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
# Postgres database address for cocoindex
22
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
3+
COCOINDEX_APP_NAMESPACE=Dev0

python/cocoindex/cli.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def show(flow_name: str | None, color: bool, verbose: bool):
6565

6666
console.print()
6767
table = Table(
68-
title=f"Schema for Flow: {flow.name}",
68+
title=f"Schema for Flow: {flow.full_name}",
6969
show_header=True,
7070
header_style="bold magenta"
7171
)
@@ -108,7 +108,7 @@ def drop(flow_name: tuple[str, ...], drop_all: bool):
108108
if drop_all:
109109
flow_names = flow_names_with_setup()
110110
elif len(flow_name) == 0:
111-
flow_names = [fl.name for fl in flow.flows()]
111+
flow_names = flow.flow_names()
112112
else:
113113
flow_names = list(flow_name)
114114
setup_status = drop_setup(flow_names)
@@ -160,7 +160,7 @@ def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True):
160160
"""
161161
fl = _flow_by_name(flow_name)
162162
if output_dir is None:
163-
output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
163+
output_dir = f"eval_{setting.get_app_namespace(trailing_delimiter='_')}{flow_name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
164164
options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache)
165165
fl.evaluate_and_dump(options)
166166

python/cocoindex/flow.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from . import _engine
2020
from . import index
2121
from . import op
22+
from . import setting
2223
from .convert import dump_engine_object
2324
from .typing import encode_enriched_type
2425
from .runtime import execution_context
@@ -310,7 +311,7 @@ class _FlowBuilderState:
310311

311312
def __init__(self, /, name: str | None = None):
312313
flow_name = _flow_name_builder.build_name(name, prefix="_flow_")
313-
self.engine_flow_builder = _engine.FlowBuilder(flow_name)
314+
self.engine_flow_builder = _engine.FlowBuilder(get_full_flow_name(flow_name))
314315
self.field_name_builder = _NameBuilder()
315316

316317
def get_data_slice(self, v: Any) -> _engine.DataSlice:
@@ -481,7 +482,7 @@ def _render_spec(self, verbose: bool = False) -> Tree:
481482
Render the flow spec as a styled rich Tree with hierarchical structure.
482483
"""
483484
spec = self._get_spec(verbose=verbose)
484-
tree = Tree(f"Flow: {self.name}", style="cyan")
485+
tree = Tree(f"Flow: {self.full_name}", style="cyan")
485486

486487
def build_tree(label: str, lines: list):
487488
node = Tree(label, style="bold magenta" if lines else "cyan")
@@ -508,9 +509,9 @@ def __repr__(self):
508509
return repr(self._lazy_engine_flow())
509510

510511
@property
511-
def name(self) -> str:
512+
def full_name(self) -> str:
512513
"""
513-
Get the name of the flow.
514+
Get the full name of the flow.
514515
"""
515516
return self._lazy_engine_flow().name()
516517

@@ -566,8 +567,16 @@ def _create_engine_flow() -> _engine.Flow:
566567
_flows_lock = Lock()
567568
_flows: dict[str, Flow] = {}
568569

570+
def get_full_flow_name(name: str) -> str:
    """
    Get the full name of a flow.

    The full name is `name` prefixed with whatever
    `setting.get_app_namespace(trailing_delimiter='.')` returns.
    """
    return f"{setting.get_app_namespace(trailing_delimiter='.')}{name}"
575+
569576
def add_flow_def(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
570577
"""Add a flow definition to the cocoindex library."""
578+
if not all(c.isalnum() or c == '_' for c in name):
579+
raise ValueError(f"Flow name '{name}' contains invalid characters. Only alphanumeric characters and underscores are allowed.")
571580
with _flows_lock:
572581
if name in _flows:
573582
raise KeyError(f"Flow with name {name} already exists")
@@ -587,12 +596,12 @@ def flow_names() -> list[str]:
587596
with _flows_lock:
588597
return list(_flows.keys())
589598

590-
def flows() -> list[Flow]:
599+
def flows() -> dict[str, Flow]:
591600
"""
592601
Get all flows.
593602
"""
594603
with _flows_lock:
595-
return list(_flows.values())
604+
return dict(_flows)
596605

597606
def flow_by_name(name: str) -> Flow:
598607
"""
@@ -605,14 +614,13 @@ def ensure_all_flows_built() -> None:
605614
"""
606615
Ensure all flows are built.
607616
"""
608-
for fl in flows():
609-
fl.internal_flow()
617+
execution_context.run(ensure_all_flows_built_async())
610618

611619
async def ensure_all_flows_built_async() -> None:
612620
"""
613621
Ensure all flows are built.
614622
"""
615-
for fl in flows():
623+
for fl in flows().values():
616624
await fl.internal_flow_async()
617625

618626
def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
@@ -626,13 +634,13 @@ async def update_all_flows_async(options: FlowLiveUpdaterOptions) -> dict[str, _
626634
Update all flows.
627635
"""
628636
await ensure_all_flows_built_async()
629-
async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo:
637+
async def _update_flow(name: str, fl: Flow) -> tuple[str, _engine.IndexUpdateInfo]:
630638
async with FlowLiveUpdater(fl, options) as updater:
631639
await updater.wait_async()
632-
return updater.update_stats()
640+
return (name, updater.update_stats())
633641
fls = flows()
634-
all_stats = await asyncio.gather(*(_update_flow(fl) for fl in fls))
635-
return {fl.name: stats for fl, stats in zip(fls, all_stats)}
642+
all_stats = await asyncio.gather(*(_update_flow(name, fl) for (name, fl) in fls.items()))
643+
return dict(all_stats)
636644

637645
_transient_flow_name_builder = _NameBuilder()
638646
class TransientFlow:

python/cocoindex/lib.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
def init(settings: setting.Settings) -> None:
    """
    Initialize the cocoindex library.

    Hands the dumped `settings` object to the engine, then records
    `settings.app_namespace` through `setting.set_app_namespace`.
    """
    _engine.init(dump_engine_object(settings))
    setting.set_app_namespace(settings.app_namespace)
1819

1920

2021
def start_server(settings: setting.ServerSettings):

python/cocoindex/setting.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,25 @@
66
from typing import Callable, Self, Any, overload
77
from dataclasses import dataclass
88

9+
_app_namespace: str = ''
10+
11+
def get_app_namespace(*, trailing_delimiter: str | None = None) -> str:
12+
"""Get the application namespace. Append the `trailing_delimiter` if not empty."""
13+
if _app_namespace == '' or trailing_delimiter is None:
14+
return _app_namespace
15+
return f'{_app_namespace}{trailing_delimiter}'
16+
17+
def split_app_namespace(full_name: str, delimiter: str) -> tuple[str, str]:
    """
    Split the full name into the application namespace and the rest.

    The split happens at the first occurrence of `delimiter`. When the
    delimiter does not occur at all, the namespace is '' and the rest is
    `full_name` itself.
    """
    app_namespace, found, rest = full_name.partition(delimiter)
    if not found:
        # No delimiter present: the whole string is the un-namespaced name.
        return '', full_name
    return app_namespace, rest
23+
24+
def set_app_namespace(app_namespace: str) -> None:
    """Set the application namespace, stored in the module-level `_app_namespace`."""
    global _app_namespace  # pylint: disable=global-statement
    _app_namespace = app_namespace
928

1029
@dataclass
1130
class DatabaseConnectionSpec:
@@ -30,6 +49,7 @@ def _load_field(target: dict[str, Any], name: str, env_name: str, required: bool
3049
class Settings:
3150
"""Settings for the cocoindex library."""
3251
database: DatabaseConnectionSpec
52+
app_namespace: str
3353

3454
@classmethod
3555
def from_env(cls) -> Self:
@@ -40,7 +60,10 @@ def from_env(cls) -> Self:
4060
_load_field(db_kwargs, "user", "COCOINDEX_DATABASE_USER")
4161
_load_field(db_kwargs, "password", "COCOINDEX_DATABASE_PASSWORD")
4262
database = DatabaseConnectionSpec(**db_kwargs)
43-
return cls(database=database)
63+
64+
app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", '')
65+
66+
return cls(database=database, app_namespace=app_namespace)
4467

4568
@dataclass
4669
class ServerSettings:

python/cocoindex/setup.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from . import flow
2+
from . import setting
23
from . import _engine
34

45
def sync_setup() -> _engine.SetupStatus:
@@ -7,10 +8,15 @@ def sync_setup() -> _engine.SetupStatus:
78

89
def drop_setup(flow_names: list[str]) -> _engine.SetupStatus:
910
flow.ensure_all_flows_built()
10-
return _engine.drop_setup(flow_names)
11+
return _engine.drop_setup([flow.get_full_flow_name(name) for name in flow_names])
1112

1213
def flow_names_with_setup() -> list[str]:
13-
return _engine.flow_names_with_setup()
14+
result = []
15+
for name in _engine.flow_names_with_setup():
16+
app_namespace, name = setting.split_app_namespace(name, '.')
17+
if app_namespace == setting.get_app_namespace():
18+
result.append(name)
19+
return result
1420

1521
def apply_setup_changes(setup_status: _engine.SetupStatus):
1622
_engine.apply_setup_changes(setup_status)

src/execution/db_tracking_setup.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@ use serde::{Deserialize, Serialize};
55
use sqlx::PgPool;
66

77
pub fn default_tracking_table_name(flow_name: &str) -> String {
8-
let sanitized_name = flow_name
9-
.chars()
10-
.map(|c| if c.is_alphanumeric() { c } else { '_' })
11-
.collect::<String>();
12-
format!("{}__cocoindex_tracking", sanitized_name)
8+
format!(
9+
"{}__cocoindex_tracking",
10+
utils::db::sanitize_identifier(flow_name)
11+
)
1312
}
1413

1514
pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;

src/ops/storages/postgres.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use futures::FutureExt;
1111
use indexmap::{IndexMap, IndexSet};
1212
use itertools::Itertools;
1313
use serde::Serialize;
14-
use sqlx::postgres::types::PgRange;
1514
use sqlx::postgres::PgRow;
15+
use sqlx::postgres::types::PgRange;
1616
use sqlx::{PgPool, Row};
1717
use std::ops::Bound;
1818
use uuid::Uuid;
@@ -934,10 +934,12 @@ impl StorageFactoryBase for Factory {
934934
.map(|d| {
935935
let table_id = TableId {
936936
database: d.spec.database.clone(),
937-
table_name: d
938-
.spec
939-
.table_name
940-
.unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, d.name)),
937+
table_name: d.spec.table_name.unwrap_or_else(|| {
938+
utils::db::sanitize_identifier(&format!(
939+
"{}__{}",
940+
context.flow_instance_name, d.name
941+
))
942+
}),
941943
};
942944
let setup_state = SetupState::new(
943945
&table_id,

src/prelude.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
pub(crate) use anyhow::{Context, Result};
44
pub(crate) use async_trait::async_trait;
55
pub(crate) use chrono::{DateTime, Utc};
6+
pub(crate) use futures::{FutureExt, StreamExt};
67
pub(crate) use futures::{
78
future::{BoxFuture, Shared},
89
prelude::*,
910
stream::BoxStream,
1011
};
11-
pub(crate) use futures::{FutureExt, StreamExt};
1212
pub(crate) use indexmap::{IndexMap, IndexSet};
1313
pub(crate) use itertools::Itertools;
14-
pub(crate) use serde::{de::DeserializeOwned, Deserialize, Serialize};
14+
pub(crate) use serde::{Deserialize, Serialize, de::DeserializeOwned};
1515
pub(crate) use std::any::Any;
1616
pub(crate) use std::borrow::Cow;
1717
pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
@@ -21,11 +21,11 @@ pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};
2121
pub(crate) use crate::base::{self, schema, spec, value};
2222
pub(crate) use crate::builder::{self, plan};
2323
pub(crate) use crate::execution;
24-
pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext};
24+
pub(crate) use crate::lib_context::{FlowContext, LibContext, get_lib_context, get_runtime};
2525
pub(crate) use crate::ops::interface;
2626
pub(crate) use crate::service::error::ApiError;
2727
pub(crate) use crate::setup::AuthRegistry;
28-
pub(crate) use crate::utils::retryable;
28+
pub(crate) use crate::utils::{self, retryable};
2929

3030
pub(crate) use crate::{api_bail, api_error};
3131

src/utils/db.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,15 @@ pub enum WriteAction {
3131
Insert,
3232
Update,
3333
}
34+
35+
/// Converts `s` into a string made only of alphanumeric characters and `_`.
///
/// Characters for which `char::is_alphanumeric` is true (this is
/// Unicode-aware, not ASCII-only) and `_` are kept as-is; every other
/// character is replaced by a double underscore `__`.
///
/// NOTE: the mapping is not injective: `"a.b"` and `"a__b"` both
/// produce `"a__b"`.
pub fn sanitize_identifier(s: &str) -> String {
    // `s.len()` is only a starting capacity; the string still grows on its
    // own if the replacements need more room.
    let mut result = String::with_capacity(s.len());
    for c in s.chars() {
        if c.is_alphanumeric() || c == '_' {
            result.push(c);
        } else {
            result.push_str("__");
        }
    }
    result
}

0 commit comments

Comments
 (0)