Skip to content

Commit 09d4946

Browse files
Fix for handler for postgres (#3992)
* fix(kanon):updated connection cleanup to share 1 thread and added logs for connection leakage Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):refactor wallet and connection pool to use 1 thread Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):removed unecessary callback, added parellel opening to askar and kanon db Signed-off-by: Vinay Singh <vinay@verid.id> * style: applied ruff formatting Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):moved wallet db session to reuse profile session Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):added check to see the correct handler and changed storage to close soon Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):added explicit close to scan Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):added explicit close to scan Signed-off-by: Vinay Singh <vinay@verid.id> * style:applied ruff formating Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):added explicit release Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):test async context manager Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):added detailed logs Signed-off-by: Vinay Singh <vinay@verid.id> * style:applied ruff formating Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):corrected docker params Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):fixes for handlers in release_0_2.py Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):fix docker compose to be properly escaped Signed-off-by: Vinay Singh <vinay@verid.id> * fix:kanon test docker compose Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):session error handling Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon): test docker compose Signed-off-by: Vinay Singh <vinay@verid.id> * fix(kanon):fix handlers Signed-off-by: Vinay Singh <vinay@verid.id> * style:applied ruff formatting Signed-off-by: Vinay Singh <vinay@verid.id> --------- Signed-off-by: Vinay Singh <vinay@verid.id>
1 parent 80ededd commit 09d4946

File tree

5 files changed

+125
-29
lines changed

5 files changed

+125
-29
lines changed

