Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3c8667d
feat(base): add shared configuration and base classes (base.py)
tavallaie Feb 24, 2026
223b2f2
feat(sql): add centralized SQL templates module (_sql.py)
tavallaie Feb 24, 2026
ece8aaf
feat(queue): refactor sync client with full PGMQ feature support (que…
tavallaie Feb 24, 2026
919ae09
feat(async_queue): add complete async client implementation (async_qu…
tavallaie Feb 24, 2026
601844b
feat(decorators): add unified transaction decorators (decorators.py)
tavallaie Feb 24, 2026
abef4b0
feat(init): update public API exports with backward compatibility (__…
tavallaie Feb 24, 2026
0b442d2
chore(types): add PEP 561 typed package marker (py.typed)
tavallaie Feb 24, 2026
3d4e8f6
feat(logger): refactor logging with idempotent configuration (logger.py)
tavallaie Feb 24, 2026
6acd004
feat(messages): update dataclasses to match PGMQ extension schema (me…
tavallaie Feb 24, 2026
1b85ce0
feat(Makefile): update pgmq container version
tavallaie Feb 24, 2026
1ae83f2
feat(messages): add __str__ method to QueueRecord for backward compat…
tavallaie Feb 24, 2026
74bf999
feat(base.py): remove backward compatibility property proxies
tavallaie Feb 24, 2026
abc5421
feat(async_queue.py): refactor async implementation with backward com…
tavallaie Feb 24, 2026
9fc0491
feat(queue.py): refactor sync implementation with backward compatibility
tavallaie Feb 24, 2026
b952494
feat(tests): reorganize test structure and update for QueueRecord ret…
tavallaie Feb 24, 2026
3babf8d
chore: style
tavallaie Feb 24, 2026
19477f4
feat: add PostgreSQL LISTEN/NOTIFY support for real-time queue notifi…
tavallaie Feb 24, 2026
52af30d
Merge pull request #1 from tavallaie/3-client-side-implementation-for…
tavallaie Feb 24, 2026
9060b41
feat: complete V2 unit tests
tavallaie Feb 24, 2026
b33525b
Merge branch 'v2.0.0' of github.com:tavallaie/pgmq-py into v2.0.0
tavallaie Feb 24, 2026
313485a
Merge branch 'main' into v2.0.0
tavallaie Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ clear-postgres:
docker rm -f pgmq-postgres || true

run-pgmq-postgres:
docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 ghcr.io/pgmq/pg17-pgmq:latest
docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 ghcr.io/pgmq/pg18-pgmq:v1.10.0

test: clear-postgres run-pgmq-postgres
sleep 10 # Give PostgreSQL time to start
Expand Down
43 changes: 41 additions & 2 deletions src/pgmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,54 @@
# src/pgmq/__init__.py
"""
PGMQ Python Client - A Python client for the PGMQ PostgreSQL extension.

from pgmq.queue import Message, PGMQueue # type: ignore
This package provides both synchronous and asynchronous clients for interacting
with PGMQ (Postgres Message Queue) functionality.
"""

# Core message types
from pgmq.messages import (
Message,
QueueMetrics,
QueueRecord,
TopicBinding,
RoutingResult,
NotificationThrottle,
)

# Client classes
from pgmq.queue import PGMQueue as SyncPGMQueue
from pgmq.async_queue import PGMQueue as AsyncPGMQueue

# Decorators
from pgmq.decorators import transaction, async_transaction

# Logging utilities
from pgmq.logger import PGMQLogger, create_logger, log_performance

# Backward compatibility: PGMQueue points to sync version
PGMQueue = SyncPGMQueue

__version__ = "0.5.0"
__all__ = [
# Clients
"PGMQueue", # Sync (backward compatible alias)
"SyncPGMQueue", # Explicit sync
"AsyncPGMQueue", # Async (clear naming)
# Data classes
"Message",
"PGMQueue",
"QueueMetrics",
"QueueRecord",
"TopicBinding",
"RoutingResult",
"NotificationThrottle",
# Decorators
"transaction",
"async_transaction",
# Logging
"PGMQLogger",
"create_logger",
"log_performance",
# Version
"__version__",
]
244 changes: 244 additions & 0 deletions src/pgmq/_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# src/pgmq/_sql.py
"""
Centralized SQL templates for PGMQ operations.

This module contains all SQL queries used by the PGMQ client, ensuring
consistency between sync and async implementations and making the code
easier to maintain and audit.
"""


# ============================================================================
# Queue Management
# ============================================================================

CREATE_QUEUE = "SELECT pgmq.create(%s);"
CREATE_UNLOGGED_QUEUE = "SELECT pgmq.create_unlogged(%s);"
CREATE_PARTITIONED_QUEUE = "SELECT pgmq.create_partitioned(%s, %s::text, %s::text);"
CREATE_NON_PARTITIONED = "SELECT pgmq.create_non_partitioned(%s);"
DROP_QUEUE = "SELECT pgmq.drop_queue(%s);"
LIST_QUEUES = "SELECT queue_name, created_at, is_partitioned, is_unlogged FROM pgmq.list_queues();"
VALIDATE_QUEUE_NAME = "SELECT pgmq.validate_queue_name(%s);"

# ============================================================================
# Sending Messages
# ============================================================================

SEND = "SELECT * FROM pgmq.send(queue_name=>%s::text, msg=>%s::jsonb);"
SEND_WITH_DELAY_INT = (
"SELECT * FROM pgmq.send(queue_name=>%s::text, msg=>%s::jsonb, delay=>%s::integer);"
)
SEND_WITH_DELAY_TZ = "SELECT * FROM pgmq.send(queue_name=>%s::text, msg=>%s::jsonb, delay=>%s::timestamptz);"
SEND_WITH_HEADERS = (
"SELECT * FROM pgmq.send(queue_name=>%s::text, msg=>%s::jsonb, headers=>%s::jsonb);"
)
SEND_WITH_HEADERS_DELAY_INT = "SELECT * FROM pgmq.send(queue_name=>%s::text, msg=>%s::jsonb, headers=>%s::jsonb, delay=>%s::integer);"
SEND_WITH_HEADERS_DELAY_TZ = "SELECT * FROM pgmq.send(queue_name=>%s::text, msg=>%s::jsonb, headers=>%s::jsonb, delay=>%s::timestamptz);"

SEND_BATCH = "SELECT * FROM pgmq.send_batch(queue_name=>%s::text, msgs=>%s::jsonb[]);"
SEND_BATCH_WITH_DELAY_INT = "SELECT * FROM pgmq.send_batch(queue_name=>%s::text, msgs=>%s::jsonb[], delay=>%s::integer);"
SEND_BATCH_WITH_DELAY_TZ = "SELECT * FROM pgmq.send_batch(queue_name=>%s::text, msgs=>%s::jsonb[], delay=>%s::timestamptz);"
SEND_BATCH_WITH_HEADERS = "SELECT * FROM pgmq.send_batch(queue_name=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[]);"
SEND_BATCH_WITH_HEADERS_DELAY_INT = "SELECT * FROM pgmq.send_batch(queue_name=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[], delay=>%s::integer);"
SEND_BATCH_WITH_HEADERS_DELAY_TZ = "SELECT * FROM pgmq.send_batch(queue_name=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[], delay=>%s::timestamptz);"

# ============================================================================
# Topic-Based Routing
# ============================================================================

SEND_TOPIC = "SELECT pgmq.send_topic(%s::text, %s::jsonb);"
SEND_TOPIC_WITH_HEADERS = "SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::jsonb);"
SEND_TOPIC_WITH_DELAY_INT = "SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::integer);"
SEND_TOPIC_WITH_HEADERS_DELAY_INT = (
"SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::jsonb, %s::integer);"
)

SEND_BATCH_TOPIC = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[]);"
SEND_BATCH_TOPIC_WITH_HEADERS = (
"SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[]);"
)
SEND_BATCH_TOPIC_WITH_DELAY_INT = (
"SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::integer);"
)
SEND_BATCH_TOPIC_WITH_DELAY_TZ = (
"SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::timestamptz);"
)
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_INT = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[], %s::integer);"
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_TZ = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[], %s::timestamptz);"
Comment on lines +49 to +67
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The queries for send_topic and send_batch_topic use positional arguments. This is inconsistent with the non-topic send functions that use named arguments (e.g., queue_name=>..., msg=>...). Using named arguments for all function calls improves readability and makes the code more robust against changes in parameter order or function overloads.

Suggested change
SEND_TOPIC = "SELECT pgmq.send_topic(%s::text, %s::jsonb);"
SEND_TOPIC_WITH_HEADERS = "SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::jsonb);"
SEND_TOPIC_WITH_DELAY_INT = "SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::integer);"
SEND_TOPIC_WITH_HEADERS_DELAY_INT = (
"SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::jsonb, %s::integer);"
)
SEND_BATCH_TOPIC = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[]);"
SEND_BATCH_TOPIC_WITH_HEADERS = (
"SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[]);"
)
SEND_BATCH_TOPIC_WITH_DELAY_INT = (
"SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::integer);"
)
SEND_BATCH_TOPIC_WITH_DELAY_TZ = (
"SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::timestamptz);"
)
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_INT = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[], %s::integer);"
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_TZ = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[], %s::timestamptz);"
SEND_TOPIC = "SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb);"
SEND_TOPIC_WITH_HEADERS = "SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb, headers=>%s::jsonb);"
SEND_TOPIC_WITH_DELAY_INT = "SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb, delay=>%s::integer);"
SEND_TOPIC_WITH_HEADERS_DELAY_INT = (
"SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb, headers=>%s::jsonb, delay=>%s::integer);"
)
SEND_BATCH_TOPIC = "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[]);"
SEND_BATCH_TOPIC_WITH_HEADERS = (
"SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[]);"
)
SEND_BATCH_TOPIC_WITH_DELAY_INT = (
"SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], delay=>%s::integer);"
)
SEND_BATCH_TOPIC_WITH_DELAY_TZ = (
"SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], delay=>%s::timestamptz);"
)
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_INT = "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[], delay=>%s::integer);"
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_TZ = "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[], delay=>%s::timestamptz);"


