Skip to content

Commit 0fc1067

Browse files
committed
BUGFIX:
- Improved error handling on pool timeouts; added retries.
1 parent 7ad6df0 commit 0fc1067

File tree

5 files changed

+40
-21
lines changed

5 files changed

+40
-21
lines changed

core/pubsub.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,9 @@ async def do_read():
118118
query = sql.SQL("DELETE FROM {table} WHERE id = ANY(%s::int[])").format(table=sql.Identifier(self.name))
119119
await conn.execute(query, (ids_to_delete,))
120120

121-
await asyncio.sleep(1) # Ensure the rest of __init__ has finished
122121
while not self._stop_event.is_set():
122+
# wait one second to clean up any issue
123+
await asyncio.sleep(1)
123124
with suppress(OperationalError):
124125
async with await AsyncConnection.connect(self.url, autocommit=True) as conn:
125126
while not self._stop_event.is_set():

plugins/cloud/logger.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(self, node: Node, url: str):
2828
self.node = node
2929
self.url = url
3030
self.cwd = os.getcwd()
31+
self.pending_futures = set()
3132

3233
def format_traceback(self, trace: traceback) -> tuple[str, int, list[str]]:
3334
ret = []
@@ -80,4 +81,15 @@ async def send_post(self, record: logging.LogRecord):
8081
def emit(self, record: logging.LogRecord):
8182
if record.levelno in [logging.ERROR, logging.CRITICAL] and record.exc_info is not None:
8283
with suppress(Exception):
83-
asyncio.create_task(self.send_post(record))
84+
loop = asyncio.get_event_loop()
85+
future = asyncio.run_coroutine_threadsafe(self.send_post(record), loop)
86+
self.pending_futures.add(future)
87+
future.add_done_callback(lambda f: self.pending_futures.discard(f))
88+
89+
def close(self):
90+
for future in list(self.pending_futures):
91+
try:
92+
future.result(timeout=1.0)
93+
except Exception:
94+
pass
95+
super().close()

plugins/mission/commands.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,13 @@ def __init__(self, bot: DCSServerBot, listener: Type[MissionEventListener] = Non
135135
self.update_channel_name.add_exception_type(AttributeError)
136136
self.update_channel_name.start()
137137
self.afk_check.start()
138+
self.check_for_unban.add_exception_type(psycopg.DatabaseError)
138139
self.check_for_unban.start()
139140
self.expire_token.add_exception_type(psycopg.DatabaseError)
140141
self.expire_token.start()
141142
if self.bot.locals.get('autorole', {}):
143+
self.check_roles.add_exception_type(psycopg.DatabaseError)
144+
self.check_roles.add_exception_type(discord.errors.DiscordException)
142145
self.check_roles.start()
143146

144147
async def cog_unload(self):
@@ -1872,25 +1875,22 @@ async def expire_token(self):
18721875

18731876
@tasks.loop(minutes=1.0)
18741877
async def check_for_unban(self):
1875-
try:
1876-
async with self.apool.connection() as conn:
1877-
async with conn.transaction():
1878-
cursor = await conn.execute("""
1879-
SELECT ucid FROM bans WHERE banned_until < (NOW() AT TIME ZONE 'utc')
1880-
""")
1881-
rows = await cursor.fetchall()
1882-
for row in rows:
1883-
for server in self.bot.servers.values():
1884-
if server.status not in [Status.PAUSED, Status.RUNNING, Status.STOPPED]:
1885-
continue
1886-
await server.send_to_dcs({
1887-
"command": "unban",
1888-
"ucid": row[0]
1889-
})
1890-
# delete unbanned accounts from the database
1891-
await conn.execute("DELETE FROM bans WHERE ucid = %s", (row[0], ))
1892-
except Exception as ex:
1893-
self.log.exception(ex)
1878+
async with self.apool.connection() as conn:
1879+
async with conn.transaction():
1880+
cursor = await conn.execute("""
1881+
SELECT ucid FROM bans WHERE banned_until < (NOW() AT TIME ZONE 'utc')
1882+
""")
1883+
rows = await cursor.fetchall()
1884+
for row in rows:
1885+
for server in self.bot.servers.values():
1886+
if server.status not in [Status.PAUSED, Status.RUNNING, Status.STOPPED]:
1887+
continue
1888+
await server.send_to_dcs({
1889+
"command": "unban",
1890+
"ucid": row[0]
1891+
})
1892+
# delete unbanned accounts from the database
1893+
await conn.execute("DELETE FROM bans WHERE ucid = %s", (row[0], ))
18941894

18951895
@check_for_unban.before_loop
18961896
async def before_check_unban(self):

run.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
from datetime import datetime
1717
from psycopg import OperationalError
18+
from psycopg_pool import PoolTimeout
1819

1920
# DCSServerBot imports
2021
try:
@@ -265,6 +266,9 @@ async def run_node(name, config_dir=None, no_autoupdate=False) -> int:
265266
# do not restart again
266267
exit(-2)
267268
except psycopg.OperationalError as ex:
269+
if isinstance(ex, PoolTimeout):
270+
# try again on pool timeouts
271+
exit(-1)
268272
log.error(f"Database Error: {ex}", exc_info=True)
269273
input("Press any key to continue ...")
270274
# do not restart again

services/monitoring/service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77
import psutil
8+
import psycopg
89
import shutil
910
import sys
1011

@@ -51,6 +52,7 @@ async def start(self):
5152
self.space_warning_sent['C:'] = False
5253
self.space_alert_sent['C:'] = False
5354
self.check_autoexec()
55+
self.monitoring.add_exception_type(psycopg.DatabaseError)
5456
self.monitoring.start()
5557
if self.get_config().get('time_sync', False):
5658
time_server = self.get_config().get('time_server', None)

0 commit comments

Comments
 (0)