- Create PGMQConfig dataclass for centralized configuration management
- Implement BaseQueue with common initialization and logging setup
- Support both legacy kwargs and modern config object patterns
- Add backward compatibility properties for existing code
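A minimal sketch of how such a dual-interface constructor might look (the field names, defaults, and BaseQueue internals here are illustrative assumptions, not the actual implementation):

```python
from dataclasses import dataclass


@dataclass
class PGMQConfig:
    # Connection settings (defaults are illustrative assumptions)
    host: str = "localhost"
    port: int = 5432
    database: str = "postgres"
    username: str = "postgres"
    password: str = ""
    # Queue behaviour
    delay: int = 0
    vt: int = 30
    pool_size: int = 10
    verbose: bool = False


class BaseQueue:
    """Accept either a ready-made config object or legacy keyword arguments."""

    def __init__(self, config=None, **kwargs):
        # Modern pattern: pass a PGMQConfig; legacy pattern: pass kwargs
        # that are used to build one.
        self.config = config if config is not None else PGMQConfig(**kwargs)
```

Either calling convention ends up with the same `self.config` object, so downstream code only needs one access path.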
- Extract all SQL queries from inline code to dedicated module
- Add helper functions for dynamic SQL selection based on parameters
- Support all PGMQ extension features: standard ops, topics, FIFO, notifications
- Enable code reuse between sync and async implementations
…ue.py)
- Implement complete sync client using psycopg3 and ConnectionPool
- Add topic-based routing: send_topic, bind_topic, test_routing, etc.
- Add FIFO operations: read_grouped, read_grouped_rr with poll variants
- Add notification management: enable/disable/update notify
- Add batch set_vt support
- Fix pop() to properly handle qty parameter
- Maintain 100% backward compatibility with existing API
…eue.py)
- Implement async client using asyncpg with identical API to sync version
- Add init() method for async pool initialization
- Add close() method for proper cleanup
- Mirror all sync features: topics, FIFO, notifications, batch operations
- Use async_transaction decorator for transaction management
- Return AsyncPGMQueue alias in public API
- Create @transaction for sync operations with psycopg3
- Create @async_transaction for async operations with asyncpg
- Implement automatic connection injection and transaction management
- Support nested transactions via conn parameter detection
- Handle commit/rollback automatically with proper error propagation
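The decorator pattern described above can be sketched roughly like this (the pool interface and parameter names follow psycopg3 conventions but are assumptions; the real decorators may differ):

```python
import functools


def transaction(func):
    """Run `func` inside a transaction, injecting a connection.

    If the caller already passes `conn=...`, reuse it (nested call) and
    let the outermost frame own commit/rollback. Otherwise acquire a
    connection from `self.pool` (assumed to expose a psycopg-style
    `connection()` context manager), commit on success, roll back on error.
    """

    @functools.wraps(func)
    def wrapper(self, *args, conn=None, **kwargs):
        if conn is not None:
            # Nested call: reuse the caller's connection and transaction.
            return func(self, *args, conn=conn, **kwargs)
        with self.pool.connection() as new_conn:
            try:
                result = func(self, *args, conn=new_conn, **kwargs)
                new_conn.commit()
                return result
            except Exception:
                new_conn.rollback()
                raise

    return wrapper
```

Detecting an explicit `conn` argument is what makes nesting safe: inner calls never commit a transaction they did not open.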
…init__.py)
- Export SyncPGMQueue and AsyncPGMQueue with clear naming
- Maintain PGMQueue alias pointing to sync version for compatibility
- Export all message dataclasses including new types
- Keep existing decorator and logging exports
- Add __version__ for package version tracking
- Mark package as fully typed for mypy and other type checkers
- Enable better IDE autocomplete and type checking for users
- Replace complex handler removal logic with caching system
- Add LoggingManager for centralized logger creation
- Support both stdlib logging and loguru backends
- Implement structured logging with JSON output option
- Maintain backward compatible create_logger() function
- Add log_with_context for structured context logging
- Add log_performance decorator for timing
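The timing decorator mentioned above can be sketched as follows (the real decorator's signature may differ; this version assumes a logger is passed in explicitly):

```python
import functools
import logging
import time


def log_performance(logger: logging.Logger):
    """Log how long the wrapped function took, in milliseconds."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Logged even when the call raises, so slow failures
                # are still visible.
                elapsed_ms = (time.perf_counter() - start) * 1000
                logger.debug("%s took %.2f ms", func.__name__, elapsed_ms)
        return wrapper

    return decorator
```

`functools.wraps` preserves the wrapped function's name and docstring, which keeps the decorator transparent to introspection.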
…ssages.py)
- Add missing fields: last_read_at, headers, queue_visible_length
- Create new dataclasses: QueueRecord, TopicBinding, RoutingResult, NotificationThrottle
- Add BatchTopicResult for batch topic operations
- Implement from_row factory methods for database result parsing
- Maintain backward compatibility with existing Message usage
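As an illustration of the from_row factory pattern (the field set and column order here are assumptions, not the real dataclass; the `__str__` behaviour mirrors what a later commit describes for QueueRecord):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class QueueRecord:
    queue_name: str
    created_at: Optional[datetime] = None

    @classmethod
    def from_row(cls, row):
        # Accepts a tuple-like database row in a fixed column order:
        # (queue_name, created_at, ...).
        return cls(
            queue_name=row[0],
            created_at=row[1] if len(row) > 1 else None,
        )

    def __str__(self) -> str:
        # Converting the record to a string yields just the queue name.
        return self.queue_name
```

Keeping row parsing in a classmethod means the sync and async clients can share it regardless of which driver produced the row.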
…ibility
- Add __str__ method returning queue_name for string conversion
- Remove redundant docstring from from_row classmethod
- Ensure QueueRecord instances print the queue name when converted to string
- Remove property methods that proxied `self.config` attributes
- Removed properties: host, port, database, username, password, delay, vt, pool_size, verbose, log_filename
- Clean up module comment and simplify BaseQueue class docstring
- Enforce direct access to configuration via `self.config`

BREAKING CHANGE: Accessing configuration properties directly on the queue instance (e.g., `queue.host`) is no longer supported. Use `queue.config.host` instead.
…patibility

Major Changes:
- Add _convert_sql() to convert psycopg %s placeholders to asyncpg $1, $2, ... style
- Integrate orjson for explicit JSON/JSONB serialization in all message operations
- Implement __post_init__ to build PGMQConfig from backward-compatible fields
- Apply explicit type casting for vt parameter to resolve ambiguous function errors

Backward Compatibility:
- Preserve direct field access (host, port, database, etc.) on PGMQueue instance
- Add tz parameter as alias for delay in send operations
- Add read_batch() method as alias for read() with batch_size
- Emit UserWarning when list_queues() returns QueueRecord objects instead of strings
- Maintain existing method signatures where possible

Code Quality:
- Remove redundant module comment lines
- Update docstrings with version change notes
- Improve validate_queue_name() to raise an exception on invalid names
- Clean up imports and organize field definitions

BREAKING CHANGE: list_queues() now returns List[QueueRecord] instead of List[str]. Access queue names via the .queue_name attribute. A deprecation warning is emitted.

BREAKING CHANGE: Configuration should be accessed via queue.config.* instead of queue.* directly. Direct field access is maintained for backward compatibility but may be removed in future versions.
Major Changes:
- Implement __post_init__ to build PGMQConfig from backward-compatible fields
- Preserve direct field access (host, port, database, etc.) on PGMQueue instance
- Simplify all method docstrings by removing verbose Args/Returns sections
- Add read_batch() method as backward compatibility alias for read()

Backward Compatibility:
- Add tz parameter as alias for delay in send operations
- Add read_batch() method as alias for read() with batch_size parameter
- Emit UserWarning when list_queues() returns QueueRecord objects instead of strings
- Maintain existing method signatures where possible
- Keep deprecated detach_archive() method as no-op for compatibility

Code Quality:
- Remove redundant module comment line
- Update list_queues() docstring with version change notes
- Improve validate_queue_name() to raise an exception on invalid names
- Clean up imports and organize field definitions with clear sections
- Standardize docstring format across all public methods

Breaking Changes:
- list_queues() now returns List[QueueRecord] instead of List[str]. Access queue names via the .queue_name attribute. A deprecation warning is emitted.
- Configuration should be accessed via queue.config.* instead of queue.* directly. Direct field access is maintained for backward compatibility but may be removed in future versions.
- validate_queue_name() now raises an exception on invalid names instead of returning False. Update error handling accordingly.
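A sketch of the raising validate_queue_name() (the exact naming rules and length limit are assumptions; the key point is that invalid names now raise instead of returning False):

```python
import re

# Assumed rule: identifier-like names, capped well under PostgreSQL's
# 63-byte identifier limit to leave room for pgmq's table prefixes.
_VALID_QUEUE_NAME = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,46}$")


def validate_queue_name(name: str) -> None:
    """Raise ValueError on an invalid queue name (v2.0 behaviour)."""
    if not _VALID_QUEUE_NAME.match(name):
        raise ValueError(f"Invalid queue name: {name!r}")
```

Callers that previously checked a boolean return value need a try/except instead.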
…urn type

Test Reorganization:
- Create tests/backward_compatibility/v1/ directory structure
- Move test_integration.py to tests/backward_compatibility/v1/test_integration.py
- Move test_async_integration.py to tests/backward_compatibility/v1/test_async_integration.py
- Add __init__.py files for proper package structure

Test Updates:
- Update test_list_queues() in test_integration.py to handle QueueRecord objects
- Update test_list_queues() in test_async_integration.py to handle QueueRecord objects
- Change assertions from string comparison to .queue_name attribute access
- Add comments documenting the return type change

Backward Compatibility:
- Tests now reflect the v2.0.0 API where list_queues() returns List[QueueRecord]
- Preserves test coverage for core queue operations under the v1 compatibility folder
…cations
Add SyncNotificationListener and AsyncNotificationListener classes to enable
real-time message notifications using PostgreSQL's NOTIFY/LISTEN mechanism.
- SyncNotificationListener: blocking listener using psycopg3 with graceful
shutdown via connection close to interrupt notifies() generator
- AsyncNotificationListener: asyncio-based listener using asyncpg with
internal queue for callback dispatch
- Both support channel naming convention `pgmq_insert_{queue_name}`
- Includes comprehensive test suite for logging isolation and features
…-queue-notifications

feat: add PostgreSQL LISTEN/NOTIFY support for real-time queue notifications
Summary of Changes

Hello @tavallaie, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request represents a major version update for the PGMQ Python client, focusing on a comprehensive architectural refactor to enhance modularity, maintainability, and extensibility. It centralizes SQL operations, introduces a robust configuration and logging system, and expands the client's capabilities to fully leverage advanced PGMQ extension features such as topic routing and notifications. The changes aim to provide a more stable, type-safe, and feature-rich experience for developers.
Code Review
This pull request introduces a major refactoring for version 2.0.0, significantly improving the library's structure, features, and maintainability. Key improvements include a unified configuration model, centralized SQL queries, the introduction of a base class for sync and async clients, and a complete overhaul of the logging system. The addition of new features like topic-based routing, FIFO queues, and notification listeners is well-supported by an extensive new test suite. The code is much clearer and more robust. I've included a couple of suggestions to further improve the robustness of SQL queries, particularly for overloaded functions, to prevent potential ambiguity and enhance maintainability.
```python
SEND_TOPIC = "SELECT pgmq.send_topic(%s::text, %s::jsonb);"
SEND_TOPIC_WITH_HEADERS = "SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::jsonb);"
SEND_TOPIC_WITH_DELAY_INT = "SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::integer);"
SEND_TOPIC_WITH_HEADERS_DELAY_INT = (
    "SELECT pgmq.send_topic(%s::text, %s::jsonb, %s::jsonb, %s::integer);"
)

SEND_BATCH_TOPIC = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[]);"
SEND_BATCH_TOPIC_WITH_HEADERS = (
    "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[]);"
)
SEND_BATCH_TOPIC_WITH_DELAY_INT = (
    "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::integer);"
)
SEND_BATCH_TOPIC_WITH_DELAY_TZ = (
    "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::timestamptz);"
)
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_INT = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[], %s::integer);"
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_TZ = "SELECT * FROM pgmq.send_batch_topic(%s::text, %s::jsonb[], %s::jsonb[], %s::timestamptz);"
```
The queries for send_topic and send_batch_topic use positional arguments. This is inconsistent with the non-topic send functions that use named arguments (e.g., queue_name=>..., msg=>...). Using named arguments for all function calls improves readability and makes the code more robust against changes in parameter order or function overloads.
Suggested change:

```python
SEND_TOPIC = "SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb);"
SEND_TOPIC_WITH_HEADERS = "SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb, headers=>%s::jsonb);"
SEND_TOPIC_WITH_DELAY_INT = "SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb, delay=>%s::integer);"
SEND_TOPIC_WITH_HEADERS_DELAY_INT = (
    "SELECT pgmq.send_topic(routing_key=>%s::text, msg=>%s::jsonb, headers=>%s::jsonb, delay=>%s::integer);"
)
SEND_BATCH_TOPIC = "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[]);"
SEND_BATCH_TOPIC_WITH_HEADERS = (
    "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[]);"
)
SEND_BATCH_TOPIC_WITH_DELAY_INT = (
    "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], delay=>%s::integer);"
)
SEND_BATCH_TOPIC_WITH_DELAY_TZ = (
    "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], delay=>%s::timestamptz);"
)
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_INT = "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[], delay=>%s::integer);"
SEND_BATCH_TOPIC_WITH_HEADERS_DELAY_TZ = "SELECT * FROM pgmq.send_batch_topic(routing_key=>%s::text, msgs=>%s::jsonb[], headers=>%s::jsonb[], delay=>%s::timestamptz);"
```
```python
SET_VT = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_id=>%s::bigint, vt=>%s);"""

SET_VT_BATCH = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_ids=>%s::bigint[], vt=>%s);"""
```
The set_vt function in PostgreSQL is overloaded to accept either an integer or a timestamptz for the visibility timeout. Relying on a single SQL string for both can lead to ambiguity and requires fragile string manipulation in the client code. To make the queries more robust and explicit, it's better to define separate constants for each type. The client code in queue.py and async_queue.py will need to be updated to select the appropriate constant.
Suggested change:

```python
SET_VT_INT = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_id=>%s::bigint, vt=>%s::integer);"""
SET_VT_TZ = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_id=>%s::bigint, vt=>%s::timestamptz);"""
SET_VT_BATCH_INT = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_ids=>%s::bigint[], vt=>%s::integer);"""
SET_VT_BATCH_TZ = """SELECT msg_id, read_ct, enqueued_at, last_read_at, vt, message, headers
FROM pgmq.set_vt(queue_name=>%s::text, msg_ids=>%s::bigint[], vt=>%s::timestamptz);"""
```
No description provided.