acapy_agent/database_manager/databases/postgresql_normalized/database.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,9 @@ async def scan(
235235
handlers, _, _ = get_release(self.release_number, "postgresql")
236236

237237
handler = handlers.get(category, handlers["default"])
238+
# Update handler's schema_context to match database's schema_context
239+
if hasattr(handler, "set_schema_context"):
240+
handler.set_schema_context(self.schema_context)
238241
profile_id = await self._get_profile_id(profile or self.default_profile)
239242
tag_query = None
240243
if tag_filter:
@@ -282,6 +285,9 @@ async def scan_keyset(
282285
handlers, _, _ = get_release(self.release_number, "postgresql")
283286

284287
handler = handlers.get(category, handlers["default"])
288+
# Update handler's schema_context to match database's schema_context
289+
if hasattr(handler, "set_schema_context"):
290+
handler.set_schema_context(self.schema_context)
285291
profile_id = await self._get_profile_id(profile or self.default_profile)
286292
tag_query = None
287293
if tag_filter:

acapy_agent/database_manager/databases/postgresql_normalized/handlers/generic_handler.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ def __init__(
4545
"""Initialize GenericHandler with category and database configuration."""
4646
super().__init__(category)
4747
self.schema_context = schema_context or SchemaContext()
48-
self.tags_table = self.schema_context.qualify_table(
49-
tags_table_name or "items_tags"
50-
)
48+
self._tags_table_name = tags_table_name or "items_tags" # Store unqualified name
49+
self.tags_table = self.schema_context.qualify_table(self._tags_table_name)
5150
self.encoder = encoder_factory.get_encoder(
5251
"postgresql",
5352
lambda x: x,
@@ -64,6 +63,33 @@ def __init__(
6463
self.tags_table,
6564
)
6665

66+
def set_schema_context(self, schema_context: SchemaContext) -> None:
67+
"""Update the schema context and re-qualify table names.
68+
69+
This method should be called when the handler is used with a different
70+
schema than the one it was initialized with (e.g., when handlers are
71+
created at module load time with a default schema).
72+
"""
73+
if (
74+
schema_context
75+
and schema_context.schema_name != self.schema_context.schema_name
76+
):
77+
self.schema_context = schema_context
78+
self.tags_table = self.schema_context.qualify_table(self._tags_table_name)
79+
# Recreate encoder with updated tags_table
80+
self.encoder = encoder_factory.get_encoder(
81+
"postgresql",
82+
lambda x: x,
83+
lambda x: x,
84+
normalized=False,
85+
tags_table=self.tags_table,
86+
)
87+
LOGGER.debug(
88+
"[set_schema_context] Updated schema_context to %s, tags_table=%s",
89+
self.schema_context,
90+
self.tags_table,
91+
)
92+
6793
def _validate_order_by(self, order_by: Optional[str]) -> None:
6894
if order_by and order_by not in self.ALLOWED_ORDER_BY_COLUMNS:
6995
LOGGER.error("[order_by] Invalid column: %s", order_by)
@@ -428,6 +454,13 @@ async def fetch(
428454
entry = Entry(category=category, name=name, value=item_value, tags=tags)
429455
LOGGER.debug("[%s] Returning entry: %s", operation_name, entry)
430456
return entry
457+
else:
458+
# No tag_filter - return entry with empty tags
459+
entry = Entry(category=category, name=name, value=item_value, tags={})
460+
LOGGER.debug(
461+
"[%s] Returning entry (no tag_filter): %s", operation_name, entry
462+
)
463+
return entry
431464

432465
async def fetch_all(
433466
self,

acapy_agent/database_manager/databases/postgresql_normalized/handlers/normalized_handler.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ def __init__(
106106
"""Initialize NormalizedHandler."""
107107
super().__init__(category)
108108
self.schema_context = schema_context or SchemaContext()
109-
self.table = self.schema_context.qualify_table(table_name or category)
109+
self._table_name = table_name or category # Store unqualified table name
110+
self.table = self.schema_context.qualify_table(self._table_name)
110111
self.columns = columns
111112
self.ALLOWED_ORDER_BY_COLUMNS = set(columns) | {"id", "name", "value"}
112113
self.encoder = encoder_factory.get_encoder(
@@ -123,6 +124,25 @@ def __init__(
123124

124125
self.EXPIRY_CLAUSE = "(i.expiry IS NULL OR i.expiry > CURRENT_TIMESTAMP)"
125126

127+
def set_schema_context(self, schema_context: SchemaContext) -> None:
128+
"""Update the schema context and re-qualify table names.
129+
130+
This method should be called when the handler is used with a different
131+
schema than the one it was initialized with (e.g., when handlers are
132+
created at module load time with a default schema).
133+
"""
134+
if (
135+
schema_context
136+
and schema_context.schema_name != self.schema_context.schema_name
137+
):
138+
self.schema_context = schema_context
139+
self.table = self.schema_context.qualify_table(self._table_name)
140+
LOGGER.debug(
141+
"[set_schema_context] Updated schema_context to %s, table=%s",
142+
self.schema_context,
143+
self.table,
144+
)
145+
126146
async def _ensure_utf8(self, _cursor: AsyncCursor) -> None:
127147
# UTF8 encoding is set via connection pool options (-c client_encoding=UTF8)
128148
# No need to execute SET here - would add unnecessary latency

acapy_agent/database_manager/databases/postgresql_normalized/session.py

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,20 @@ def __init__(
4949
self.profile_id = profile_id
5050
self.schema_context = database.schema_context
5151

52+
def _get_handler(self, category: str):
53+
"""Get a handler for the given category with the correct schema context.
54+
55+
Handlers are created at module load time with a default schema context.
56+
This method updates the handler's schema context to match the session's
57+
schema context before returning it.
58+
"""
59+
handlers, _, _ = get_release(self.release_number, "postgresql")
60+
handler = handlers.get(category, handlers["default"])
61+
# Update handler's schema context to match session's schema context
62+
if hasattr(handler, "set_schema_context"):
63+
handler.set_schema_context(self.schema_context)
64+
return handler
65+
5266
def _process_value(
5367
self, value: str | bytes, operation: str, name: str, category: str
5468
) -> str:
@@ -298,8 +312,7 @@ async def _cleanup_session(self):
298312

299313
async def count(self, category: str, tag_filter: str | dict = None) -> int:
300314
"""Count entries in a category."""
301-
handlers, _, _ = get_release(self.release_number, "postgresql")
302-
handler = handlers.get(category, handlers["default"])
315+
handler = self._get_handler(category)
303316
async with self.conn.cursor() as cursor:
304317
try:
305318
count = await handler.count(cursor, self.profile_id, category, tag_filter)
@@ -313,6 +326,10 @@ async def count(self, category: str, tag_filter: str | dict = None) -> int:
313326
if not self.is_txn:
314327
await self.conn.rollback()
315328
raise
329+
except DatabaseError:
330+
if not self.is_txn:
331+
await self.conn.rollback()
332+
raise
316333
except Exception as e:
317334
if not self.is_txn:
318335
await self.conn.rollback()
@@ -334,8 +351,7 @@ async def insert(
334351
expiry_ms: int = None,
335352
):
336353
"""Insert an entry."""
337-
handlers, _, _ = get_release(self.release_number, "postgresql")
338-
handler = handlers.get(category, handlers["default"])
354+
handler = self._get_handler(category)
339355
value = self._process_value(value, "insert", name, category)
340356
async with self.conn.cursor() as cursor:
341357
try:
@@ -351,6 +367,11 @@ async def insert(
351367
if not self.is_txn:
352368
await self.conn.rollback()
353369
raise
370+
except DatabaseError:
371+
# Re-raise DatabaseError as-is to preserve original error code
372+
if not self.is_txn:
373+
await self.conn.rollback()
374+
raise
354375
except Exception as e:
355376
if not self.is_txn:
356377
await self.conn.rollback()
@@ -374,8 +395,7 @@ async def fetch(
374395
for_update: bool = False,
375396
) -> Optional[Entry]:
376397
"""Fetch a single entry."""
377-
handlers, _, _ = get_release(self.release_number, "postgresql")
378-
handler = handlers.get(category, handlers["default"])
398+
handler = self._get_handler(category)
379399
async with self.conn.cursor() as cursor:
380400
try:
381401
result = await handler.fetch(
@@ -398,6 +418,10 @@ async def fetch(
398418
if not self.is_txn:
399419
await self.conn.rollback()
400420
raise
421+
except DatabaseError:
422+
if not self.is_txn:
423+
await self.conn.rollback()
424+
raise
401425
except Exception as e:
402426
if not self.is_txn:
403427
await self.conn.rollback()
@@ -423,8 +447,7 @@ async def fetch_all(
423447
descending: bool = False,
424448
) -> Sequence[Entry]:
425449
"""Fetch all entries matching criteria."""
426-
handlers, _, _ = get_release(self.release_number, "postgresql")
427-
handler = handlers.get(category, handlers["default"])
450+
handler = self._get_handler(category)
428451
async with self.conn.cursor() as cursor:
429452
try:
430453
results = await handler.fetch_all(
@@ -458,6 +481,10 @@ async def fetch_all(
458481
if not self.is_txn:
459482
await self.conn.rollback()
460483
raise
484+
except DatabaseError:
485+
if not self.is_txn:
486+
await self.conn.rollback()
487+
raise
461488
except Exception as e:
462489
if not self.is_txn:
463490
await self.conn.rollback()
@@ -479,8 +506,7 @@ async def replace(
479506
expiry_ms: int = None,
480507
):
481508
"""Replace an entry."""
482-
handlers, _, _ = get_release(self.release_number, "postgresql")
483-
handler = handlers.get(category, handlers["default"])
509+
handler = self._get_handler(category)
484510
value = self._process_value(value, "replace", name, category)
485511
async with self.conn.cursor() as cursor:
486512
try:
@@ -496,6 +522,10 @@ async def replace(
496522
if not self.is_txn:
497523
await self.conn.rollback()
498524
raise
525+
except DatabaseError:
526+
if not self.is_txn:
527+
await self.conn.rollback()
528+
raise
499529
except Exception as e:
500530
if not self.is_txn:
501531
await self.conn.rollback()
@@ -513,8 +543,7 @@ async def replace(
513543

514544
async def remove(self, category: str, name: str):
515545
"""Remove a single entry."""
516-
handlers, _, _ = get_release(self.release_number, "postgresql")
517-
handler = handlers.get(category, handlers["default"])
546+
handler = self._get_handler(category)
518547
async with self.conn.cursor() as cursor:
519548
try:
520549
await handler.remove(cursor, self.profile_id, category, name)
@@ -527,6 +556,10 @@ async def remove(self, category: str, name: str):
527556
if not self.is_txn:
528557
await self.conn.rollback()
529558
raise
559+
except DatabaseError:
560+
if not self.is_txn:
561+
await self.conn.rollback()
562+
raise
530563
except Exception as e:
531564
if not self.is_txn:
532565
await self.conn.rollback()
@@ -544,8 +577,7 @@ async def remove(self, category: str, name: str):
544577

545578
async def remove_all(self, category: str, tag_filter: str | dict = None) -> int:
546579
"""Remove all entries matching criteria."""
547-
handlers, _, _ = get_release(self.release_number, "postgresql")
548-
handler = handlers.get(category, handlers["default"])
580+
handler = self._get_handler(category)
549581
async with self.conn.cursor() as cursor:
550582
try:
551583
result = await handler.remove_all(
@@ -561,6 +593,10 @@ async def remove_all(self, category: str, tag_filter: str | dict = None) -> int:
561593
if not self.is_txn:
562594
await self.conn.rollback()
563595
raise
596+
except DatabaseError:
597+
if not self.is_txn:
598+
await self.conn.rollback()
599+
raise
564600
except Exception as e:
565601
if not self.is_txn:
566602
await self.conn.rollback()

scenarios/examples/kanon_issuance_and_presentation/docker-compose.yml

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
services:
1+
services:
22
kanon-postgres:
33
image: acapy-test
44
ports:
55
- "8031:8031"
66
- "8030:8030"
77
environment:
8-
- DB_USER=myuser
9-
- DB_PASSWORD=mypass
8+
- RUST_LOG=aries-askar::log::target=error
109
command: >
1110
start
1211
--endpoint http://kanon-postgres:8030
@@ -16,16 +15,16 @@
1615
--admin 0.0.0.0 8031
1716
--admin-insecure-mode
1817
--wallet-type kanon-anoncreds
19-
--wallet-storage-type postgres_storage
20-
--recreate-wallet
18+
--wallet-storage-type postgres
2119
--wallet-name kanon-postgres-normalized
2220
--wallet-key insecure
23-
--wallet-storage-config "{\"url\":\"wallet-db:5432\",\"max_connections\":100,\"min_idle_count\":5,\"max_idle\":10.0,\"max_lifetime\":7200.0}"
24-
--wallet-storage-creds "{\"account\":\"myuser\",\"password\":\"mypass\"}"
25-
--dbstore-storage-type postgres_storage
26-
--dbstore-storage-config "{\"url\":\"wallet-db:5432\",\"connection_timeout\":30.0,\"max_connections\":100,\"min_idle_count\":5,\"max_idle\":10.0,\"max_lifetime\":7200.0}"
27-
--dbstore-storage-creds "{\"account\":\"myuser\",\"password\":\"mypass\"}"
21+
--wallet-storage-config '{"url":"wallet-db:5432","max_connections":100,"min_idle_count":5,"max_idle":10.0,"max_lifetime":7200.0}'
22+
--wallet-storage-creds '{"account":"myuser","password":"mypass","admin_account":"myuser","admin_password":"mypass"}'
23+
--dbstore-storage-type postgres
24+
--dbstore-storage-config '{"url":"wallet-db:5432","connection_timeout":30.0,"max_connections":100,"min_idle_count":5,"max_idle":10.0,"max_lifetime":7200.0}'
25+
--dbstore-storage-creds '{"account":"myuser","password":"mypass","admin_account":"myuser","admin_password":"mypass"}'
2826
--dbstore-schema-config normalize
27+
--recreate-wallet
2928
--preserve-exchange-records
3029
--genesis-url http://test.bcovrin.vonx.io/genesis
3130
--tails-server-base-url http://tails:6543
@@ -61,6 +60,8 @@
6160
image: acapy-test
6261
ports:
6362
- "3002:3001"
63+
environment:
64+
- RUST_LOG=aries-askar::log::target=error
6465
command: >
6566
start
6667
--label Bob
@@ -126,4 +127,4 @@
126127
interval: 10s
127128
retries: 5
128129
start_period: 30s
129-
timeout: 10s
130+
timeout: 10s

0 commit comments

Comments
 (0)