Skip to content

Commit e41cf5f

Browse files
committed
pg-import optimizations
- add a bunch more status/timing info - commit in small transactions rather than doing everything inside one massive transaction. This increases performance considerably on a large import. I used batches of 10 rows at a time here as it seemed to be about optimal (going down to autocommit, and going up to 50 were both a bit slower). - remove constraint deferring; we don't *have* any deferrable constraints and this seems to just slow things down slightly.
1 parent d330035 commit e41cf5f

File tree

1 file changed

+48
-5
lines changed

1 file changed

+48
-5
lines changed

contrib/pg-import.py

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import importlib.resources
1515
import sys
1616
import re
17+
import time
1718

1819
import sogs # noqa: F401
1920

@@ -50,7 +51,7 @@
5051
old = sqlite3.connect(f"file:{args.sogs_db[0]}?mode=ro", uri=True)
5152
old.row_factory = sqlite3.Row
5253

53-
pgsql = psycopg.connect(args.postgresql_url[0])
54+
pgsql = psycopg.connect(args.postgresql_url[0], autocommit=True)
5455

5556
TABLES = [
5657
"rooms",
@@ -113,8 +114,6 @@
113114
# We have circular foreign keys that we need to break for the copy to work:
114115
curout.execute("ALTER TABLE rooms DROP CONSTRAINT room_image_fk")
115116

116-
curout.execute("SET CONSTRAINTS ALL DEFERRED")
117-
118117
def copy(table):
119118

120119
cols = [r['name'] for r in curin.execute(f"PRAGMA table_info({table})")]
@@ -141,6 +140,9 @@ def copy(table):
141140
)
142141

143142
count = 0
143+
count_total = curin.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
144+
print(f"Importing {table}: {count}/{count_total}", end="", flush=True)
145+
started = time.time()
144146
for row in curin.execute(f"SELECT * FROM {table}"):
145147
colnames = ', '.join('"' + c + '"' if c == "user" else c for c in cols)
146148
vals = ', '.join('%s' for _ in cols)
@@ -153,27 +155,67 @@ def copy(table):
153155
)
154156
count += 1
155157

156-
print(f"rooms: {count} rows copied")
158+
if count % 10 == 0 and args.commit:
159+
curout.execute("COMMIT; BEGIN")
160+
if count % 1000 == 0:
161+
print(f"\rImporting {table}: {count}/{count_total}", end="", flush=True)
162+
163+
if args.commit:
164+
curout.execute("COMMIT; BEGIN")
165+
print(
166+
f"\rFinished importing {table}: {count}/{count_total} rows imported "
167+
f"[{time.time() - started:.3f}s]",
168+
flush=True,
169+
)
157170

158171
for t in TABLES:
159172
copy(t)
160173

161174
# Put the foreign key we deleted back in:
175+
print("Reactivating room_image foreign key...", end="", flush=True)
176+
started = time.time()
162177
curout.execute(
163178
"ALTER TABLE rooms ADD CONSTRAINT room_image_fk FOREIGN KEY (image)"
164179
" REFERENCES files ON DELETE SET NULL"
165180
)
181+
if args.commit:
182+
curout.execute("COMMIT; BEGIN")
183+
print(f" done [{time.time() - started:.3f}s].")
166184

167185
# Our DB triggers mess with the seqno/updates values, so restore them:
186+
print("Updating message sequence counters...", end="", flush=True)
187+
started = time.time()
188+
count = 0
189+
count_total = curin.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
168190
for row in curin.execute("SELECT id, seqno FROM messages"):
169191
curout.execute("UPDATE messages SET seqno = %s WHERE id = %s", [row[1], row[0]])
192+
count += 1
193+
if count % 1000 == 0:
194+
if args.commit:
195+
curout.execute("COMMIT; BEGIN")
196+
print(
197+
f"\rUpdating message sequence counters... {count}/{count_total}", end="", flush=True
198+
)
199+
if args.commit:
200+
curout.execute("COMMIT; BEGIN")
201+
print(
202+
f"\rUpdated message sequence counters... {count}/{count_total} "
203+
f"[{time.time() - started:.3f}s]",
204+
flush=True,
205+
)
206+
207+
print("Updating room sequence/updates counters...")
208+
started = time.time()
170209
for row in curin.execute("SELECT id, message_sequence, info_updates FROM rooms"):
171210
curout.execute(
172211
"UPDATE rooms SET message_sequence = %s, info_updates = %s WHERE id = %s",
173212
[row[1], row[2], row[0]],
174213
)
214+
print(f" done [{time.time() - started:.3f}s].")
175215

176216
# Restart the identity sequences (otherwise new inserts will fail with conflicting ids)
217+
print("Restarting identity sequences...", end="", flush=True)
218+
started = time.time()
177219
identities = [
178220
(r[0], r[1])
179221
for r in curout.execute(
@@ -184,8 +226,9 @@ def copy(table):
184226
for table, col in identities:
185227
next_id = curout.execute(f"SELECT MAX({col}) FROM {table}").fetchone()[0]
186228
if next_id is not None:
187-
print(f"Updating {table}.{col} identity to start at {next_id+1}")
229+
print(f" {table}.{col}={next_id+1}", end="", flush=True)
188230
curout.execute(f"ALTER TABLE {table} ALTER COLUMN {col} RESTART WITH {next_id+1}")
231+
print(f". Done [{time.time() - started:.3f}s].")
189232

190233
if not args.commit:
191234
print("Import finished, aborting transaction (because --commit not given)")

0 commit comments

Comments
 (0)