BIND_TOPIC = "SELECT pgmq.bind_topic(%s::text, %s::text);"
UNBIND_TOPIC = "SELECT pgmq.unbind_topic(%s::text, %s::text);"
LIST_TOPIC_BINDINGS = "SELECT pattern, queue_name, bound_at, compiled_regex FROM pgmq.list_topic_bindings();"
LIST_TOPIC_BINDINGS_FOR_QUEUE = "SELECT pattern, queue_name, bound_at, compiled_regex FROM pgmq.list_topic_bindings(%s);"
TEST_ROUTING = "SELECT pattern, queue_name, compiled_regex FROM pgmq.test_routing(%s);"

# ============================================================================
# Reading Messages
# ============================================================================

READ = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer);"""

READ_WITH_POLL = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read_with_poll(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer,
max_poll_seconds=>%s::integer, poll_interval_ms=>%s::integer);"""

READ_CONDITIONAL = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer, conditional=>%s::jsonb);"""

READ_WITH_POLL_CONDITIONAL = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read_with_poll(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer,
max_poll_seconds=>%s::integer, poll_interval_ms=>%s::integer, conditional=>%s::jsonb);"""

# ============================================================================
# FIFO Reading
# ============================================================================

READ_GROUPED = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read_grouped(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer);"""

READ_GROUPED_WITH_POLL = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read_grouped_with_poll(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer,
max_poll_seconds=>%s::integer, poll_interval_ms=>%s::integer);"""

