Skip to content

Commit e403d60

Browse files
authored
vdk-postgres: batch inserts during ingestion (#3121)
Insert data is painfully slow even small amounts making even testing and pocs hard. So this is a quick and slight optimization Before insert 300 rows took 15 minutes. Now it's about 1 minute Testing Done: made sure existing test pass. Tested with a job sending 300 rows in 2 tables. --------- Signed-off-by: Antoni Ivanov <[email protected]>
1 parent 6369417 commit e403d60

File tree

2 files changed

+27
-29
lines changed

2 files changed

+27
-29
lines changed

projects/vdk-plugins/vdk-postgres/src/vdk/plugin/postgres/ingest_to_postgres.py

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
import logging
44
from typing import List
55
from typing import Optional
6-
from typing import Tuple
76

87
from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor
98
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin
109
from vdk.internal.builtin_plugins.run.job_context import JobContext
1110
from vdk.internal.core import errors
11+
from vdk.internal.util.decorators import closing_noexcept_on_close
1212

1313
log = logging.getLogger(__name__)
1414

@@ -38,16 +38,17 @@ def ingest_payload(
3838
f"collection_id: {collection_id}"
3939
)
4040

41-
with self._context.connections.open_connection(
42-
"POSTGRES"
43-
).connect() as connection:
44-
cursor = connection.cursor()
41+
# this is managed connection, no need to close it here.
42+
connection = self._context.connections.open_connection("POSTGRES")
43+
with closing_noexcept_on_close(connection.cursor()) as cursor:
4544
query, parameters = self._populate_query_parameters_tuple(
4645
destination_table, cursor, payload
4746
)
4847

4948
try:
50-
cursor.execute(query, parameters)
49+
cursor.executemany(
50+
query, parameters
51+
) # Use executemany for bulk insertion
5152
connection.commit()
5253
log.debug("Payload was ingested.")
5354
except Exception as e:
@@ -57,28 +58,24 @@ def ingest_payload(
5758
@staticmethod
5859
def _populate_query_parameters_tuple(
5960
destination_table: str, cursor: PEP249Cursor, payload: List[dict]
60-
) -> (str, Tuple[str]):
61+
) -> (str, list):
6162
"""
62-
Returns insert into destination table tuple of query and parameters;
63-
E.g. for a table dest_table with columns val1, val2 and payload size 3, this method will return:
64-
'INSERT INTO dest_table (val1, val2) VALUES (%s, %s), (%s, %s), (%s, %s)', ['val1', 'val2']
63+
Prepare the SQL query and parameters for bulk insertion.
6564
66-
:param destination_table: str
67-
the name of the destination table
68-
:param cursor: PEP249Cursor
69-
the database cursor
70-
:param payload: List[dict]
71-
the payloads to be ingested
72-
:return: Tuple[str, Tuple[str]]
73-
tuple containing the query and parameters
65+
Returns insert into destination table tuple of query and parameters;
66+
E.g. for a table dest_table with columns val1, val2 and payload size 2, this method will return:
67+
'INSERT INTO dest_table (val1, val2) VALUES (%s, %s)',
68+
[('val1', 'val2'), ('val1', 'val2')]
7469
"""
7570
cursor.execute(f"SELECT * FROM {destination_table} WHERE false")
76-
columns = [c.name for c in cursor.description]
71+
columns = [desc[0] for desc in cursor.description]
7772

78-
row_placeholder = f"({', '.join('%s' for column in columns)})"
73+
placeholders = ", ".join(["%s"] * len(columns))
74+
query = f"INSERT INTO {destination_table} ({', '.join(columns)}) VALUES ({placeholders})"
7975

80-
return (
81-
f"INSERT INTO {destination_table} ({', '.join(columns)}) "
82-
f"VALUES {', '.join([row_placeholder for i in range(len(payload))])}",
83-
tuple(obj[column] for obj in payload for column in columns),
84-
)
76+
parameters = []
77+
for obj in payload:
78+
row = tuple(obj[column] for column in columns)
79+
parameters.append(row)
80+
81+
return query, parameters

projects/vdk-plugins/vdk-postgres/tests/conftest.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
# Copyright 2021-2024 VMware, Inc.
22
# SPDX-License-Identifier: Apache-2.0
3+
import logging
34
import os
45
import time
5-
from functools import partial
66
from unittest import mock
77

88
import pytest
99
from testcontainers.core.container import DockerContainer
1010
from testcontainers.core.waiting_utils import wait_container_is_ready
11-
from testcontainers.core.waiting_utils import wait_for
1211
from vdk.plugin.postgres import postgres_plugin
1312
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
1413

@@ -19,6 +18,8 @@
1918
VDK_POSTGRES_HOST = "VDK_POSTGRES_HOST"
2019
VDK_POSTGRES_PORT = "VDK_POSTGRES_PORT"
2120

21+
log = logging.getLogger(__name__)
22+
2223

2324
@wait_container_is_ready(Exception)
2425
def wait_for_postgres_to_be_responsive(runner):
@@ -59,7 +60,7 @@ def postgres_service(request):
5960
# wait 2 seconds to make sure the service is up and responsive
6061
# might be unnecessary but it's out of abundance of caution
6162
time.sleep(2)
62-
print(
63+
log.info(
6364
f"Postgres service started on port {container.get_exposed_port(port)} and host {container.get_container_host_ip()}"
6465
)
6566
except Exception as e:
@@ -70,7 +71,7 @@ def postgres_service(request):
7071

7172
def stop_container():
7273
container.stop()
73-
print("Postgres service stopped")
74+
log.info("Postgres service stopped")
7475

7576
request.addfinalizer(stop_container)
7677

0 commit comments

Comments
 (0)