Skip to content

Commit dccf1ef

Browse files
authored
Merge pull request #2524 from danielaskdd/fix-pg-cypher-dollar-quoting
fix(postgres): Handle dollar-sign sequences in Cypher queries to prevent syntax errors
2 parents 67e8a2b + f70fdfe commit dccf1ef

File tree

1 file changed

+100
-88
lines changed

1 file changed

+100
-88
lines changed

lightrag/kg/postgres_impl.py

Lines changed: 100 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,37 @@ def _safe_index_name(table_name: str, index_suffix: str) -> str:
9494
return shortened_name
9595

9696

97+
def _dollar_quote(s: str, tag_prefix: str = "AGE") -> str:
98+
"""
99+
Generate a PostgreSQL dollar-quoted string with a unique tag.
100+
101+
PostgreSQL dollar-quoting uses $tag$ as delimiters. If the content contains
102+
the same delimiter (e.g., $$ or $AGE1$), it will break the query.
103+
This function finds a unique tag that doesn't conflict with the content.
104+
105+
Args:
106+
s: The string to quote
107+
tag_prefix: Prefix for generating unique tags (default: "AGE")
108+
109+
Returns:
110+
The dollar-quoted string with a unique tag, e.g., $AGE1$content$AGE1$
111+
112+
Example:
113+
>>> _dollar_quote("hello")
114+
'$AGE1$hello$AGE1$'
115+
>>> _dollar_quote("$AGE1$ test")
116+
'$AGE2$$AGE1$ test$AGE2$'
117+
>>> _dollar_quote("$$$") # Content with dollar signs
118+
'$AGE1$$$$AGE1$'
119+
"""
120+
s = "" if s is None else str(s)
121+
for i in itertools.count(1):
122+
tag = f"{tag_prefix}{i}"
123+
wrapper = f"${tag}$"
124+
if wrapper not in s:
125+
return f"{wrapper}{s}{wrapper}"
126+
127+
97128
class PostgreSQLDB:
98129
def __init__(self, config: dict[str, Any], **kwargs: Any):
99130
self.host = config["host"]
@@ -4232,14 +4263,12 @@ async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | N
42324263
"""
42334264
label = self._normalize_node_id(source_node_id)
42344265

4235-
query = """SELECT * FROM cypher('%s', $$
4236-
MATCH (n:base {entity_id: "%s"})
4266+
# Build Cypher query with dynamic dollar-quoting to handle entity_id containing $ sequences
4267+
cypher_query = f"""MATCH (n:base {{entity_id: "{label}"}})
42374268
OPTIONAL MATCH (n)-[]-(connected:base)
4238-
RETURN n.entity_id AS source_id, connected.entity_id AS connected_id
4239-
$$) AS (source_id text, connected_id text)""" % (
4240-
self.graph_name,
4241-
label,
4242-
)
4269+
RETURN n.entity_id AS source_id, connected.entity_id AS connected_id"""
4270+
4271+
query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(cypher_query)}) AS (source_id text, connected_id text)"
42434272

42444273
results = await self._query(query)
42454274
edges = []
@@ -4273,15 +4302,13 @@ async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
42734302
label = self._normalize_node_id(node_id)
42744303
properties = self._format_properties(node_data)
42754304

4276-
query = """SELECT * FROM cypher('%s', $$
4277-
MERGE (n:base {entity_id: "%s"})
4278-
SET n += %s
4279-
RETURN n
4280-
$$) AS (n agtype)""" % (
4281-
self.graph_name,
4282-
label,
4283-
properties,
4284-
)
4305+
# Build Cypher query with dynamic dollar-quoting to handle content containing $$
4306+
# This prevents syntax errors when LLM-extracted descriptions contain $ sequences
4307+
cypher_query = f"""MERGE (n:base {{entity_id: "{label}"}})
4308+
SET n += {properties}
4309+
RETURN n"""
4310+
4311+
query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(cypher_query)}) AS (n agtype)"
42854312

42864313
try:
42874314
await self._query(query, readonly=False, upsert=True)
@@ -4312,21 +4339,18 @@ async def upsert_edge(
43124339
tgt_label = self._normalize_node_id(target_node_id)
43134340
edge_properties = self._format_properties(edge_data)
43144341

4315-
query = """SELECT * FROM cypher('%s', $$
4316-
MATCH (source:base {entity_id: "%s"})
4342+
# Build Cypher query with dynamic dollar-quoting to handle content containing $$
4343+
# This prevents syntax errors when LLM-extracted descriptions contain $ sequences
4344+
# See: https://github.com/HKUDS/LightRAG/issues/1438#issuecomment-2826000195
4345+
cypher_query = f"""MATCH (source:base {{entity_id: "{src_label}"}})
43174346
WITH source
4318-
MATCH (target:base {entity_id: "%s"})
4347+
MATCH (target:base {{entity_id: "{tgt_label}"}})
43194348
MERGE (source)-[r:DIRECTED]-(target)
4320-
SET r += %s
4321-
SET r += %s
4322-
RETURN r
4323-
$$) AS (r agtype)""" % (
4324-
self.graph_name,
4325-
src_label,
4326-
tgt_label,
4327-
edge_properties,
4328-
edge_properties, # https://github.com/HKUDS/LightRAG/issues/1438#issuecomment-2826000195
4329-
)
4349+
SET r += {edge_properties}
4350+
SET r += {edge_properties}
4351+
RETURN r"""
4352+
4353+
query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(cypher_query)}) AS (r agtype)"
43304354

43314355
try:
43324356
await self._query(query, readonly=False, upsert=True)
@@ -4346,10 +4370,11 @@ async def delete_node(self, node_id: str) -> None:
43464370
"""
43474371
label = self._normalize_node_id(node_id)
43484372

4349-
query = """SELECT * FROM cypher('%s', $$
4350-
MATCH (n:base {entity_id: "%s"})
4351-
DETACH DELETE n
4352-
$$) AS (n agtype)""" % (self.graph_name, label)
4373+
# Build Cypher query with dynamic dollar-quoting to handle entity_id containing $ sequences
4374+
cypher_query = f"""MATCH (n:base {{entity_id: "{label}"}})
4375+
DETACH DELETE n"""
4376+
4377+
query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(cypher_query)}) AS (n agtype)"
43534378

