Skip to content
This repository was archived by the owner on Mar 19, 2026. It is now read-only.

Commit bcf422e

Browse files
authored
Merge pull request #398 from webcoderz/asyncpg
Async pg module w/ connection pooling
2 parents 59d6c90 + e54eaf9 commit bcf422e

File tree

14 files changed

+800
-19
lines changed

14 files changed

+800
-19
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,4 @@ cython_debug/
174174
src/controlflow/_version.py
175175
all_code.md
176176
all_docs.md
177-
llm_guides.md
177+
llm_guides.md

examples/asyncpg-memory.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import asyncio
2+
3+
import controlflow as cf
4+
from controlflow.memory.async_memory import AsyncMemory
5+
from controlflow.memory.providers.postgres import AsyncPostgresMemory
6+
7+
provider = AsyncPostgresMemory(
8+
database_url="postgresql+psycopg://postgres:postgres@localhost:5432/database",
9+
# embedding_dimension=1536,
10+
# embedding_fn=OpenAIEmbeddings(),
11+
table_name="vector_db_async",
12+
)
13+
14+
# Create a memory module for user preferences
15+
user_preferences = AsyncMemory(
16+
key="user_preferences",
17+
instructions="Store and retrieve user preferences.",
18+
provider=provider,
19+
)
20+
21+
# Create an agent with access to the memory
22+
agent = cf.Agent(memories=[user_preferences])
23+
24+
25+
# Create a flow to ask for the user's favorite color
26+
@cf.flow
27+
async def remember_pet():
28+
return await cf.run_async(
29+
"Ask the user for their favorite animal and store it in memory",
30+
agents=[agent],
31+
interactive=True,
32+
)
33+
34+
35+
# Create a flow to recall the user's favorite color
36+
@cf.flow
37+
async def recall_pet():
38+
return await cf.run_async(
39+
"What is the user's favorite animal?",
40+
agents=[agent],
41+
)
42+
43+
44+
async def main():
45+
print("First flow:")
46+
await remember_pet()
47+
48+
print("\nSecond flow:")
49+
result = await recall_pet()
50+
print(result)
51+
return result
52+
53+
54+
if __name__ == "__main__":
55+
asyncio.run(main())

examples/pg-memory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from controlflow.memory.providers.postgres import PostgresMemory
44

