Skip to content

Commit 3c0b057

Browse files
Ken Lippold
authored and committed
Added bulk_copy method to Observations manager
1 parent ac1d6f8 commit 3c0b057

File tree

3 files changed

+34
-7
lines changed

3 files changed

+34
-7
lines changed

sta/management/utils.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,9 @@ def generate_test_timeseries(datastream_id: UUID):
4040
result=result
4141
))
4242

43-
if len(observations) >= 50000:
44-
Observation.objects.bulk_create(observations)
45-
print(f"Created {len(observations)} observations...")
43+
if len(observations) >= 100000:
44+
Observation.objects.bulk_copy(observations, batch_size=100000)
4645
observations.clear()
4746

4847
if observations:
49-
Observation.objects.bulk_create(observations)
50-
print(f"Created {len(observations)} observations...")
48+
Observation.objects.bulk_copy(observations)

sta/models/observation.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import io
12
import uuid
23
import typing
34
from typing import Literal, Optional
4-
from django.db import models
5+
from django.db import models, connection
56
from django.db.models import Q
67
from iam.models import Workspace
78
from iam.models.utils import PermissionChecker
@@ -49,6 +50,34 @@ def removable(self, user: Optional["User"]):
4950
datastream__thing__workspace__collaborators__role__permissions__permission_type__in=["*", "delete"])
5051
)
5152

53+
def bulk_copy(self, observations, batch_size=100_000):
    """
    Insert observations through PostgreSQL ``COPY ... FROM STDIN``.

    Every model field that is not flagged as a primary key is written, in
    ``_meta.fields`` order, using the COPY text format: tab-separated
    columns, one row per line, and the backslash-N marker for NULL.

    :param observations: Sliceable sequence (e.g. a list) of objects that
        expose the model's field attributes. An attribute that is missing
        on an object is written as NULL.
    :param batch_size: Maximum number of rows serialized and handed to the
        COPY stream per write; must be a positive integer.
    :return: The ``observations`` sequence that was passed in.
    :raises ValueError: If ``batch_size`` is not positive.
    """
    # A negative step would make range() yield nothing and silently skip
    # every row, so reject non-positive sizes up front.
    if batch_size <= 0:
        raise ValueError("Batch size must be a positive integer.")

    # Nothing to send: skip opening a COPY statement altogether.
    if not observations:
        return observations

    quote_name = connection.ops.quote_name
    copy_fields = [field for field in self.model._meta.fields if not field.primary_key]  # noqa
    db_table_sql = quote_name(self.model._meta.db_table)  # noqa
    # ``column`` is the database-side name; ``attname`` is the Python
    # attribute holding the raw value. They differ when a field sets
    # ``db_column``, so each is used only where it belongs.
    db_fields_sql = ", ".join(quote_name(field.column) for field in copy_fields)
    attr_names = [field.attname for field in copy_fields]

    # COPY text format: a literal backslash, tab, newline or carriage
    # return inside a value must be backslash-escaped. A single translate()
    # pass is equivalent to the chained replace() calls it supersedes.
    copy_escapes = str.maketrans({"\\": "\\\\", "\t": "\\t", "\n": "\\n", "\r": "\\r"})

    def escape_pg_copy(value):
        if value is None:
            return r"\N"
        # Escape the text of every value, not only str instances, so a
        # stringified object can never break the row/column framing.
        # NOTE(review): values are rendered with str(); dict/list values
        # would not produce valid JSON/array literals - confirm the model
        # has no such columns.
        return str(value).translate(copy_escapes)

    with connection.cursor() as cursor, cursor.copy(
        f"COPY {db_table_sql} ({db_fields_sql}) FROM STDIN"
    ) as copy:
        for start in range(0, len(observations), batch_size):
            batch = observations[start:start + batch_size]
            # One newline-terminated line per row, written in a single call
            # per batch.
            copy.write("".join(
                "\t".join(escape_pg_copy(getattr(obs, name, None)) for name in attr_names) + "\n"
                for obs in batch
            ))

    return observations
80+
5281

5382
class Observation(models.Model, PermissionChecker):
5483
pk = models.CompositePrimaryKey("datastream_id", "phenomenon_time", "id")

sta/services/sensorthings/observation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def create_observations(
180180
# )
181181

182182
try:
183-
new_observations_for_datastream = Observation.objects.bulk_create([
183+
new_observations_for_datastream = Observation.objects.bulk_copy([
184184
Observation(
185185
datastream_id=observation.datastream.id,
186186
phenomenon_time=observation.phenomenon_time,

0 commit comments

Comments
 (0)