Skip to content

Commit fc6b95a

Browse files
committed
Fix the retry queue remaining stuck after a successful retry, and make the logging level adjustable
1 parent b8ed25d commit fc6b95a

File tree

6 files changed

+393
-18
lines changed

6 files changed

+393
-18
lines changed

backend/app.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,32 @@
44
from flask_cors import CORS
55
import json, logging, os, re
66
from werkzeug.exceptions import HTTPException
7+
from config import LOG_LEVEL, LOG_EXCLUDE_LEVELS, LOG_FORMAT, LOG_DATE_FORMAT
8+
9+
class ConfigurableLogFilter(logging.Filter):
10+
"""Filter out specific log levels based on configuration."""
11+
def __init__(self, exclude_levels_str):
12+
super().__init__()
13+
# Parse excluded levels from config (e.g., "WARNING,DEBUG" -> [30, 10])
14+
self.excluded_levels = set()
15+
if exclude_levels_str:
16+
for level_name in exclude_levels_str.split(','):
17+
level_name = level_name.strip().upper()
18+
if level_name and hasattr(logging, level_name):
19+
self.excluded_levels.add(getattr(logging, level_name))
20+
21+
def filter(self, record):
22+
return record.levelno not in self.excluded_levels
23+
24+
logging.basicConfig(
25+
level=getattr(logging, LOG_LEVEL, logging.INFO),
26+
format=LOG_FORMAT,
27+
datefmt=LOG_DATE_FORMAT
28+
)
29+
30+
log_filter = ConfigurableLogFilter(LOG_EXCLUDE_LEVELS)
31+
for handler in logging.root.handlers:
32+
handler.addFilter(log_filter)
733

834
from services.db import redis_client
935
from services.canvas_counter import get_canvas_draw_count

backend/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,10 @@
7171
RATE_LIMIT_SEARCH_MINUTE = int(os.getenv("RATE_LIMIT_SEARCH_MINUTE", "30"))
7272

7373
# Burst protection
74-
RATE_LIMIT_BURST_SECOND = int(os.getenv("RATE_LIMIT_BURST_SECOND", "10"))
74+
RATE_LIMIT_BURST_SECOND = int(os.getenv("RATE_LIMIT_BURST_SECOND", "10"))
75+
76+
# Logging Configuration
77+
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() # DEBUG, INFO, WARNING, ERROR, CRITICAL
78+
LOG_EXCLUDE_LEVELS = os.getenv("LOG_EXCLUDE_LEVELS", "WARNING") # Comma-separated: WARNING,DEBUG
79+
LOG_FORMAT = os.getenv("LOG_FORMAT", "%(asctime)s [%(levelname)s] %(name)s:%(lineno)d – %(message)s")
80+
LOG_DATE_FORMAT = os.getenv("LOG_DATE_FORMAT", "%Y-%m-%d %H:%M:%S")

backend/incubator-resilientdb-resilient-python-cache/example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async def main():
6161
)
6262

6363
resilient_db_config = ResilientDBConfig(
64-
base_url="resilientdb://crow.resilientdb.com",
64+
base_url="resilientdb://dev-crow.resilientdb.com",
6565
http_secure=True,
6666
ws_secure=True
6767
)

backend/routes/rooms.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,11 +1738,6 @@ def get_strokes(roomId):
17381738

17391739
logger.warning(f"=" * 80)
17401740

1741-
if filtered_strokes:
1742-
logger.info(f"GET strokes debug - returning {len(filtered_strokes)} strokes")
1743-
for i, stroke in enumerate(filtered_strokes[:2]):
1744-
logger.info(f"Stroke {i}: {json.dumps(stroke, indent=2)}")
1745-
17461741
for stroke in filtered_strokes:
17471742
if 'brushColor' in stroke and 'color' not in stroke:
17481743
stroke['color'] = stroke['brushColor']

backend/services/graphql_retry_queue.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,15 @@ def get_pending_retries(limit: int = 100) -> list:
8181
limit: Maximum number of items to retrieve
8282
8383
Returns:
84-
List of retry items (oldest first)
84+
List of tuples: (original_json_string, parsed_dict) (oldest first)
85+
The original JSON string is needed for removal from Redis
8586
"""
8687
try:
8788
# Get oldest items first (FIFO)
8889
items = redis_client.zrange(RETRY_QUEUE_KEY, 0, limit - 1)
89-
return [json.loads(item) for item in items]
90+
# Return both original JSON string AND parsed dict
91+
# This ensures we can use the exact key for removal
92+
return [(item.decode() if isinstance(item, bytes) else item, json.loads(item)) for item in items]
9093
except Exception as e:
9194
logger.error(f"Failed to get pending retries: {e}")
9295
return []
@@ -98,15 +101,16 @@ def remove_from_retry_queue(stroke_id: str, retry_item_json: str) -> None:
98101
99102
Args:
100103
stroke_id: Stroke identifier
101-
retry_item_json: The JSON string of the retry item (used as Redis key)
102-
MUST be the same JSON string used when adding (with sort_keys=True)
104+
retry_item_json: The ORIGINAL JSON string from Redis (not reconstructed!)
105+
This must be the exact string that was used as the Redis key
103106
"""
104107
try:
105108
result = redis_client.zrem(RETRY_QUEUE_KEY, retry_item_json)
106109
if result > 0:
107-
logger.info(f"Removed stroke {stroke_id} from retry queue (success)")
110+
logger.info(f"Removed stroke {stroke_id} from retry queue")
108111
else:
109-
logger.warning(f"Stroke {stroke_id} not found in retry queue (may have been already removed)")
112+
logger.error(f"Stroke {stroke_id} not found in retry queue — key mismatch! Queue will not shrink!")
113+
logger.error(f"This indicates a bug in JSON serialization consistency")
110114
except Exception as e:
111115
logger.error(f"Failed to remove stroke {stroke_id} from retry queue: {e}")
112116

@@ -162,7 +166,7 @@ def process_retry_queue(max_items: int = 50) -> Dict[str, int]:
162166

163167
logger.info(f"Processing {len(pending_items)} pending GraphQL retries")
164168

165-
for item in pending_items:
169+
for original_json, item in pending_items:
166170
stroke_id = item.get("stroke_id")
167171
asset_data = item.get("asset_data")
168172

@@ -175,7 +179,7 @@ def process_retry_queue(max_items: int = 50) -> Dict[str, int]:
175179
attempts = get_retry_attempts(stroke_id)
176180
if attempts >= MAX_RETRY_ATTEMPTS:
177181
logger.error(f"Stroke {stroke_id} exceeded max retry attempts ({MAX_RETRY_ATTEMPTS}), removing from queue")
178-
remove_from_retry_queue(stroke_id, json.dumps(item))
182+
remove_from_retry_queue(stroke_id, original_json)
179183
stats["skipped"] += 1
180184
continue
181185

@@ -193,9 +197,9 @@ def process_retry_queue(max_items: int = 50) -> Dict[str, int]:
193197
txn_id = commit_transaction_via_graphql(prep)
194198
logger.info(f"RETRY SUCCESS: Stroke {stroke_id} committed to ResilientDB: {txn_id}")
195199

196-
# Remove from queue on success - use deterministic JSON serialization
197-
retry_item_json = json.dumps(item, sort_keys=True)
198-
remove_from_retry_queue(stroke_id, retry_item_json)
200+
# Use original JSON string as Redis key for removal
201+
# This ensures exact match with the key that was stored
202+
remove_from_retry_queue(stroke_id, original_json)
199203
stats["success"] += 1
200204

201205
except Exception as e:

0 commit comments

Comments
 (0)