Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ FalkorDB Python client

see [docs](http://falkordb-py.readthedocs.io/)

## Installation
```sh
pip install FalkorDB
```
## Installation
```sh
pip install FalkorDB
```

Embedded mode (local in-process server via optional binaries):
```sh
pip install "FalkorDB[lite]"
```

## Usage

Expand All @@ -32,8 +37,8 @@ Or use [FalkorDB Cloud](https://app.falkordb.cloud)
```python
from falkordb import FalkorDB

# Connect to FalkorDB
db = FalkorDB(host='localhost', port=6379)
# Connect to FalkorDB
db = FalkorDB(host='localhost', port=6379)

# Select the social graph
g = db.select_graph('social')
Expand All @@ -49,9 +54,14 @@ nodes = g.ro_query('MATCH (n) RETURN n LIMIT 10').result_set
# Copy the Graph
copy_graph = g.copy('social_copy')

# Delete the Graph
g.delete()
```
# Delete the Graph
g.delete()

# Embedded FalkorDB (no external server)
with FalkorDB(embedded=True, db_path="/tmp/social.rdb") as embedded_db:
eg = embedded_db.select_graph("embedded_social")
eg.query('CREATE (:Person {name: "Alice"})')
```

### Asynchronous Example

Expand All @@ -63,8 +73,8 @@ from redis.asyncio import BlockingConnectionPool
async def main():

# Connect to FalkorDB
pool = BlockingConnectionPool(max_connections=16, timeout=None, decode_responses=True)
db = FalkorDB(connection_pool=pool)
pool = BlockingConnectionPool(max_connections=16, timeout=None, decode_responses=True)
db = FalkorDB(connection_pool=pool)

# Select the social graph
g = db.select_graph('social')
Expand All @@ -90,8 +100,13 @@ async def main():
print(f"Created Alice: {results[1].result_set[0][0]}")
print(f"Created Bob: {results[2].result_set[0][0]}")

# Close the connection when done
await pool.aclose()
# Close the connection when done
await pool.aclose()

# Embedded mode (same API)
async with FalkorDB(embedded=True) as embedded_db:
embedded_graph = embedded_db.select_graph("embedded_social")
await embedded_graph.query('CREATE (:Person {name: "Bob"})')

# Run the async example
if __name__ == "__main__":
Expand Down
127 changes: 91 additions & 36 deletions falkordb/asyncio/falkordb.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import List, Optional, Union

import redis.asyncio as redis # type: ignore[import-not-found]
Expand Down Expand Up @@ -31,6 +32,8 @@ class FalkorDB:
print(node.properties['name'])
"""

_embedded_server = None

def __init__(
self,
host="localhost",
Expand Down Expand Up @@ -70,41 +73,89 @@ def __init__(
reinitialize_steps=5,
read_from_replicas=False,
address_remap=None,
embedded=False,
db_path=None,
embedded_config=None,
startup_timeout=10.0,
connection_acquire_timeout=5.0,
):
self._embedded_server = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

AttributeError: 'FalkorDB' object has no attribute '_embedded_server' — CI test failure.

Although __init__ sets self._embedded_server = None at line 80, the CI test fails with AttributeError at line 282, indicating instances are reaching aclose() without _embedded_server set (e.g., via object.__new__, a patched __init__, or mock-created instances in existing tests). Add a class-level declaration as a safe default:

🐛 Proposed fix
 class FalkorDB:
     """
     Asynchronous FalkorDB Class for interacting with a FalkorDB server.
     ...
     """
+    _embedded_server = None
+
     def __init__(
         self,
         ...
🧰 Tools
🪛 GitHub Actions: Lint

[error] Ruff format check would reformat this file.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@falkordb/asyncio/falkordb.py` at line 80, the class lacks a class-level
default for _embedded_server, which leads to AttributeError when instances are
created without running __init__ (e.g., in tests); add a class attribute
declaration on the FalkorDB class like "_embedded_server = None" so any instance
(including ones created via object.__new__ or mocks) always has this attribute
defined, and ensure the aclose() method still checks self._embedded_server
before using it (referencing FalkorDB, _embedded_server, and aclose).


if embedded:
from ..lite.server import EmbeddedServer

if max_connections is None:
max_connections = 16

server = EmbeddedServer(
db_path=db_path,
config=embedded_config,
startup_timeout=startup_timeout,
)
self._embedded_server = server
connection_pool = redis.BlockingConnectionPool(
connection_class=redis.UnixDomainSocketConnection,
path=server.unix_socket_path,
max_connections=max_connections,
timeout=connection_acquire_timeout,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
encoding=encoding,
encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error,
retry=retry,
health_check_interval=health_check_interval,
client_name=client_name,
lib_name=lib_name,
lib_version=lib_version,
username=username,
password=password,
credential_provider=credential_provider,
protocol=protocol,
)
Comment on lines +84 to +118
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Blocking synchronous calls inside async __init__ will stall the event loop.

EmbeddedServer() (line 80) is fully synchronous — it spawns a subprocess and polls with time.sleep() in a loop for up to startup_timeout seconds (server.py lines 52–82). Calling this directly in the __init__ of an async client blocks the running event loop for the entire startup duration.

Consider wrapping the server startup in asyncio.to_thread() (or loop.run_in_executor()) and providing an async factory method instead of doing this work in __init__:

Example: async factory method
`@classmethod`
async def create(cls, *, embedded=False, db_path=None, embedded_config=None,
                 startup_timeout=10.0, max_connections=None, **kwargs):
    if embedded:
        import asyncio
        from ..lite.server import EmbeddedServer

        if max_connections is None:
            max_connections = 16

        server = await asyncio.to_thread(
            EmbeddedServer,
            db_path=db_path,
            config=embedded_config,
            startup_timeout=startup_timeout,
        )
        kwargs["connection_pool"] = redis.BlockingConnectionPool(
            connection_class=redis.UnixDomainSocketConnection,
            path=server.unix_socket_path,
            max_connections=max_connections,
            ...
        )
        instance = cls(**kwargs)
        instance._embedded_server = server
        return instance
    else:
        return cls(**kwargs)


conn = redis.Redis(
host=host,
port=port,
db=0,
password=password,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
connection_pool=connection_pool,
unix_socket_path=unix_socket_path,
encoding=encoding,
encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error,
ssl=ssl,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ca_data=ssl_ca_data,
ssl_check_hostname=ssl_check_hostname,
max_connections=max_connections,
single_connection_client=single_connection_client,
health_check_interval=health_check_interval,
client_name=client_name,
driver_info=DriverInfo(lib_name, lib_version or get_package_version()),
username=username,
retry=retry,
redis_connect_func=connect_func,
credential_provider=credential_provider,
protocol=protocol,
)
if embedded:
conn = redis.Redis(
connection_pool=connection_pool,
single_connection_client=single_connection_client,
)
else:
conn = redis.Redis(
host=host,
port=port,
db=0,
password=password,
socket_timeout=socket_timeout,
socket_connect_timeout=socket_connect_timeout,
socket_keepalive=socket_keepalive,
socket_keepalive_options=socket_keepalive_options,
connection_pool=connection_pool,
unix_socket_path=unix_socket_path,
encoding=encoding,
encoding_errors=encoding_errors,
decode_responses=True,
retry_on_error=retry_on_error,
ssl=ssl,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ca_data=ssl_ca_data,
ssl_check_hostname=ssl_check_hostname,
max_connections=max_connections,
single_connection_client=single_connection_client,
health_check_interval=health_check_interval,
client_name=client_name,
driver_info=DriverInfo(lib_name, lib_version or get_package_version()),
username=username,
retry=retry,
redis_connect_func=connect_func,
credential_provider=credential_provider,
protocol=protocol,
)

if Is_Cluster(conn):
conn = Cluster_Conn(
Expand Down Expand Up @@ -215,16 +266,20 @@ async def config_set(self, name: str, value=None) -> None:
return await self.connection.execute_command(CONFIG_CMD, "SET", name, value)

async def aclose(self) -> None:
    """Release the Redis connection and, if this instance started an
    embedded server, shut that server down as well."""

    # Best-effort close: swallow Redis-level errors so teardown never raises.
    try:
        await self.connection.aclose()
    except RedisError:
        pass

    # getattr guards instances that never ran __init__ (e.g. mocks or
    # object.__new__ in tests), which may lack the attribute entirely.
    embedded = getattr(self, "_embedded_server", None)
    if embedded is None:
        return
    # EmbeddedServer.stop() is synchronous; run it off the event loop.
    await asyncio.to_thread(embedded.stop)
    self._embedded_server = None

async def __aenter__(self) -> "FalkorDB":
"""Return self to support async with-statement usage."""

Expand Down
Loading