43544379
try:
43554380
await self._query(query, readonly=False)
@@ -4364,14 +4389,15 @@ async def remove_nodes(self, node_ids: list[str]) -> None:
43644389
Args:
43654390
node_ids (list[str]): A list of node IDs to remove.
43664391
"""
4367-
node_ids = [self._normalize_node_id(node_id) for node_id in node_ids]
4368-
node_id_list = ", ".join([f'"{node_id}"' for node_id in node_ids])
4392+
node_ids_normalized = [self._normalize_node_id(node_id) for node_id in node_ids]
4393+
node_id_list = ", ".join([f'"{node_id}"' for node_id in node_ids_normalized])
43694394

4370-
query = """SELECT * FROM cypher('%s', $$
4371-
MATCH (n:base)
4372-
WHERE n.entity_id IN [%s]
4373-
DETACH DELETE n
4374-
$$) AS (n agtype)""" % (self.graph_name, node_id_list)
4395+
# Build Cypher query with dynamic dollar-quoting to handle entity_id containing $ sequences
4396+
cypher_query = f"""MATCH (n:base)
4397+
WHERE n.entity_id IN [{node_id_list}]
4398+
DETACH DELETE n"""
4399+
4400+
query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(cypher_query)}) AS (n agtype)"
43754401

43764402
try:
43774403
await self._query(query, readonly=False)
@@ -4390,10 +4416,11 @@ async def remove_edges(self, edges: list[tuple[str, str]]) -> None:
43904416
src_label = self._normalize_node_id(source)
43914417
tgt_label = self._normalize_node_id(target)
43924418

4393-
query = """SELECT * FROM cypher('%s', $$
4394-
MATCH (a:base {entity_id: "%s"})-[r]-(b:base {entity_id: "%s"})
4395-
DELETE r
4396-
$$) AS (r agtype)""" % (self.graph_name, src_label, tgt_label)
4419+
# Build Cypher query with dynamic dollar-quoting to handle entity_id containing $ sequences
4420+
cypher_query = f"""MATCH (a:base {{entity_id: "{src_label}"}})-[r]-(b:base {{entity_id: "{tgt_label}"}})
4421+
DELETE r"""
4422+
4423+
query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(cypher_query)}) AS (r agtype)"
43974424

43984425
try:
43994426
await self._query(query, readonly=False)
@@ -4666,24 +4693,16 @@ async def get_edges_batch(
46664693
MATCH (a)<-[r]-(b)
46674694
RETURN src_eid AS source, tgt_eid AS target, properties(r) AS edge_properties"""
46684695

