Skip to content

Commit f9fef54

Browse files
committed
Test case
1 parent 76895cc commit f9fef54

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed

regress/expected/concurrent.out

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
\! python3 regress/python/test_concurrent.py
2+
Result: OK
3+
4+
Result: OK
5+
6+
Result: OK
7+
8+
Result: OK
9+
10+
Result: OK
11+
12+
All threads have finished execution.

regress/python/test_concurrent.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
from contextlib import contextmanager
2+
import psycopg2
3+
from psycopg2 import sql
4+
import threading
5+
6+
from concurrent.futures import ThreadPoolExecutor
7+
from threading import Semaphore
8+
9+
sqls = [
10+
""" SELECT * FROM cypher('test_graph', $$
11+
MERGE (n:DDDDD {doc_id: 'f5ce4dc2'})
12+
SET n.embedset_id='ae1b9b73', n.doc_id='f5ce4dc2', n.doc_hash='977b56ef'
13+
$$) as (result agtype) """,
14+
""" SELECT * FROM cypher('test_graph', $$
15+
MERGE (n:EEEEE {doc_id: 'f5ce4dc2'})
16+
SET n.embedset_id='ae1b9b73', n.doc_id='f5ce4dc2', n.doc_hash='1d7e79a0'
17+
$$) as (result agtype) """,
18+
""" SELECT * FROM cypher('test_graph', $$
19+
MATCH (source:EEEEE {doc_id:'f5ce4dc2'})
20+
MATCH (target:DDDDD {doc_id:'f5ce4dc2'})
21+
WITH source, target
22+
MERGE (source)-[r:DIRECTED]->(target)
23+
SET r.embedset_id='ae1b9b73', r.doc_id='f5ce4dc2'
24+
RETURN r
25+
$$) as (result agtype) """,
26+
""" SELECT * FROM cypher('test_graph', $$
27+
MATCH (source:EEEEE {doc_id:'f5ce4dc2'})
28+
MATCH (target:DDDDD {doc_id:'f5ce4dc2'})
29+
WITH source, target
30+
MERGE (source)-[r:DIRECTED]->(target)
31+
SET r.embedset_id='ae1b9b73', r.doc_id='f5ce4dc2'
32+
RETURN r
33+
$$) as (result agtype) """,
34+
""" SELECT * FROM cypher('test_graph', $$
35+
MATCH (source:EEEEE {doc_id:'f5ce4dc2'})
36+
MATCH (target:DDDDD {doc_id:'f5ce4dc2'})
37+
WITH source, target
38+
MERGE (source)-[r:DIRECTED]->(target)
39+
SET r.embedset_id='ae1b9b73', r.doc_id='f5ce4dc2'
40+
RETURN r
41+
$$) as (result agtype) """,
42+
]
43+
44+
45+
class PieGraphConnector:
46+
host: str
47+
port: str
48+
user: str
49+
password: str
50+
database: str
51+
warehouse: str
52+
53+
def __init__(self, global_config: dict):
54+
self.host = global_config.get("host", "")
55+
self.port = global_config.get("port", "")
56+
self.user = global_config.get("user", "")
57+
self.password = global_config.get("password", "")
58+
self.database = global_config.get("database", "")
59+
self.warehouse = global_config.get("warehouse", "")
60+
61+
@contextmanager
62+
def conn(self):
63+
conn = None
64+
if self.warehouse and self.warehouse != "":
65+
options = "'-c warehouse=" + self.warehouse + "'"
66+
conn = psycopg2.connect(
67+
dbname=self.database,
68+
user=self.user,
69+
password=self.password,
70+
host=self.host,
71+
port=self.port,
72+
options=options,
73+
)
74+
else:
75+
conn = psycopg2.connect(
76+
dbname=self.database,
77+
user=self.user,
78+
password=self.password,
79+
host=self.host,
80+
port=self.port,
81+
)
82+
conn.autocommit = True
83+
with conn.cursor() as cursor:
84+
cursor.execute("CREATE EXTENSION IF NOT EXISTS age;")
85+
cursor.execute("LOAD 'age';")
86+
cursor.execute("SET search_path = ag_catalog, '$user', public;")
87+
try:
88+
yield conn
89+
finally:
90+
conn.close()
91+
92+
93+
class BoundedThreadPoolExecutor(ThreadPoolExecutor):
94+
def __init__(self, max_workers=5, max_task_size=32, *args, **kwargs):
95+
if max_task_size < max_workers:
96+
raise ValueError(
97+
"max_task_size should be greater than or equal to max_workers"
98+
)
99+
if max_workers is not None:
100+
kwargs["max_workers"] = max_workers
101+
super().__init__(*args, **kwargs)
102+
self._semaphore = Semaphore(max_task_size)
103+
104+
def submit(self, fn, /, *args, **kwargs):
105+
timeout = kwargs.get("timeout", None)
106+
if self._semaphore.acquire(timeout=timeout):
107+
future = super().submit(fn, *args, **kwargs)
108+
future.add_done_callback(lambda _: self._semaphore.release())
109+
return future
110+
else:
111+
raise TimeoutError("waiting for semaphore timeout")
112+
113+
114+
db_config = {
115+
"host": "127.0.0.1",
116+
"database": "postgres",
117+
"user": "postgres",
118+
"password": "",
119+
"port": "5432",
120+
}
121+
122+
connector = PieGraphConnector(db_config)
123+
124+
def execute_sql(query):
125+
try:
126+
connection = psycopg2.connect(**db_config)
127+
cursor = connection.cursor()
128+
cursor.execute("CREATE EXTENSION IF NOT EXISTS age;")
129+
cursor.execute("LOAD 'age';")
130+
cursor.execute("SET search_path = ag_catalog, '$user', public;")
131+
132+
cursor.execute(query)
133+
134+
connection.commit()
135+
136+
result = cursor.fetchall()
137+
138+
except Exception as e:
139+
print(f"Error executing query '{query}': {e}")
140+
finally:
141+
if connection:
142+
cursor.close()
143+
connection.close()
144+
145+
semaphore_graph = threading.Semaphore(20)
146+
147+
drop_graph = "SELECT * FROM drop_graph('test_graph', true)"
148+
create_graph = "SELECT * FROM create_graph('test_graph')"
149+
150+
# execute_sql(drop_graph)
151+
execute_sql(create_graph)
152+
153+
def _merge_exec_sql(query: str):
154+
with semaphore_graph:
155+
with connector.conn() as conn:
156+
with conn.cursor() as cursor:
157+
try:
158+
cursor.execute(query)
159+
result = cursor.fetchall()
160+
print(f"Result: OK\n")
161+
except Exception as e:
162+
print(f"Error executing query '{query}': {e}")
163+
conn.commit()
164+
165+
with BoundedThreadPoolExecutor() as executor:
166+
executor.map(lambda q: _merge_exec_sql(q), sqls)
167+
168+
print("All threads have finished execution.")
169+
170+
execute_sql(drop_graph)

regress/sql/concurrent.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
\! python3 regress/python/test_concurrent.py

0 commit comments

Comments
 (0)