READ_GROUPED_RR = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read_grouped_rr(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer);"""

READ_GROUPED_RR_WITH_POLL = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.read_grouped_rr_with_poll(queue_name=>%s::text, vt=>%s::integer, qty=>%s::integer,
max_poll_seconds=>%s::integer, poll_interval_ms=>%s::integer);"""

# ============================================================================
# Pop
# ============================================================================

POP = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.pop(queue_name=>%s::text, qty=>%s::integer);"""

# ============================================================================
# Deleting/Archiving
# ============================================================================

DELETE = "SELECT pgmq.delete(queue_name=>%s::text, msg_id=>%s::bigint);"
DELETE_BATCH = "SELECT * FROM pgmq.delete(queue_name=>%s::text, msg_ids=>%s::bigint[]);"
ARCHIVE = "SELECT pgmq.archive(queue_name=>%s::text, msg_id=>%s::bigint);"
ARCHIVE_BATCH = (
"SELECT * FROM pgmq.archive(queue_name=>%s::text, msg_ids=>%s::bigint[]);"
)
PURGE_QUEUE = "SELECT pgmq.purge_queue(queue_name=>%s);"

# ============================================================================
# Visibility Timeout
# ============================================================================

SET_VT = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_id=>%s::bigint, vt=>%s);"""