4669-
def dollar_quote(s: str, tag_prefix="AGE"):
4670-
s = "" if s is None else str(s)
4671-
for i in itertools.count(1):
4672-
tag = f"{tag_prefix}{i}"
4673-
wrapper = f"${tag}$"
4674-
if wrapper not in s:
4675-
return f"{wrapper}{s}{wrapper}"
4676-
46774696
sql_fwd = f"""
4678-
SELECT * FROM cypher({dollar_quote(self.graph_name)}::name,
4679-
{dollar_quote(forward_cypher)}::cstring,
4697+
SELECT * FROM cypher({_dollar_quote(self.graph_name)}::name,
4698+
{_dollar_quote(forward_cypher)}::cstring,
46804699
$1::agtype)
46814700
AS (source text, target text, edge_properties agtype)
46824701
"""
46834702

46844703
sql_bwd = f"""
4685-
SELECT * FROM cypher({dollar_quote(self.graph_name)}::name,
4686-
{dollar_quote(backward_cypher)}::cstring,
4704+
SELECT * FROM cypher({_dollar_quote(self.graph_name)}::name,
4705+
{_dollar_quote(backward_cypher)}::cstring,
46874706
$1::agtype)
46884707
AS (source text, target text, edge_properties agtype)
46894708
"""
@@ -4758,25 +4777,19 @@ async def get_nodes_edges_batch(
47584777
# Format node IDs for the query
47594778
formatted_ids = ", ".join([f'"{n}"' for n in batch])
47604779

4761-
outgoing_query = """SELECT * FROM cypher('%s', $$
4762-
UNWIND [%s] AS node_id
4763-
MATCH (n:base {entity_id: node_id})
4780+
# Build Cypher queries with dynamic dollar-quoting to handle entity_id containing $ sequences
4781+
outgoing_cypher = f"""UNWIND [{formatted_ids}] AS node_id
4782+
MATCH (n:base {{entity_id: node_id}})
47644783
OPTIONAL MATCH (n:base)-[]->(connected:base)
4765-
RETURN node_id, connected.entity_id AS connected_id
4766-
$$) AS (node_id text, connected_id text)""" % (
4767-
self.graph_name,
4768-
formatted_ids,
4769-
)
4784+
RETURN node_id, connected.entity_id AS connected_id"""
47704785

4771-
incoming_query = """SELECT * FROM cypher('%s', $$
4772-
UNWIND [%s] AS node_id
4773-
MATCH (n:base {entity_id: node_id})
4786+
incoming_cypher = f"""UNWIND [{formatted_ids}] AS node_id
4787+
MATCH (n:base {{entity_id: node_id}})
47744788
OPTIONAL MATCH (n:base)<-[]-(connected:base)
4775-
RETURN node_id, connected.entity_id AS connected_id
4776-
$$) AS (node_id text, connected_id text)""" % (
4777-
self.graph_name,
4778-
formatted_ids,
4779-
)
4789+
RETURN node_id, connected.entity_id AS connected_id"""
4790+
4791+
outgoing_query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(outgoing_cypher)}) AS (node_id text, connected_id text)"
4792+
incoming_query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(incoming_cypher)}) AS (node_id text, connected_id text)"
47804793

