Skip to content

Commit d7df360

Browse files
timber-theyTimberscheidtdav
authored
Fix/deadlock possible when posting box locations via measurement (#667)
* Upgrade timescaledb * feat: bulk insert locations * feat: add TODO (for unit test?) * feat: bulk insert location updates * feat: bulk insert and update * fix: some test problems * fix: add cast to conform to type * feat: delete locations after test * feat: add tests and fix tested code * fix: remove obsolete code / comments and fix test * fix: lint * fix: docker-compose for ci * refactor: use sensible values for lat/lng --------- Co-authored-by: Timber <[email protected]> Co-authored-by: David Scheidt <[email protected]> Co-authored-by: David Scheidt <[email protected]>
1 parent fbd6688 commit d7df360

File tree

8 files changed

+714
-204
lines changed

8 files changed

+714
-204
lines changed

app/lib/user-service.server.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import bcrypt from 'bcryptjs'
22
import { eq } from 'drizzle-orm'
3+
import ConfirmEmailAddress, {
4+
subject as ConfirmEmailAddressSubject,
5+
} from 'emails/confirm-email'
6+
import DeleteUserEmail, {
7+
subject as DeleteUserEmailSubject,
8+
} from 'emails/delete-user'
39
import NewUserEmail, { subject as NewUserEmailSubject } from 'emails/new-user'
410
import PasswordResetEmail, {
511
subject as PasswordResetEmailSubject,
@@ -33,12 +39,6 @@ import {
3339
verifyLogin,
3440
} from '~/models/user.server'
3541
import { passwordResetRequest, user, type User } from '~/schema'
36-
import ConfirmEmailAddress, {
37-
subject as ConfirmEmailAddressSubject,
38-
} from 'emails/confirm-email'
39-
import DeleteUserEmail, {
40-
subject as DeleteUserEmailSubject,
41-
} from 'emails/delete-user'
4242

4343
const ONE_HOUR_MILLIS: number = 60 * 60 * 1000
4444

app/models/measurement.server.ts

Lines changed: 29 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
import { and, desc, eq, gte, lte, sql } from "drizzle-orm";
22
import { drizzleClient } from "~/db.server";
33
import {
4-
deviceToLocation,
4+
type LastMeasurement,
55
location,
66
measurement,
77
measurements10minView,
88
measurements1dayView,
99
measurements1hourView,
1010
measurements1monthView,
1111
measurements1yearView,
12-
sensor,
1312
} from "~/schema";
13+
import {
14+
type MinimalDevice,
15+
type MeasurementWithLocation,
16+
getLocationUpdates,
17+
findOrCreateLocations,
18+
addLocationUpdates,
19+
insertMeasurementsWithLocation,
20+
updateLastMeasurements,
21+
} from '~/utils/measurement-server-helper'
1422

1523
// This function retrieves measurements from the database based on the provided parameters.
1624
export function getMeasurement(
@@ -166,100 +174,17 @@ export function getMeasurement(
166174
});
167175
}
168176

169-
interface MeasurementWithLocation {
170-
sensor_id: string;
171-
value: number;
172-
createdAt?: Date;
173-
location?: {
174-
lng: number;
175-
lat: number;
176-
height?: number;
177-
} | null;
178-
}
179-
180-
interface MeasurementWithLocation {
181-
sensor_id: string;
182-
value: number;
183-
createdAt?: Date;
184-
location?: {
185-
lng: number;
186-
lat: number;
187-
height?: number;
188-
} | null;
189-
}
190-
191-
/**
192-
* Get the device location that was valid at a specific timestamp
193-
* Returns the most recent location that was set before or at the given timestamp
194-
*/
195-
async function getDeviceLocationAtTime(
196-
tx: any,
197-
deviceId: string,
198-
timestamp: Date
199-
): Promise<bigint | null> {
200-
const locationAtTime = await tx
201-
.select({
202-
locationId: deviceToLocation.locationId,
203-
})
204-
.from(deviceToLocation)
205-
.where(
206-
and(
207-
eq(deviceToLocation.deviceId, deviceId),
208-
lte(deviceToLocation.time, timestamp)
209-
)
210-
)
211-
.orderBy(desc(deviceToLocation.time))
212-
.limit(1);
213-
214-
return locationAtTime.length > 0 ? locationAtTime[0].locationId : null;
215-
}
216-
217-
async function findOrCreateLocation(
218-
tx: any,
219-
lng: number,
220-
lat: number
221-
): Promise<bigint> {
222-
const existingLocation = await tx
223-
.select({ id: location.id })
224-
.from(location)
225-
.where(
226-
sql`ST_Equals(
227-
${location.location},
228-
ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326)
229-
)`
230-
)
231-
.limit(1);
232-
233-
234-
if (existingLocation.length > 0) {
235-
return existingLocation[0].id;
236-
}
237-
238-
const [newLocation] = await tx
239-
.insert(location)
240-
.values({
241-
location: sql`ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326)`,
242-
})
243-
.returning();
244-
245-
return newLocation.id;
246-
}
247-
248177

249178
export async function saveMeasurements(
250-
device: any,
179+
device: MinimalDevice,
251180
measurements: MeasurementWithLocation[]
252181
): Promise<void> {
182+
if (!device)
183+
throw new Error("No device given!")
253184
if (!Array.isArray(measurements)) throw new Error("Array expected");
254185

255186
const sensorIds = device.sensors.map((s: any) => s.id);
256-
const lastMeasurements: Record<string, any> = {};
257-
258-
// Track measurements that update device location (those with explicit locations)
259-
const deviceLocationUpdates: Array<{
260-
location: { lng: number; lat: number; height?: number };
261-
time: Date;
262-
}> = [];
187+
const lastMeasurements: Record<string, NonNullable<LastMeasurement>> = {};
263188

264189
// Validate and prepare measurements
265190
for (let i = measurements.length - 1; i >= 0; i--) {
@@ -286,108 +211,36 @@ export async function saveMeasurements(
286211
throw error;
287212
}
288213

289-
if (!lastMeasurements[m.sensor_id]) {
214+
if (!lastMeasurements[m.sensor_id] ||
215+
lastMeasurements[m.sensor_id].createdAt < measurementTime.toISOString()) {
290216
lastMeasurements[m.sensor_id] = {
291217
value: m.value,
292218
createdAt: measurementTime.toISOString(),
293219
sensorId: m.sensor_id,
294220
};
295221
}
296-
297-
// Track measurements with explicit locations for device location updates
298-
if (m.location) {
299-
deviceLocationUpdates.push({
300-
location: m.location,
301-
time: measurementTime,
302-
});
303-
}
304222
}
305223

306-
// Sort device location updates by time (oldest first) to process in order
307-
deviceLocationUpdates.sort((a, b) => a.time.getTime() - b.time.getTime());
224+
// Track measurements that update device location (those with explicit locations)
225+
const deviceLocationUpdates = getLocationUpdates(measurements);
226+
const locations = await findOrCreateLocations(deviceLocationUpdates);
227+
228+
// First, update device locations for all measurements with explicit locations
229+
// This ensures the location history is complete before we infer locations
230+
await addLocationUpdates(deviceLocationUpdates, device.id, locations);
308231

232+
// Note that the insertion of measurements and update of sensors need to be in one
233+
// transaction, since otherwise other updates could get in between and the data would be
234+
// inconsistent. This shouldn't be a problem for the updates above.
309235
await drizzleClient.transaction(async (tx) => {
310-
// First, update device locations for all measurements with explicit locations
311-
// This ensures the location history is complete before we infer locations
312-
for (const update of deviceLocationUpdates) {
313-
const locationId = await findOrCreateLocation(
314-
tx,
315-
update.location.lng,
316-
update.location.lat
317-
);
318-
319-
// Check if we should add this to device location history
320-
// Only add if it's newer than the current latest location
321-
const currentLatestLocation = await tx
322-
.select({ time: deviceToLocation.time })
323-
.from(deviceToLocation)
324-
.where(eq(deviceToLocation.deviceId, device.id))
325-
.orderBy(desc(deviceToLocation.time))
326-
.limit(1);
327-
328-
const shouldAdd =
329-
currentLatestLocation.length === 0 ||
330-
update.time >= currentLatestLocation[0].time;
331-
332-
if (shouldAdd) {
333-
await tx
334-
.insert(deviceToLocation)
335-
.values({
336-
deviceId: device.id,
337-
locationId: locationId,
338-
time: update.time,
339-
})
340-
.onConflictDoNothing();
341-
}
342-
}
343-
344236
// Now process each measurement and infer locations if needed
345-
for (const m of measurements) {
346-
const measurementTime = m.createdAt || new Date();
347-
let locationId: bigint | null = null;
348-
349-
if (m.location) {
350-
// Measurement has explicit location
351-
locationId = await findOrCreateLocation(
352-
tx,
353-
m.location.lng,
354-
m.location.lat
355-
);
356-
} else {
357-
// No explicit location - infer from device location history
358-
locationId = await getDeviceLocationAtTime(
359-
tx,
360-
device.id,
361-
measurementTime
362-
);
363-
}
364-
365-
// Insert measurement with locationId (may be null for measurements
366-
// without location and before any device location was set)
367-
await tx.insert(measurement).values({
368-
sensorId: m.sensor_id,
369-
value: m.value,
370-
time: measurementTime,
371-
locationId: locationId,
372-
}).onConflictDoNothing();
373-
374-
}
375-
237+
await insertMeasurementsWithLocation(measurements, locations, device.id, tx);
376238
// Update sensor lastMeasurement values
377-
const updatePromises = Object.entries(lastMeasurements).map(
378-
([sensorId, lastMeasurement]) =>
379-
tx
380-
.update(sensor)
381-
.set({ lastMeasurement })
382-
.where(eq(sensor.id, sensorId))
383-
);
384-
385-
386-
await Promise.all(updatePromises);
387-
239+
await updateLastMeasurements(lastMeasurements, tx);
388240
});
389241
}
390242

243+
391244
export async function insertMeasurements(measurements: any[]): Promise<void> {
392245
const measurementInserts = measurements.map(measurement => ({
393246
sensorId: measurement.sensor_id,

0 commit comments

Comments
 (0)