From 99512570d44b66109ca58bd65bd4c1110585acd3 Mon Sep 17 00:00:00 2001 From: Jenny Date: Wed, 19 Nov 2025 19:08:52 -0800 Subject: [PATCH 01/14] postgresstore sdk with lazy imports --- .../src/databricks_langchain/__init__.py | 2 + .../src/databricks_langchain/store.py | 49 ++++++++++++++ .../langchain/tests/unit_tests/test_store.py | 67 +++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 integrations/langchain/src/databricks_langchain/store.py create mode 100644 integrations/langchain/tests/unit_tests/test_store.py diff --git a/integrations/langchain/src/databricks_langchain/__init__.py b/integrations/langchain/src/databricks_langchain/__init__.py index eb5fca3d..cf751ca9 100644 --- a/integrations/langchain/src/databricks_langchain/__init__.py +++ b/integrations/langchain/src/databricks_langchain/__init__.py @@ -21,6 +21,7 @@ from databricks_langchain.checkpoint import CheckpointSaver from databricks_langchain.embeddings import DatabricksEmbeddings from databricks_langchain.genie import GenieAgent +from databricks_langchain.store import DatabricksStore from databricks_langchain.vector_search_retriever_tool import VectorSearchRetrieverTool from databricks_langchain.vectorstores import DatabricksVectorSearch @@ -29,6 +30,7 @@ "ChatDatabricks", "CheckpointSaver", "DatabricksEmbeddings", + "DatabricksStore", "DatabricksVectorSearch", "GenieAgent", "VectorSearchRetrieverTool", diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py new file mode 100644 index 00000000..b4402c15 --- /dev/null +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from databricks.sdk import WorkspaceClient + +try: + from databricks_ai_bridge.lakebase import LakebasePool + from langgraph.store.postgres import PostgresStore + + _store_imports_available = True +except ImportError: + PostgresSaver = object + _store_imports_available = 
False + + +class DatabricksStore(PostgresStore): + """ + LangGraph PostgresStore using a Lakebase connection pool. + instance_name: Name of Lakebase Instance + """ + + def __init__( + self, + *, + instance_name: str, + workspace_client: WorkspaceClient | None = None, + **pool_kwargs: object, + ) -> None: + # Lazy imports + if not _store_imports_available: + raise ImportError( + "DatabricksStore requires databricks-langchain[memory]. " + "Please install with: pip install databricks-langchain[memory]" + ) + + self._lakebase = LakebasePool( + instance_name=instance_name, + workspace_client=workspace_client, + **dict(pool_kwargs), + ) + super().__init__(self._lakebase.pool) + + def __enter__(self): + """Enter context manager.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit context manager and close the connection pool.""" + self._lakebase.close() + return False diff --git a/integrations/langchain/tests/unit_tests/test_store.py b/integrations/langchain/tests/unit_tests/test_store.py new file mode 100644 index 00000000..860babca --- /dev/null +++ b/integrations/langchain/tests/unit_tests/test_store.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +pytest.importorskip("psycopg") +pytest.importorskip("psycopg_pool") + +from databricks_ai_bridge import lakebase + +from databricks_langchain import DatabricksStore + + +class TestConnectionPool: + def __init__(self, connection_value="conn"): + self.connection_value = connection_value + self.conninfo = "" + + def __call__( + self, + *, + conninfo, + connection_class=None, + **kwargs, + ): + self.conninfo = conninfo + return self + + def connection(self): + class _Ctx: + def __init__(self, outer): + self.outer = outer + + def __enter__(self): + return self.outer.connection_value + + def __exit__(self, exc_type, exc, tb): + pass + + return _Ctx(self) + + +def test_databricks_store_configures_lakebase(monkeypatch): + test_pool = 
TestConnectionPool(connection_value="lake-conn") + monkeypatch.setattr(lakebase, "ConnectionPool", test_pool) + + workspace = MagicMock() + workspace.database.generate_database_credential.return_value = MagicMock(token="stub-token") + workspace.database.get_database_instance.return_value.read_write_dns = "db-host" + workspace.current_service_principal.me.side_effect = RuntimeError("no sp") + workspace.current_user.me.return_value = MagicMock(user_name="test@databricks.com") + + store = DatabricksStore( + instance_name="lakebase-instance", + workspace_client=workspace, + ) + + assert ( + test_pool.conninfo + == "dbname=databricks_postgres user=test@databricks.com host=db-host port=5432 sslmode=require" + ) + assert isinstance(store, DatabricksStore) + assert store._lakebase.pool == test_pool + + with store._lakebase.connection() as conn: + assert conn == "lake-conn" From fff1c1ec2d815cbd584cce7075b4d17db02095aa Mon Sep 17 00:00:00 2001 From: Jenny Date: Thu, 20 Nov 2025 16:31:35 -0800 Subject: [PATCH 02/14] databricksstore --- .github/workflows/main.yml | 1 + .../src/databricks_langchain/store.py | 68 ++++++++++++++----- .../langchain/tests/unit_tests/test_store.py | 10 ++- 3 files changed, 59 insertions(+), 20 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 03652c84..15da3c87 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -92,6 +92,7 @@ jobs: run: | pytest tests/databricks_ai_bridge/test_lakebase.py pytest integrations/langchain/tests/unit_tests/test_checkpoint.py + pytest integrations/langchain/tests/unit_tests/test_store.py langchain_cross_version_test: runs-on: ubuntu-latest diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index b4402c15..bde2e9cd 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -1,5 +1,7 @@ from __future__ import 
annotations +from typing import Any, Optional + from databricks.sdk import WorkspaceClient try: @@ -8,42 +10,72 @@ _store_imports_available = True except ImportError: - PostgresSaver = object + LakebasePool = object + PostgresStore = object _store_imports_available = False -class DatabricksStore(PostgresStore): +class DatabricksStore: """ - LangGraph PostgresStore using a Lakebase connection pool. - instance_name: Name of Lakebase Instance + Wrapper around LangGraph's PostgresStore that uses a Lakebase + connection pool and borrows a connection per call. """ def __init__( self, *, instance_name: str, - workspace_client: WorkspaceClient | None = None, - **pool_kwargs: object, + workspace_client: Optional[WorkspaceClient] = None, + **pool_kwargs: Any, ) -> None: - # Lazy imports if not _store_imports_available: raise ImportError( "DatabricksStore requires databricks-langchain[memory]. " - "Please install with: pip install databricks-langchain[memory]" + "Install with: pip install 'databricks-langchain[memory]'" ) - self._lakebase = LakebasePool( + self._lakebase: LakebasePool = LakebasePool( instance_name=instance_name, workspace_client=workspace_client, - **dict(pool_kwargs), + **pool_kwargs, ) - super().__init__(self._lakebase.pool) + self._pool = self._lakebase.pool + + def _with_store(self, fn, *args, **kwargs): + """ + Borrow a connection, create a short-lived PostgresStore, call fn(store), + then return the connection to the pool. 
+ """ + with self._pool.connection() as conn: + store = PostgresStore(conn=conn) + return fn(store, *args, **kwargs) + + def setup(self) -> None: + """Set up the store database tables (first time setup).""" + return self._with_store(lambda s: s.setup()) + + def put(self, namespace: tuple[str, ...], key: str, value: Any) -> None: + """Store a value in the store.""" + return self._with_store(lambda s: s.put(namespace, key, value)) + + def search( + self, + namespace: tuple[str, ...], + *, + query: Optional[str] = None, + limit: int = 20, + ) -> list[Any]: + """Search for items in the store.""" + return self._with_store(lambda s: s.search(namespace, query=query, limit=limit)) + + # def close(self) -> None: + # """Close the underlying Lakebase pool.""" + # self._lakebase.close() - def __enter__(self): - """Enter context manager.""" - return self + # def __enter__(self) -> "DatabricksStore": + # """Enter context manager.""" + # return self - def __exit__(self, exc_type, exc_val, exc_tb): - """Exit context manager and close the connection pool.""" - self._lakebase.close() - return False + # def __exit__(self, exc_type, exc_val, exc_tb) -> None: + # """Exit context manager and close the connection pool.""" + # self.close() diff --git a/integrations/langchain/tests/unit_tests/test_store.py b/integrations/langchain/tests/unit_tests/test_store.py index 860babca..dd205b67 100644 --- a/integrations/langchain/tests/unit_tests/test_store.py +++ b/integrations/langchain/tests/unit_tests/test_store.py @@ -6,6 +6,7 @@ pytest.importorskip("psycopg") pytest.importorskip("psycopg_pool") +pytest.importorskip("langgraph.checkpoint.postgres") from databricks_ai_bridge import lakebase @@ -42,9 +43,14 @@ def __exit__(self, exc_type, exc, tb): def test_databricks_store_configures_lakebase(monkeypatch): - test_pool = TestConnectionPool(connection_value="lake-conn") + mock_conn = MagicMock() + test_pool = TestConnectionPool(connection_value=mock_conn) monkeypatch.setattr(lakebase, 
"ConnectionPool", test_pool) + from langgraph.store.postgres import PostgresStore + + monkeypatch.setattr(PostgresStore, "setup", MagicMock()) + workspace = MagicMock() workspace.database.generate_database_credential.return_value = MagicMock(token="stub-token") workspace.database.get_database_instance.return_value.read_write_dns = "db-host" @@ -64,4 +70,4 @@ def test_databricks_store_configures_lakebase(monkeypatch): assert store._lakebase.pool == test_pool with store._lakebase.connection() as conn: - assert conn == "lake-conn" + assert conn == mock_conn From bb45b1c299233d56d51fe7c8fa946e79122d19da Mon Sep 17 00:00:00 2001 From: Jenny Date: Thu, 20 Nov 2025 17:08:44 -0800 Subject: [PATCH 03/14] lazy initialize lakebase pool --- .../src/databricks_langchain/store.py | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index bde2e9cd..ed37be4b 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -34,24 +34,37 @@ def __init__( "Install with: pip install 'databricks-langchain[memory]'" ) - self._lakebase: LakebasePool = LakebasePool( - instance_name=instance_name, - workspace_client=workspace_client, - **pool_kwargs, - ) - self._pool = self._lakebase.pool + # Store initialization parameters for lazy initialization, otherwise + # if we directly iniitalize pool during deployment it will fail + self._instance_name = instance_name + self._workspace_client = workspace_client + self._pool_kwargs = pool_kwargs + self._lakebase: Optional[LakebasePool] = None + self._pool = None + self._setup_called = False + + def _ensure_initialized(self) -> None: + """Lazy initialization of LakebasePool on first use after deployment is ready.""" + if self._lakebase is None: + self._lakebase = LakebasePool( + instance_name=self._instance_name, + 
workspace_client=self._workspace_client, + **self._pool_kwargs, + ) + self._pool = self._lakebase.pool def _with_store(self, fn, *args, **kwargs): """ Borrow a connection, create a short-lived PostgresStore, call fn(store), then return the connection to the pool. """ + self._ensure_initialized() with self._pool.connection() as conn: store = PostgresStore(conn=conn) return fn(store, *args, **kwargs) def setup(self) -> None: - """Set up the store database tables (first time setup).""" + """Set up the store database tables.""" return self._with_store(lambda s: s.setup()) def put(self, namespace: tuple[str, ...], key: str, value: Any) -> None: @@ -70,7 +83,8 @@ def search( # def close(self) -> None: # """Close the underlying Lakebase pool.""" - # self._lakebase.close() + # if self._lakebase is not None: + # self._lakebase.close() # def __enter__(self) -> "DatabricksStore": # """Enter context manager.""" From bff5c768fd2afe752973f076925574adac8bae3c Mon Sep 17 00:00:00 2001 From: Jenny Date: Fri, 21 Nov 2025 09:47:11 -0800 Subject: [PATCH 04/14] setup test --- integrations/langchain/src/databricks_langchain/store.py | 2 +- integrations/langchain/tests/unit_tests/test_store.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index ed37be4b..ba8733ee 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -34,7 +34,7 @@ def __init__( "Install with: pip install 'databricks-langchain[memory]'" ) - # Store initialization parameters for lazy initialization, otherwise + # Store initialization parameters for lazy initialization, otherwise # if we directly iniitalize pool during deployment it will fail self._instance_name = instance_name self._workspace_client = workspace_client diff --git a/integrations/langchain/tests/unit_tests/test_store.py 
b/integrations/langchain/tests/unit_tests/test_store.py index dd205b67..9dfe1607 100644 --- a/integrations/langchain/tests/unit_tests/test_store.py +++ b/integrations/langchain/tests/unit_tests/test_store.py @@ -62,6 +62,8 @@ def test_databricks_store_configures_lakebase(monkeypatch): workspace_client=workspace, ) + store.setup() + assert ( test_pool.conninfo == "dbname=databricks_postgres user=test@databricks.com host=db-host port=5432 sslmode=require" From 272c62d00cb0ce8af5c2ae2ba070de6bf65ed5db Mon Sep 17 00:00:00 2001 From: Jenny Date: Fri, 21 Nov 2025 09:52:38 -0800 Subject: [PATCH 05/14] nit format --- .../langchain/src/databricks_langchain/store.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index ba8733ee..bc9d679f 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -80,16 +80,3 @@ def search( ) -> list[Any]: """Search for items in the store.""" return self._with_store(lambda s: s.search(namespace, query=query, limit=limit)) - - # def close(self) -> None: - # """Close the underlying Lakebase pool.""" - # if self._lakebase is not None: - # self._lakebase.close() - - # def __enter__(self) -> "DatabricksStore": - # """Enter context manager.""" - # return self - - # def __exit__(self, exc_type, exc_val, exc_tb) -> None: - # """Exit context manager and close the connection pool.""" - # self.close() From 888a924fef09abf5885b68c8d29083775eb11e7e Mon Sep 17 00:00:00 2001 From: Jenny Date: Fri, 21 Nov 2025 15:59:06 -0800 Subject: [PATCH 06/14] subclass of BaseStore --- .../src/databricks_langchain/store.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index bc9d679f..ddaeec5d 100644 --- 
a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -6,19 +6,24 @@ try: from databricks_ai_bridge.lakebase import LakebasePool + from langgraph.store.base import BaseStore, Item from langgraph.store.postgres import PostgresStore _store_imports_available = True except ImportError: LakebasePool = object PostgresStore = object + BaseStore = object + Item = object _store_imports_available = False -class DatabricksStore: - """ - Wrapper around LangGraph's PostgresStore that uses a Lakebase - connection pool and borrows a connection per call. +class DatabricksStore(BaseStore): + """Provides APIs for working with long-term memory on Databricks using Lakebase. + Extends LangGraph BaseStore interface using Databricks Lakebase for connection pooling. + + Operations borrow a connection from the pool, create a short-lived PostgresStore, + execute the operation, and return the connection to the pool. """ def __init__( @@ -35,13 +40,12 @@ def __init__( ) # Store initialization parameters for lazy initialization, otherwise - # if we directly iniitalize pool during deployment it will fail + # if we directly initialize pool during deployment it will fail self._instance_name = instance_name self._workspace_client = workspace_client self._pool_kwargs = pool_kwargs self._lakebase: Optional[LakebasePool] = None self._pool = None - self._setup_called = False def _ensure_initialized(self) -> None: """Lazy initialization of LakebasePool on first use after deployment is ready.""" @@ -64,7 +68,7 @@ def _with_store(self, fn, *args, **kwargs): return fn(store, *args, **kwargs) def setup(self) -> None: - """Set up the store database tables.""" + """Instantiate the store, setting up necessary persistent storage.""" return self._with_store(lambda s: s.setup()) def put(self, namespace: tuple[str, ...], key: str, value: Any) -> None: @@ -77,6 +81,6 @@ def search( *, query: Optional[str] = None, limit: int = 20, - ) -> 
list[Any]: + ) -> list[Item]: """Search for items in the store.""" return self._with_store(lambda s: s.search(namespace, query=query, limit=limit)) From 2dfabefae6f670837d7d1a9055e1ce62d3282750 Mon Sep 17 00:00:00 2001 From: Jenny Date: Fri, 21 Nov 2025 16:23:11 -0800 Subject: [PATCH 07/14] batch/abatch --- .../src/databricks_langchain/store.py | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index ddaeec5d..00d9d6b5 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -1,12 +1,12 @@ from __future__ import annotations -from typing import Any, Optional +from typing import Any, Iterable, Optional from databricks.sdk import WorkspaceClient try: from databricks_ai_bridge.lakebase import LakebasePool - from langgraph.store.base import BaseStore, Item + from langgraph.store.base import BaseStore, Item, Op, Result from langgraph.store.postgres import PostgresStore _store_imports_available = True @@ -15,6 +15,8 @@ PostgresStore = object BaseStore = object Item = object + Op = object + Result = object _store_imports_available = False @@ -71,16 +73,26 @@ def setup(self) -> None: """Instantiate the store, setting up necessary persistent storage.""" return self._with_store(lambda s: s.setup()) - def put(self, namespace: tuple[str, ...], key: str, value: Any) -> None: - """Store a value in the store.""" - return self._with_store(lambda s: s.put(namespace, key, value)) + # BaseStore abstract methods - REQUIRED to implement + def batch(self, ops: Iterable[Op]) -> list[Result]: + """Execute a batch of operations synchronously. 
- def search( - self, - namespace: tuple[str, ...], - *, - query: Optional[str] = None, - limit: int = 20, - ) -> list[Item]: - """Search for items in the store.""" - return self._with_store(lambda s: s.search(namespace, query=query, limit=limit)) + This is the core method required by BaseStore. All other operations + (get, put, search, delete, list_namespaces) are inherited from BaseStore + and internally call this batch() method. + """ + return self._with_store(lambda s: s.batch(ops)) + + async def abatch(self, ops: Iterable[Op]) -> list[Result]: + """Execute a batch of operations asynchronously. + + This is the second abstract method required by BaseStore. + Currently delegates to sync batch() - for true async support, + would need async-compatible connection pooling. + """ + return self.batch(ops) + + # def close(self) -> None: + # """Close the underlying Lakebase pool.""" + # if self._lakebase is not None: + # self._lakebase.close() From 521573329bc3059c7beb1c13d81249eb2e25eb90 Mon Sep 17 00:00:00 2001 From: Jenny Date: Fri, 21 Nov 2025 16:24:09 -0800 Subject: [PATCH 08/14] add close --- integrations/langchain/src/databricks_langchain/store.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index 00d9d6b5..bcb6baaf 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -92,7 +92,7 @@ async def abatch(self, ops: Iterable[Op]) -> list[Result]: """ return self.batch(ops) - # def close(self) -> None: - # """Close the underlying Lakebase pool.""" - # if self._lakebase is not None: - # self._lakebase.close() + def close(self) -> None: + """Close the underlying Lakebase pool.""" + if self._lakebase is not None: + self._lakebase.close() From 0ea050cd51363d0444378b0786232d8f4c9b9e73 Mon Sep 17 00:00:00 2001 From: Jenny Date: Fri, 21 Nov 2025 16:29:39 
-0800 Subject: [PATCH 09/14] remove close method --- integrations/langchain/src/databricks_langchain/store.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index bcb6baaf..28446f5c 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -73,7 +73,6 @@ def setup(self) -> None: """Instantiate the store, setting up necessary persistent storage.""" return self._with_store(lambda s: s.setup()) - # BaseStore abstract methods - REQUIRED to implement def batch(self, ops: Iterable[Op]) -> list[Result]: """Execute a batch of operations synchronously. @@ -91,8 +90,3 @@ async def abatch(self, ops: Iterable[Op]) -> list[Result]: would need async-compatible connection pooling. """ return self.batch(ops) - - def close(self) -> None: - """Close the underlying Lakebase pool.""" - if self._lakebase is not None: - self._lakebase.close() From 848a7cfbd05fc2a3f8dc5988dc0bcaa418ecb478 Mon Sep 17 00:00:00 2001 From: Jenny Date: Tue, 25 Nov 2025 05:29:50 -0800 Subject: [PATCH 10/14] add namespace normalization functions --- .../src/databricks_langchain/store.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index 28446f5c..af125cba 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -1,5 +1,7 @@ from __future__ import annotations +import hashlib +import re from typing import Any, Iterable, Optional from databricks.sdk import WorkspaceClient @@ -20,6 +22,62 @@ _store_imports_available = False +def normalize_namespace_label(s: Optional[str]) -> str: + """Normalize a string for use as a namespace label. 
+ + Converts to lowercase, replaces @ with -at-, removes invalid characters, + and truncates with hash if too long. + + Args: + s: The string to normalize (e.g., email address, user_id) + + Returns: + Normalized string safe for namespace usage + + Example: + >>> normalize_namespace_label("user@example.com") + 'user-at-example-com' + >>> normalize_namespace_label("") + 'anon' + """ + SAFE_NS_MAX = 64 + + if not s: + return "anon" + x = s.strip().lower().replace("@", "-at-") + x = re.sub(r"[^a-z0-9_-]+", "-", x) # removes dots and punctuation + x = re.sub(r"-{2,}", "-", x).strip("-") or "anon" + if len(x) > SAFE_NS_MAX: + head = x[: SAFE_NS_MAX - 17] + tail = hashlib.sha256(x.encode()).hexdigest()[:16] + x = f"{head}-{tail}" + return x + + +def namespace(identifier: str, prefix: str = "users") -> tuple[str, ...]: + """Create a namespace tuple with a normalized identifier. + + Args: + identifier: The identifier to normalize (e.g., user_id, email, entity_name) + prefix: The namespace prefix (default: "users") + + Returns: + Tuple of (prefix, normalized_identifier) for use as namespace + + Example: + >>> namespace("email@databricks.com") + ('users', 'email-at-databricks-com') + >>> namespace("session-123", prefix="sessions") + ('sessions', 'session-123') + + Note: + To customize namespace structure for your agent (e.g., to isolate + memories per agent or add additional hierarchy), pass a custom prefix + or build your namespace tuple manually. + """ + return (prefix, normalize_namespace_label(identifier)) + + class DatabricksStore(BaseStore): """Provides APIs for working with long-term memory on Databricks using Lakebase. Extends LangGraph BaseStore interface using Databricks Lakebase for connection pooling. 
From ec3444b3cfe4798479ab66ada42b46bb1f157f59 Mon Sep 17 00:00:00 2001 From: Jenny Date: Tue, 25 Nov 2025 05:45:14 -0800 Subject: [PATCH 11/14] namespace as static methods --- .../src/databricks_langchain/store.py | 100 ++++++++---------- 1 file changed, 44 insertions(+), 56 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index af125cba..baf910c3 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -22,62 +22,6 @@ _store_imports_available = False -def normalize_namespace_label(s: Optional[str]) -> str: - """Normalize a string for use as a namespace label. - - Converts to lowercase, replaces @ with -at-, removes invalid characters, - and truncates with hash if too long. - - Args: - s: The string to normalize (e.g., email address, user_id) - - Returns: - Normalized string safe for namespace usage - - Example: - >>> normalize_namespace_label("user@example.com") - 'user-at-example-com' - >>> normalize_namespace_label("") - 'anon' - """ - SAFE_NS_MAX = 64 - - if not s: - return "anon" - x = s.strip().lower().replace("@", "-at-") - x = re.sub(r"[^a-z0-9_-]+", "-", x) # removes dots and punctuation - x = re.sub(r"-{2,}", "-", x).strip("-") or "anon" - if len(x) > SAFE_NS_MAX: - head = x[: SAFE_NS_MAX - 17] - tail = hashlib.sha256(x.encode()).hexdigest()[:16] - x = f"{head}-{tail}" - return x - - -def namespace(identifier: str, prefix: str = "users") -> tuple[str, ...]: - """Create a namespace tuple with a normalized identifier. 
- - Args: - identifier: The identifier to normalize (e.g., user_id, email, entity_name) - prefix: The namespace prefix (default: "users") - - Returns: - Tuple of (prefix, normalized_identifier) for use as namespace - - Example: - >>> namespace("email@databricks.com") - ('users', 'email-at-databricks-com') - >>> namespace("session-123", prefix="sessions") - ('sessions', 'session-123') - - Note: - To customize namespace structure for your agent (e.g., to isolate - memories per agent or add additional hierarchy), pass a custom prefix - or build your namespace tuple manually. - """ - return (prefix, normalize_namespace_label(identifier)) - - class DatabricksStore(BaseStore): """Provides APIs for working with long-term memory on Databricks using Lakebase. Extends LangGraph BaseStore interface using Databricks Lakebase for connection pooling. @@ -86,6 +30,50 @@ class DatabricksStore(BaseStore): execute the operation, and return the connection to the pool. """ + @staticmethod + def normalize_namespace_label(s: Optional[str]) -> str: + """Normalize a string for use as a namespace label. + Converts to lowercase, replaces @ with -at-, removes invalid characters, + and truncates with hash if too long. + Args: + s: The string to normalize (e.g., email address, user_id) + Returns: + Normalized string safe for namespace usage + Example: + >>> normalize_namespace_label("user@example.com") + 'user-at-example-com' + >>> normalize_namespace_label("") + 'anon' + """ + SAFE_NS_MAX = 64 + + if not s: + return "anon" + x = s.strip().lower().replace("@", "-at-") + x = re.sub(r"[^a-z0-9_-]+", "-", x) # removes dots and punctuation + x = re.sub(r"-{2,}", "-", x).strip("-") or "anon" + if len(x) > SAFE_NS_MAX: + head = x[: SAFE_NS_MAX - 17] + tail = hashlib.sha256(x.encode()).hexdigest()[:16] + x = f"{head}-{tail}" + return x + + @staticmethod + def namespace(identifier: str, prefix: str = "users") -> tuple[str, ...]: + """Create a namespace tuple with a normalized identifier. 
+ Args: + identifier: The identifier to normalize (e.g., user_id, email, entity_name) + prefix: The namespace prefix (default: "users") + Returns: + Tuple of (prefix, normalized_identifier) for use as namespace + Example: + >>> namespace("email@databricks.com") + ('users', 'email-at-databricks-com') + >>> namespace("session-123", prefix="sessions") + ('sessions', 'session-123') + """ + return (prefix, DatabricksStore.normalize_namespace_label(identifier)) + def __init__( self, *, From 4d1c74cef7da7b480a8d35cbdeb579d5228c47be Mon Sep 17 00:00:00 2001 From: Jenny Date: Tue, 25 Nov 2025 15:24:36 -0800 Subject: [PATCH 12/14] remove lazy init in sdk --- .../src/databricks_langchain/store.py | 25 +++++-------------- .../langchain/tests/unit_tests/test_store.py | 3 --- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/integrations/langchain/src/databricks_langchain/store.py b/integrations/langchain/src/databricks_langchain/store.py index baf910c3..e550f966 100644 --- a/integrations/langchain/src/databricks_langchain/store.py +++ b/integrations/langchain/src/databricks_langchain/store.py @@ -87,31 +87,18 @@ def __init__( "Install with: pip install 'databricks-langchain[memory]'" ) - # Store initialization parameters for lazy initialization, otherwise - # if we directly initialize pool during deployment it will fail - self._instance_name = instance_name - self._workspace_client = workspace_client - self._pool_kwargs = pool_kwargs - self._lakebase: Optional[LakebasePool] = None - self._pool = None - - def _ensure_initialized(self) -> None: - """Lazy initialization of LakebasePool on first use after deployment is ready.""" - if self._lakebase is None: - self._lakebase = LakebasePool( - instance_name=self._instance_name, - workspace_client=self._workspace_client, - **self._pool_kwargs, - ) - self._pool = self._lakebase.pool + self._lakebase: LakebasePool = LakebasePool( + instance_name=instance_name, + workspace_client=workspace_client, + **pool_kwargs, + ) def 
_with_store(self, fn, *args, **kwargs): """ Borrow a connection, create a short-lived PostgresStore, call fn(store), then return the connection to the pool. """ - self._ensure_initialized() - with self._pool.connection() as conn: + with self._lakebase.connection() as conn: store = PostgresStore(conn=conn) return fn(store, *args, **kwargs) diff --git a/integrations/langchain/tests/unit_tests/test_store.py b/integrations/langchain/tests/unit_tests/test_store.py index 9dfe1607..8b9c009e 100644 --- a/integrations/langchain/tests/unit_tests/test_store.py +++ b/integrations/langchain/tests/unit_tests/test_store.py @@ -62,14 +62,11 @@ def test_databricks_store_configures_lakebase(monkeypatch): workspace_client=workspace, ) - store.setup() - assert ( test_pool.conninfo == "dbname=databricks_postgres user=test@databricks.com host=db-host port=5432 sslmode=require" ) assert isinstance(store, DatabricksStore) - assert store._lakebase.pool == test_pool with store._lakebase.connection() as conn: assert conn == mock_conn From b036124aa295e95e6140f30efc674d064c1e9aea Mon Sep 17 00:00:00 2001 From: Jenny Date: Wed, 26 Nov 2025 12:06:36 -0800 Subject: [PATCH 13/14] namespace unit tests --- .../langchain/tests/unit_tests/test_store.py | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/integrations/langchain/tests/unit_tests/test_store.py b/integrations/langchain/tests/unit_tests/test_store.py index 8b9c009e..837150aa 100644 --- a/integrations/langchain/tests/unit_tests/test_store.py +++ b/integrations/langchain/tests/unit_tests/test_store.py @@ -70,3 +70,99 @@ def test_databricks_store_configures_lakebase(monkeypatch): with store._lakebase.connection() as conn: assert conn == mock_conn + + +class TestDatabricksStoreNamespace: + """Test the DatabricksStore.namespace() static method.""" + + def test_namespace_with_email(self): + """Test namespace normalization with a typical email address.""" + result = DatabricksStore.namespace("first.last@databricks.com") + 
assert result == ("users", "first-last-at-databricks-com") + + def test_namespace_with_uppercase(self): + """Test that uppercase letters are converted to lowercase.""" + result = DatabricksStore.namespace("FIRST.LAST@DATABRICKS.COM") + assert result == ("users", "first-last-at-databricks-com") + + def test_namespace_with_empty_identifier(self): + """Test that empty identifier returns 'anon'.""" + result = DatabricksStore.namespace("") + assert result == ("users", "anon") + + def test_namespace_with_whitespace_only(self): + """Test that whitespace-only identifier returns 'anon'.""" + result = DatabricksStore.namespace(" ") + assert result == ("users", "anon") + + def test_namespace_with_custom_prefix(self): + """Test namespace with a custom prefix.""" + result = DatabricksStore.namespace("user123", prefix="agents") + assert result == ("agents", "user123") + + def test_namespace_with_special_characters(self): + """Test that special characters are replaced with dashes.""" + result = DatabricksStore.namespace("user!name@test#site.com") + assert result == ("users", "user-name-at-test-site-com") + + def test_namespace_with_multiple_consecutive_dashes(self): + """Test that multiple consecutive dashes are collapsed to single dash.""" + result = DatabricksStore.namespace("user!!!name@@@test.com") + assert result == ("users", "user-name-at-test-com") + + def test_namespace_with_leading_trailing_special_chars(self): + """Test that leading/trailing dashes are stripped.""" + result = DatabricksStore.namespace("!!!user@test.com!!!") + assert result == ("users", "user-at-test-com") + + def test_namespace_with_underscores_and_hyphens(self): + """Test that underscores and hyphens are preserved.""" + result = DatabricksStore.namespace("user_name-123") + assert result == ("users", "user_name-123") + + def test_namespace_with_numbers(self): + """Test that numbers are preserved.""" + result = DatabricksStore.namespace("user123@test456.com") + assert result == ("users", 
"user123-at-test456-com") + + def test_namespace_with_long_identifier(self): + """Test that long identifiers are truncated with hash suffix.""" + # Create an identifier longer than 64 characters + long_identifier = "a" * 70 + "@example.com" + result = DatabricksStore.namespace(long_identifier) + + # Should be exactly 64 characters + assert len(result[1]) == 64 + + # Should start with truncated original and end with hash + assert result[1].startswith("a" * 47) + assert "-" in result[1] + + # Verify it's a valid namespace tuple + assert result[0] == "users" + assert isinstance(result, tuple) + assert len(result) == 2 + + def test_namespace_with_only_special_characters(self): + """Test identifier with only special characters.""" + result = DatabricksStore.namespace("@@@###$$$") + assert result == ("users", "anon") + + def test_namespace_with_mixed_valid_invalid_chars(self): + """Test identifier with mix of valid and invalid characters.""" + result = DatabricksStore.namespace("test$user%123@site&domain.com") + assert result == ("users", "test-user-123-at-site-domain-com") + + def test_namespace_with_unicode_characters(self): + """Test that unicode characters are removed or replaced.""" + result = DatabricksStore.namespace("user\u00e9@test.com") # user with é + assert result[0] == "users" + assert "at-test-com" in result[1] + + def test_namespace_returns_tuple(self): + """Test that namespace always returns a tuple.""" + result = DatabricksStore.namespace("test@example.com") + assert isinstance(result, tuple) + assert len(result) == 2 + assert isinstance(result[0], str) + assert isinstance(result[1], str) From 7872846c09c2843da7f54f83be277bc8c80531ce Mon Sep 17 00:00:00 2001 From: Jenny Date: Wed, 26 Nov 2025 12:14:07 -0800 Subject: [PATCH 14/14] update tests --- integrations/langchain/tests/unit_tests/test_store.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/integrations/langchain/tests/unit_tests/test_store.py 
b/integrations/langchain/tests/unit_tests/test_store.py
index 837150aa..bba38185 100644
--- a/integrations/langchain/tests/unit_tests/test_store.py
+++ b/integrations/langchain/tests/unit_tests/test_store.py
@@ -105,11 +105,6 @@ def test_namespace_with_special_characters(self):
         result = DatabricksStore.namespace("user!name@test#site.com")
         assert result == ("users", "user-name-at-test-site-com")
 
-    def test_namespace_with_multiple_consecutive_dashes(self):
-        """Test that multiple consecutive dashes are collapsed to single dash."""
-        result = DatabricksStore.namespace("user!!!name@@@test.com")
-        assert result == ("users", "user-name-at-test-com")
-
     def test_namespace_with_leading_trailing_special_chars(self):
         """Test that leading/trailing dashes are stripped."""
         result = DatabricksStore.namespace("!!!user@test.com!!!")
@@ -143,10 +138,10 @@ def test_namespace_with_long_identifier(self):
         assert isinstance(result, tuple)
         assert len(result) == 2
 
-    def test_namespace_with_only_special_characters(self):
-        """Test identifier with only special characters."""
+    def test_namespace_with_at_and_special_characters(self):
+        """Test an identifier containing only '@' signs and special characters."""
         result = DatabricksStore.namespace("@@@###$$$")
-        assert result == ("users", "anon")
+        assert result == ("users", "at-at-at")
 
     def test_namespace_with_mixed_valid_invalid_chars(self):
         """Test identifier with mix of valid and invalid characters."""