47814794
outgoing_results = await self._query(outgoing_query)
47824795
incoming_results = await self._query(incoming_query)
@@ -4850,10 +4863,12 @@ async def _bfs_subgraph(
48504863

48514864
# Get starting node data
48524865
label = self._normalize_node_id(node_label)
4853-
query = """SELECT * FROM cypher('%s', $$
4854-
MATCH (n:base {entity_id: "%s"})
4855-
RETURN id(n) as node_id, n
4856-
$$) AS (node_id bigint, n agtype)""" % (self.graph_name, label)
4866+
4867+
# Build Cypher query with dynamic dollar-quoting to handle entity_id containing $ sequences
4868+
cypher_query = f"""MATCH (n:base {{entity_id: "{label}"}})
4869+
RETURN id(n) as node_id, n"""
4870+
4871+
query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(cypher_query)}) AS (node_id bigint, n agtype)"
48574872

48584873
node_result = await self._query(query)
48594874
if not node_result or not node_result[0].get("n"):
@@ -4909,9 +4924,8 @@ async def _bfs_subgraph(
49094924
[f'"{self._normalize_node_id(node_id)}"' for node_id in node_ids]
49104925
)
49114926

4912-
# Construct batch query for outgoing edges
4913-
outgoing_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
4914-
UNWIND [{formatted_ids}] AS node_id
4927+
# Build Cypher queries with dynamic dollar-quoting to handle entity_id containing $ sequences
4928+
outgoing_cypher = f"""UNWIND [{formatted_ids}] AS node_id
49154929
MATCH (n:base {{entity_id: node_id}})
49164930
OPTIONAL MATCH (n)-[r]->(neighbor:base)
49174931
RETURN node_id AS current_id,
@@ -4921,13 +4935,9 @@ async def _bfs_subgraph(
49214935
id(r) AS edge_id,
49224936
r,
49234937
neighbor,
4924-
true AS is_outgoing
4925-
$$) AS (current_id text, current_internal_id bigint, neighbor_internal_id bigint,
4926-
neighbor_id text, edge_id bigint, r agtype, neighbor agtype, is_outgoing bool)"""
4938+
true AS is_outgoing"""
49274939

4928-
# Construct batch query for incoming edges
4929-
incoming_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
4930-
UNWIND [{formatted_ids}] AS node_id
4940+
incoming_cypher = f"""UNWIND [{formatted_ids}] AS node_id
49314941
MATCH (n:base {{entity_id: node_id}})
49324942
OPTIONAL MATCH (n)<-[r]-(neighbor:base)
49334943
RETURN node_id AS current_id,
@@ -4937,9 +4947,11 @@ async def _bfs_subgraph(
49374947
id(r) AS edge_id,
49384948
r,
49394949
neighbor,
4940-
false AS is_outgoing
4941-
$$) AS (current_id text, current_internal_id bigint, neighbor_internal_id bigint,
4942-
neighbor_id text, edge_id bigint, r agtype, neighbor agtype, is_outgoing bool)"""
4950+
false AS is_outgoing"""
4951+
4952+
outgoing_query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(outgoing_cypher)}) AS (current_id text, current_internal_id bigint, neighbor_internal_id bigint, neighbor_id text, edge_id bigint, r agtype, neighbor agtype, is_outgoing bool)"
4953+
4954+
incoming_query = f"SELECT * FROM cypher({_dollar_quote(self.graph_name)}, {_dollar_quote(incoming_cypher)}) AS (current_id text, current_internal_id bigint, neighbor_internal_id bigint, neighbor_id text, edge_id bigint, r agtype, neighbor agtype, is_outgoing bool)"
49434955

49444956
# Execute queries
49454957
outgoing_results = await self._query(outgoing_query)

0 commit comments

Comments
 (0)