Skip to content

Commit 2ae8fa6

Browse files
committed
make the advisory lock prefix the oid
1 parent 5866567 commit 2ae8fa6

File tree

3 files changed

+19
-13
lines changed

3 files changed

+19
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
## 0.0.2
66

7-
Add ability to info start and end date from filters
7+
Add ability to infer start and end date from filters
88

99

1010
## 0.0.1

nbs/01_pgvectorizer.ipynb

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
" self.trigger_name_fn = client.QueryBuilder._quote_ident(trigger_name_fn) \n",
7373
"\n",
7474
"\n",
75-
" def register(self):\n",
75+
" def register(self): \n",
7676
" with psycopg2.connect(self.service_url) as conn:\n",
7777
" with conn.cursor() as cursor:\n",
7878
" cursor.execute(f\"\"\"\n",
@@ -81,9 +81,7 @@
8181
" table_exists = cursor.fetchone()[0]\n",
8282
" if table_exists:\n",
8383
" return\n",
84-
" \n",
85-
" with psycopg2.connect(self.service_url) as conn:\n",
86-
" with conn.cursor() as cursor:\n",
84+
" \n",
8785
" cursor.execute(f\"\"\"\n",
8886
" CREATE TABLE {self.schema_name}.{self.work_queue_table_name} (\n",
8987
" id int\n",
@@ -110,21 +108,26 @@
110108
" INSERT INTO {self.schema_name}.{self.work_queue_table_name} SELECT {self.id_column_name} FROM {self.schema_name}.{self.table_name};\n",
111109
" \"\"\")\n",
112110
"\n",
113-
" def process(self, embed_and_write_cb, batch_size:int=10, advisory_prefix=47859, autoregister=True):\n",
111+
" def process(self, embed_and_write_cb, batch_size:int=10, autoregister=True):\n",
114112
" if autoregister:\n",
115113
" self.register()\n",
116114
" \n",
117115
" with psycopg2.connect(self.service_url) as conn:\n",
118116
" with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:\n",
119117
" cursor.execute(f\"\"\"\n",
118+
" SELECT to_regclass('{self.schema_name}.{self.work_queue_table_name}')::oid; \n",
119+
" \"\"\")\n",
120+
" table_oid = cursor.fetchone()[0]\n",
121+
" \n",
122+
" cursor.execute(f\"\"\"\n",
120123
" WITH selected_rows AS (\n",
121124
" SELECT id\n",
122125
" FROM {self.schema_name}.{self.work_queue_table_name}\n",
123126
" LIMIT {int(batch_size)}\n",
124127
" FOR UPDATE SKIP LOCKED\n",
125128
" ), \n",
126129
" locked_items AS (\n",
127-
" SELECT id, pg_try_advisory_xact_lock({int(advisory_prefix)}, id) AS locked\n",
130+
" SELECT id, pg_try_advisory_xact_lock({int(table_oid)}, id) AS locked\n",
128131
" FROM (SELECT DISTINCT id FROM selected_rows ORDER BY id) as ids\n",
129132
" ),\n",
130133
" deleted_rows AS (\n",

timescale_vector/pgvectorizer.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(self,
4545
self.trigger_name_fn = client.QueryBuilder._quote_ident(trigger_name_fn)
4646

4747

48-
def register(self):
48+
def register(self):
4949
with psycopg2.connect(self.service_url) as conn:
5050
with conn.cursor() as cursor:
5151
cursor.execute(f"""
@@ -54,9 +54,7 @@ def register(self):
5454
table_exists = cursor.fetchone()[0]
5555
if table_exists:
5656
return
57-
58-
with psycopg2.connect(self.service_url) as conn:
59-
with conn.cursor() as cursor:
57+
6058
cursor.execute(f"""
6159
CREATE TABLE {self.schema_name}.{self.work_queue_table_name} (
6260
id int
@@ -83,12 +81,17 @@ def register(self):
8381
INSERT INTO {self.schema_name}.{self.work_queue_table_name} SELECT {self.id_column_name} FROM {self.schema_name}.{self.table_name};
8482
""")
8583

86-
def process(self, embed_and_write_cb, batch_size:int=10, advisory_prefix=47859, autoregister=True):
84+
def process(self, embed_and_write_cb, batch_size:int=10, autoregister=True):
8785
if autoregister:
8886
self.register()
8987

9088
with psycopg2.connect(self.service_url) as conn:
9189
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
90+
cursor.execute(f"""
91+
SELECT to_regclass('{self.schema_name}.{self.work_queue_table_name}')::oid;
92+
""")
93+
table_oid = cursor.fetchone()[0]
94+
9295
cursor.execute(f"""
9396
WITH selected_rows AS (
9497
SELECT id
@@ -97,7 +100,7 @@ def process(self, embed_and_write_cb, batch_size:int=10, advisory_prefix=47859,
97100
FOR UPDATE SKIP LOCKED
98101
),
99102
locked_items AS (
100-
SELECT id, pg_try_advisory_xact_lock({int(advisory_prefix)}, id) AS locked
103+
SELECT id, pg_try_advisory_xact_lock({int(table_oid)}, id) AS locked
101104
FROM (SELECT DISTINCT id FROM selected_rows ORDER BY id) as ids
102105
),
103106
deleted_rows AS (

0 commit comments

Comments
 (0)