diff --git a/README.md b/README.md index c21f15d..b289423 100644 --- a/README.md +++ b/README.md @@ -13,10 +13,15 @@ FalkorDB Python client see [docs](http://falkordb-py.readthedocs.io/) -## Installation -```sh -pip install FalkorDB -``` +## Installation +```sh +pip install FalkorDB +``` + +Embedded mode (local in-process server via optional binaries): +```sh +pip install "FalkorDB[lite]" +``` ## Usage @@ -32,8 +37,8 @@ Or use [FalkorDB Cloud](https://app.falkordb.cloud) ```python from falkordb import FalkorDB -# Connect to FalkorDB -db = FalkorDB(host='localhost', port=6379) +# Connect to FalkorDB +db = FalkorDB(host='localhost', port=6379) # Select the social graph g = db.select_graph('social') @@ -49,9 +54,14 @@ nodes = g.ro_query('MATCH (n) RETURN n LIMIT 10').result_set # Copy the Graph copy_graph = g.copy('social_copy') -# Delete the Graph -g.delete() -``` +# Delete the Graph +g.delete() + +# Embedded FalkorDB (no external server) +with FalkorDB(embedded=True, db_path="/tmp/social.rdb") as embedded_db: + eg = embedded_db.select_graph("embedded_social") + eg.query('CREATE (:Person {name: "Alice"})') +``` ### Asynchronous Example @@ -63,8 +73,8 @@ from redis.asyncio import BlockingConnectionPool async def main(): # Connect to FalkorDB - pool = BlockingConnectionPool(max_connections=16, timeout=None, decode_responses=True) - db = FalkorDB(connection_pool=pool) + pool = BlockingConnectionPool(max_connections=16, timeout=None, decode_responses=True) + db = FalkorDB(connection_pool=pool) # Select the social graph g = db.select_graph('social') @@ -90,8 +100,13 @@ async def main(): print(f"Created Alice: {results[1].result_set[0][0]}") print(f"Created Bob: {results[2].result_set[0][0]}") - # Close the connection when done - await pool.aclose() + # Close the connection when done + await pool.aclose() + + # Embedded mode (same API) + async with FalkorDB(embedded=True) as embedded_db: + embedded_graph = embedded_db.select_graph("embedded_social") + await embedded_graph.query('CREATE (:Person {name: "Bob"})') # Run the async example if __name__ == "__main__": diff --git a/falkordb/asyncio/falkordb.py b/falkordb/asyncio/falkordb.py index dba7199..d37b5d7 100644 --- a/falkordb/asyncio/falkordb.py +++ b/falkordb/asyncio/falkordb.py @@ -1,3 +1,4 @@ +import asyncio from typing import List, Optional, Union import redis.asyncio as redis # type: ignore[import-not-found] @@ -31,6 +32,8 @@ class FalkorDB: print(node.properties['name']) """ + _embedded_server = None + def __init__( self, host="localhost", @@ -70,41 +73,89 @@ def __init__( reinitialize_steps=5, read_from_replicas=False, address_remap=None, + embedded=False, + db_path=None, + embedded_config=None, + startup_timeout=10.0, + connection_acquire_timeout=5.0, ): + self._embedded_server = None + + if embedded: + from ..lite.server import EmbeddedServer + + if max_connections is None: + max_connections = 16 + + server = EmbeddedServer( + db_path=db_path, + config=embedded_config, + startup_timeout=startup_timeout, + ) + self._embedded_server = server + connection_pool = redis.BlockingConnectionPool( + connection_class=redis.UnixDomainSocketConnection, + path=server.unix_socket_path, + max_connections=max_connections, + timeout=connection_acquire_timeout, + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + socket_keepalive_options=socket_keepalive_options, + encoding=encoding, + encoding_errors=encoding_errors, + decode_responses=True, + retry_on_error=retry_on_error, + retry=retry, + health_check_interval=health_check_interval, + client_name=client_name, + lib_name=lib_name, + lib_version=lib_version, + username=username, + password=password, + credential_provider=credential_provider, + protocol=protocol, + ) - conn = redis.Redis( - host=host, - port=port, - db=0, - password=password, - socket_timeout=socket_timeout, - socket_connect_timeout=socket_connect_timeout, - socket_keepalive=socket_keepalive, - socket_keepalive_options=socket_keepalive_options, - connection_pool=connection_pool, - unix_socket_path=unix_socket_path, - encoding=encoding, - encoding_errors=encoding_errors, - decode_responses=True, - retry_on_error=retry_on_error, - ssl=ssl, - ssl_keyfile=ssl_keyfile, - ssl_certfile=ssl_certfile, - ssl_cert_reqs=ssl_cert_reqs, - ssl_ca_certs=ssl_ca_certs, - ssl_ca_data=ssl_ca_data, - ssl_check_hostname=ssl_check_hostname, - max_connections=max_connections, - single_connection_client=single_connection_client, - health_check_interval=health_check_interval, - client_name=client_name, - driver_info=DriverInfo(lib_name, lib_version or get_package_version()), - username=username, - retry=retry, - redis_connect_func=connect_func, - credential_provider=credential_provider, - protocol=protocol, - ) + if embedded: + conn = redis.Redis( + connection_pool=connection_pool, + single_connection_client=single_connection_client, + ) + else: + conn = redis.Redis( + host=host, + port=port, + db=0, + password=password, + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + socket_keepalive_options=socket_keepalive_options, + connection_pool=connection_pool, + unix_socket_path=unix_socket_path, + encoding=encoding, + encoding_errors=encoding_errors, + decode_responses=True, + retry_on_error=retry_on_error, + ssl=ssl, + ssl_keyfile=ssl_keyfile, + ssl_certfile=ssl_certfile, + ssl_cert_reqs=ssl_cert_reqs, + ssl_ca_certs=ssl_ca_certs, + ssl_ca_data=ssl_ca_data, + ssl_check_hostname=ssl_check_hostname, + max_connections=max_connections, + single_connection_client=single_connection_client, + health_check_interval=health_check_interval, + client_name=client_name, + driver_info=DriverInfo(lib_name, lib_version or get_package_version()), + username=username, + retry=retry, + redis_connect_func=connect_func, + credential_provider=credential_provider, + protocol=protocol, + ) if Is_Cluster(conn): conn = Cluster_Conn( @@ -215,9 +266,8 @@ async def config_set(self, name: str, value=None) -> None: return await self.connection.execute_command(CONFIG_CMD, "SET", name, value) async def aclose(self) -> None: - """ - Close the underlying connection(s). - """ + """Close the underlying connection(s) and stop the embedded server + if present.""" try: await self.connection.aclose() @@ -225,6 +275,11 @@ async def aclose(self) -> None: # best-effort close — don't raise on Redis errors pass + server = getattr(self, "_embedded_server", None) + if server is not None: + await asyncio.to_thread(server.stop) + self._embedded_server = None + async def __aenter__(self) -> "FalkorDB": """Return self to support async with-statement usage.""" diff --git a/falkordb/falkordb.py b/falkordb/falkordb.py index b3605b8..f706371 100644 --- a/falkordb/falkordb.py +++ b/falkordb/falkordb.py @@ -31,6 +31,8 @@ class FalkorDB: print(node.properties['name']) """ + _embedded_server = None + def __init__( self, host="localhost", @@ -78,47 +80,94 @@ def __init__( dynamic_startup_nodes=True, url=None, address_remap=None, + embedded=False, + db_path=None, + embedded_config=None, + startup_timeout=10.0, ): + self._embedded_server = None + + if embedded: + from .lite.server import EmbeddedServer + + if max_connections is None: + max_connections = 16 + + server = EmbeddedServer( + db_path=db_path, + config=embedded_config, + startup_timeout=startup_timeout, + ) + self._embedded_server = server + + connection_pool = redis.ConnectionPool( + connection_class=redis.UnixDomainSocketConnection, + path=server.unix_socket_path, + max_connections=max_connections, + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + socket_keepalive_options=socket_keepalive_options, + encoding=encoding, + encoding_errors=encoding_errors, + decode_responses=True, + retry_on_error=retry_on_error, + retry=retry, + health_check_interval=health_check_interval, + client_name=client_name, + lib_name=lib_name, + lib_version=lib_version, + username=username, + password=password, + credential_provider=credential_provider, + protocol=protocol, + ) - conn = redis.Redis( - host=host, - port=port, - db=0, - password=password, - socket_timeout=socket_timeout, - socket_connect_timeout=socket_connect_timeout, - socket_keepalive=socket_keepalive, - socket_keepalive_options=socket_keepalive_options, - connection_pool=connection_pool, - unix_socket_path=unix_socket_path, - encoding=encoding, - encoding_errors=encoding_errors, - decode_responses=True, - retry_on_error=retry_on_error, - ssl=ssl, - ssl_keyfile=ssl_keyfile, - ssl_certfile=ssl_certfile, - ssl_cert_reqs=ssl_cert_reqs, - ssl_ca_certs=ssl_ca_certs, - ssl_ca_path=ssl_ca_path, - ssl_ca_data=ssl_ca_data, - ssl_check_hostname=ssl_check_hostname, - ssl_password=ssl_password, - ssl_validate_ocsp=ssl_validate_ocsp, - ssl_validate_ocsp_stapled=ssl_validate_ocsp_stapled, - ssl_ocsp_context=ssl_ocsp_context, - ssl_ocsp_expected_cert=ssl_ocsp_expected_cert, - max_connections=max_connections, - single_connection_client=single_connection_client, - health_check_interval=health_check_interval, - client_name=client_name, - driver_info=DriverInfo(lib_name, lib_version or get_package_version()), - username=username, - retry=retry, - redis_connect_func=connect_func, - credential_provider=credential_provider, - protocol=protocol, - ) + if embedded: + conn = redis.Redis( + connection_pool=connection_pool, + single_connection_client=single_connection_client, + ) + else: + conn = redis.Redis( + host=host, + port=port, + db=0, + password=password, + socket_timeout=socket_timeout, + socket_connect_timeout=socket_connect_timeout, + socket_keepalive=socket_keepalive, + socket_keepalive_options=socket_keepalive_options, + connection_pool=connection_pool, + unix_socket_path=unix_socket_path, + encoding=encoding, + encoding_errors=encoding_errors, + decode_responses=True, + retry_on_error=retry_on_error, + ssl=ssl, + ssl_keyfile=ssl_keyfile, + ssl_certfile=ssl_certfile, + ssl_cert_reqs=ssl_cert_reqs, + ssl_ca_certs=ssl_ca_certs, + ssl_ca_path=ssl_ca_path, + ssl_ca_data=ssl_ca_data, + ssl_check_hostname=ssl_check_hostname, + ssl_password=ssl_password, + ssl_validate_ocsp=ssl_validate_ocsp, + ssl_validate_ocsp_stapled=ssl_validate_ocsp_stapled, + ssl_ocsp_context=ssl_ocsp_context, + ssl_ocsp_expected_cert=ssl_ocsp_expected_cert, + max_connections=max_connections, + single_connection_client=single_connection_client, + health_check_interval=health_check_interval, + client_name=client_name, + driver_info=DriverInfo(lib_name, lib_version or get_package_version()), + username=username, + retry=retry, + redis_connect_func=connect_func, + credential_provider=credential_provider, + protocol=protocol, + ) if Is_Sentinel(conn): self.sentinel, self.service_name = Sentinel_Conn(conn, ssl) @@ -142,6 +191,30 @@ def __init__( self.flushdb = conn.flushdb self.execute_command = conn.execute_command + def close(self) -> None: + """Close the underlying connection(s) and stop the embedded server + if present.""" + + try: + self.connection.close() + except RedisError: + # best-effort close — don't raise on Redis errors + pass + + server = getattr(self, "_embedded_server", None) + if server is not None: + server.stop() + self._embedded_server = None + + def __enter__(self) -> "FalkorDB": + """Return self to support usage in a with-statement.""" + + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """Close the connection when exiting a with-statement.""" + self.close() + @classmethod def from_url(cls, url: str, **kwargs) -> "FalkorDB": """ @@ -233,26 +306,6 @@ def config_set(self, name: str, value=None) -> None: return self.connection.execute_command(CONFIG_CMD, "SET", name, value) - def close(self) -> None: - """ - Close the underlying connection(s). - """ - - try: - self.connection.close() - except RedisError: - # best-effort close — don't raise on Redis errors - pass - - def __enter__(self) -> "FalkorDB": - """Return self to support usage in a with-statement.""" - - return self - - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - """Close the connection when exiting a with-statement.""" - self.close() - # GRAPH.UDF LOAD [REPLACE]