SET_VT_BATCH = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_ids=>%s::bigint[], vt=>%s);"""
Comment on lines +134 to +138
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The set_vt function in PostgreSQL is overloaded to accept either an integer or a timestamptz for the visibility timeout. Relying on a single SQL string for both can lead to ambiguity and requires fragile string manipulation in the client code. To make the queries more robust and explicit, it's better to define separate constants for each type. The client code in queue.py and async_queue.py will need to be updated to select the appropriate constant.

Suggested change
SET_VT = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_id=>%s::bigint, vt=>%s);"""
SET_VT_BATCH = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_ids=>%s::bigint[], vt=>%s);"""
SET_VT_INT = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_id=>%s::bigint, vt=>%s::integer);"""
SET_VT_TZ = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_id=>%s::bigint, vt=>%s::timestamptz);"""
SET_VT_BATCH_INT = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_ids=>%s::bigint[], vt=>%s::integer);"""
SET_VT_BATCH_TZ = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_ids=>%s::bigint[], vt=>%s::timestamptz);"""


# ============================================================================
# Metrics
# ============================================================================

METRICS = "SELECT * FROM pgmq.metrics(queue_name=>%s);"
METRICS_ALL = "SELECT * FROM pgmq.metrics_all();"

# ============================================================================
# Notifications
# ============================================================================

ENABLE_NOTIFY = "SELECT pgmq.enable_notify_insert(%s::text, %s::integer);"
DISABLE_NOTIFY = "SELECT pgmq.disable_notify_insert(%s::text);"
UPDATE_NOTIFY = "SELECT pgmq.update_notify_insert(%s::text, %s::integer);"
LIST_NOTIFY_THROTTLES = "SELECT queue_name, throttle_interval_ms, last_notified_at FROM pgmq.list_notify_insert_throttles();"

# ============================================================================
# Utilities
# ============================================================================

VALIDATE_ROUTING_KEY = "SELECT pgmq.validate_routing_key(%s);"
VALIDATE_TOPIC_PATTERN = "SELECT pgmq.validate_topic_pattern(%s);"
CREATE_FIFO_INDEX = "SELECT pgmq.create_fifo_index(%s);"
CREATE_FIFO_INDEXES_ALL = "SELECT pgmq.create_fifo_indexes_all();"
CONVERT_ARCHIVE_PARTITIONED = "SELECT pgmq.convert_archive_partitioned(%s, %s, %s, %s);"
DETACH_ARCHIVE = "SELECT pgmq.detach_archive(%s);"


def get_send_sql(
headers: bool = False,
delay: bool = False,
delay_is_timestamp: bool = False,
) -> str:
"""Get appropriate send SQL based on parameters."""
if headers and delay:
return (
SEND_WITH_HEADERS_DELAY_TZ
if delay_is_timestamp
else SEND_WITH_HEADERS_DELAY_INT
)
elif headers:
return SEND_WITH_HEADERS
elif delay:
return SEND_WITH_DELAY_TZ if delay_is_timestamp else SEND_WITH_DELAY_INT
return SEND


def get_send_batch_sql(
headers: bool = False,
delay: bool = False,
delay_is_timestamp: bool = False,
) -> str:
"""Get appropriate send_batch SQL based on parameters."""
if headers and delay:
return (
SEND_BATCH_WITH_HEADERS_DELAY_TZ
if delay_is_timestamp
else SEND_BATCH_WITH_HEADERS_DELAY_INT
)
elif headers:
return SEND_BATCH_WITH_HEADERS
elif delay:
return (
SEND_BATCH_WITH_DELAY_TZ
if delay_is_timestamp
else SEND_BATCH_WITH_DELAY_INT
)
return SEND_BATCH


def get_send_topic_sql(
headers: bool = False,
delay: bool = False,
) -> str:
"""Get appropriate send_topic SQL based on parameters."""
if headers and delay:
return SEND_TOPIC_WITH_HEADERS_DELAY_INT
elif headers:
return SEND_TOPIC_WITH_HEADERS
elif delay:
return SEND_TOPIC_WITH_DELAY_INT
return SEND_TOPIC


def get_send_batch_topic_sql(
headers: bool = False,
delay: bool = False,
delay_is_timestamp: bool = False,
) -> str:
"""Get appropriate send_batch_topic SQL based on parameters."""
if headers and delay:
return (
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_TZ
if delay_is_timestamp
else SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_INT
)
elif headers:
return SEND_BATCH_TOPIC_WITH_HEADERS
elif delay:
return (
SEND_BATCH_TOPIC_WITH_DELAY_TZ
if delay_is_timestamp
else SEND_BATCH_TOPIC_WITH_DELAY_INT
)
return SEND_BATCH_TOPIC
Loading
Loading