55
provider = PostgresMemory(
6-
database_url="postgresql://postgres:postgres@localhost:5432/your_database",
6+
database_url="postgresql://postgres:postgres@localhost:5432/database",
77
# embedding_dimension=1536,
88
# embedding_fn=OpenAIEmbeddings(),
99
table_name="vector_db",

src/controlflow/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
# functions, utilites, and decorators
1515
from .memory import Memory
16+
from .memory.async_memory import AsyncMemory
1617
from .instructions import instructions
1718
from .decorators import flow, task
1819
from .tools import tool

src/controlflow/agents/agent.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from controlflow.llm.models import get_model as get_model_from_string
3333
from controlflow.llm.rules import LLMRules
3434
from controlflow.memory import Memory
35+
from controlflow.memory.async_memory import AsyncMemory
3536
from controlflow.tools.tools import (
3637
Tool,
3738
as_lc_tools,
@@ -82,7 +83,7 @@ class Agent(ControlFlowModel, abc.ABC):
8283
default=False,
8384
description="If True, the agent is given tools for interacting with a human user.",
8485
)
85-
memories: list[Memory] = Field(
86+
memories: list[Memory] | list[AsyncMemory] = Field(
8687
default=[],
8788
description="A list of memory modules for the agent to use.",
8889
)

src/controlflow/defaults.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import controlflow.utilities
77
import controlflow.utilities.logging
88
from controlflow.llm.models import BaseChatModel
9+
from controlflow.memory.async_memory import AsyncMemoryProvider, get_memory_provider
910
from controlflow.memory.memory import MemoryProvider, get_memory_provider
1011
from controlflow.utilities.general import ControlFlowModel
1112

@@ -39,7 +40,9 @@ class Defaults(ControlFlowModel):
3940
model: Optional[Any]
4041
history: History
4142
agent: Agent
42-
memory_provider: Optional[Union[MemoryProvider, str]]
43+
memory_provider: (
44+
Optional[Union[MemoryProvider, str]] | Optional[Union[AsyncMemoryProvider, str]]
45+
)
4346

4447
# add more defaults here
4548
def __repr__(self) -> str:

src/controlflow/memory/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
from .memory import Memory
2+
from .async_memory import AsyncMemory
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import abc
2+
import re
3+
from typing import Dict, List, Optional, Union
4+
5+
from pydantic import Field, field_validator, model_validator
6+
7+
import controlflow
8+
from controlflow.tools.tools import Tool
9+
from controlflow.utilities.general import ControlFlowModel, unwrap
10+
from controlflow.utilities.logging import get_logger
11+
12+
logger = get_logger("controlflow.memory")
13+
14+
15+
def sanitize_memory_key(key: str) -> str:
16+
# Remove any characters that are not alphanumeric or underscore
17+
return re.sub(r"[^a-zA-Z0-9_]", "", key)
18+
19+
20+
class AsyncMemoryProvider(ControlFlowModel, abc.ABC):
21+
async def configure(self, memory_key: str) -> None:
22+
"""Configure the provider for a specific memory."""
23+
pass
24+
25+
@abc.abstractmethod
26+
async def add(self, memory_key: str, content: str) -> str:
27+
"""Create a new memory and return its ID."""
28+
pass
29+
30+
@abc.abstractmethod
31+
async def delete(self, memory_key: str, memory_id: str) -> None:
32+
"""Delete a memory by its ID."""
33+
pass
34+
35+
@abc.abstractmethod
36+
async def search(self, memory_key: str, query: str, n: int = 20) -> Dict[str, str]:
37+
"""Search for n memories using a string query."""
38+
pass
39+
40+
41+
class AsyncMemory(ControlFlowModel):
42+
"""
43+
A memory module is a partitioned collection of memories that are stored in a
44+
vector database, configured by a MemoryProvider.
45+
"""
46+
47+
key: str
48+
instructions: str = Field(
49+
description="Explain what this memory is for and how it should be used."
50+
)
51+
provider: AsyncMemoryProvider = Field(
52+
default_factory=lambda: controlflow.defaults.memory_provider,
53+
validate_default=True,
54+
)
55+
56+
def __hash__(self) -> int:
57+
return id(self)
58+
59+
@field_validator("provider", mode="before")
60+
@classmethod
61+
def validate_provider(
62+
cls, v: Optional[Union[AsyncMemoryProvider, str]]
63+
) -> AsyncMemoryProvider:
64+
if isinstance(v, str):
65+
return get_memory_provider(v)
66+
if v is None:
67+
raise ValueError(
68+
unwrap(
69+
"""
70+
Memory modules require a MemoryProvider to configure the
71+
underlying vector database. No provider was passed as an
72+
argument, and no default value has been configured.
73+
74+
For more information on configuring a memory provider, see
75+
the [Memory
76+
documentation](https://controlflow.ai/patterns/memory), and
77+
please review the [default provider
78+
guide](https://controlflow.ai/guides/default-memory) for
79+
information on configuring a default provider.
80+
81+
Please note that if you are using ControlFlow for the first
82+
time, this error is expected because ControlFlow does not include
83+
vector dependencies by default.
84+
"""
85+
)
86+
)
87+
return v
88+
89+
@field_validator("key")
90+
@classmethod
91+
def validate_key(cls, v: str) -> str:
92+
sanitized = sanitize_memory_key(v)
93+
if sanitized != v:
94+
raise ValueError(
95+
"Memory key must contain only alphanumeric characters and underscores"
96+
)
97+
return sanitized
98+
99+
async def _configure_provider(self):
100+
await self.provider.configure(self.key)
101+
return self
102+
103+
async def add(self, content: str) -> str:
104+
return await self.provider.add(self.key, content)
105+
106+
async def delete(self, memory_id: str) -> None:
107+
await self.provider.delete(self.key, memory_id)
108+
109+
async def search(self, query: str, n: int = 20) -> Dict[str, str]:
110+
return await self.provider.search(self.key, query, n)
111+
112+
def get_tools(self) -> List[Tool]:
113+
return [
114+
Tool.from_function(
115+
self.add,
116+
name=f"store_memory_{self.key}",
117+
description=f'Create a new memory in Memory: "{self.key}".',
118+
),
119+
Tool.from_function(
120+
self.delete,
121+
name=f"delete_memory_{self.key}",
122+
description=f'Delete a memory by its ID from Memory: "{self.key}".',
123+
),
124+
Tool.from_function(
125+
self.search,
126+
name=f"search_memories_{self.key}",
127+
description=f'Search for memories relevant to a string query in Memory: "{self.key}". Returns a dictionary of memory IDs and their contents.',
128+
),
129+
]
130+
131+
132+
def get_memory_provider(provider: str) -> AsyncMemoryProvider:
133+
logger.debug(f"Loading memory provider: {provider}")
134+
135+
# --- async postgres ---
136+
137+
if provider.startswith("async-postgres"):
138+
try:
139+
import sqlalchemy
140+
except ImportError:
141+
raise ImportError(
142+
"""To use async Postgres as a memory provider, please install the `sqlalchemy, `psycopg-pool`,
143+
`psycopg-binary`, and `psycopg` packages."""
144+
)
145+
146+
import controlflow.memory.providers.postgres as postgres_providers
147+
148+
return postgres_providers.AsyncPostgresMemory()
149+
raise ValueError(f'Memory provider "{provider}" could not be loaded from a string.')

src/controlflow/memory/memory.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,12 @@ def get_memory_provider(provider: str) -> MemoryProvider:
172172
import sqlalchemy
173173
except ImportError:
174174
raise ImportError(
175-
"To use Postgres as a memory provider, please install the `sqlalchemy` package."
175+
"""To use Postgres as a memory provider, please install the `sqlalchemy, `psycopg-pool`,
176+
`psycopg-binary`, and `psycopg` `psycopg2-binary` packages."""
176177
)
177178

178179
import controlflow.memory.providers.postgres as postgres_providers
179180

180181
return postgres_providers.PostgresMemory()
182+
181183
raise ValueError(f'Memory provider "{provider}" could not be loaded from a string.')

0 commit comments

Comments
 (0)