Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions app/lib/user-service.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import bcrypt from 'bcryptjs'
import { eq } from 'drizzle-orm'
import ConfirmEmailAddress, {
subject as ConfirmEmailAddressSubject,
} from 'emails/confirm-email'
import DeleteUserEmail, {
subject as DeleteUserEmailSubject,
} from 'emails/delete-user'
import NewUserEmail, { subject as NewUserEmailSubject } from 'emails/new-user'
import PasswordResetEmail, {
subject as PasswordResetEmailSubject,
Expand Down Expand Up @@ -33,12 +39,6 @@ import {
verifyLogin,
} from '~/models/user.server'
import { passwordResetRequest, user, type User } from '~/schema'
import ConfirmEmailAddress, {
subject as ConfirmEmailAddressSubject,
} from 'emails/confirm-email'
import DeleteUserEmail, {
subject as DeleteUserEmailSubject,
} from 'emails/delete-user'

const ONE_HOUR_MILLIS: number = 60 * 60 * 1000

Expand Down
205 changes: 29 additions & 176 deletions app/models/measurement.server.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import { and, desc, eq, gte, lte, sql } from "drizzle-orm";
import { drizzleClient } from "~/db.server";
import {
deviceToLocation,
type LastMeasurement,
location,
measurement,
measurements10minView,
measurements1dayView,
measurements1hourView,
measurements1monthView,
measurements1yearView,
sensor,
} from "~/schema";
import {
type MinimalDevice,
type MeasurementWithLocation,
getLocationUpdates,
findOrCreateLocations,
addLocationUpdates,
insertMeasurementsWithLocation,
updateLastMeasurements,
} from '~/utils/measurement-server-helper'

// This function retrieves measurements from the database based on the provided parameters.
export function getMeasurement(
Expand Down Expand Up @@ -166,100 +174,17 @@ export function getMeasurement(
});
}

interface MeasurementWithLocation {
sensor_id: string;
value: number;
createdAt?: Date;
location?: {
lng: number;
lat: number;
height?: number;
} | null;
}

interface MeasurementWithLocation {
sensor_id: string;
value: number;
createdAt?: Date;
location?: {
lng: number;
lat: number;
height?: number;
} | null;
}

/**
* Get the device location that was valid at a specific timestamp
* Returns the most recent location that was set before or at the given timestamp
*/
async function getDeviceLocationAtTime(
tx: any,
deviceId: string,
timestamp: Date
): Promise<bigint | null> {
const locationAtTime = await tx
.select({
locationId: deviceToLocation.locationId,
})
.from(deviceToLocation)
.where(
and(
eq(deviceToLocation.deviceId, deviceId),
lte(deviceToLocation.time, timestamp)
)
)
.orderBy(desc(deviceToLocation.time))
.limit(1);

return locationAtTime.length > 0 ? locationAtTime[0].locationId : null;
}

async function findOrCreateLocation(
tx: any,
lng: number,
lat: number
): Promise<bigint> {
const existingLocation = await tx
.select({ id: location.id })
.from(location)
.where(
sql`ST_Equals(
${location.location},
ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326)
)`
)
.limit(1);


if (existingLocation.length > 0) {
return existingLocation[0].id;
}

const [newLocation] = await tx
.insert(location)
.values({
location: sql`ST_SetSRID(ST_MakePoint(${lng}, ${lat}), 4326)`,
})
.returning();

return newLocation.id;
}


export async function saveMeasurements(
device: any,
device: MinimalDevice,
measurements: MeasurementWithLocation[]
): Promise<void> {
if (!device)
throw new Error("No device given!")
if (!Array.isArray(measurements)) throw new Error("Array expected");

const sensorIds = device.sensors.map((s: any) => s.id);
const lastMeasurements: Record<string, any> = {};

// Track measurements that update device location (those with explicit locations)
const deviceLocationUpdates: Array<{
location: { lng: number; lat: number; height?: number };
time: Date;
}> = [];
const lastMeasurements: Record<string, NonNullable<LastMeasurement>> = {};

// Validate and prepare measurements
for (let i = measurements.length - 1; i >= 0; i--) {
Expand All @@ -286,108 +211,36 @@ export async function saveMeasurements(
throw error;
}

if (!lastMeasurements[m.sensor_id]) {
if (!lastMeasurements[m.sensor_id] ||
lastMeasurements[m.sensor_id].createdAt < measurementTime.toISOString()) {
lastMeasurements[m.sensor_id] = {
value: m.value,
createdAt: measurementTime.toISOString(),
sensorId: m.sensor_id,
};
}

// Track measurements with explicit locations for device location updates
if (m.location) {
deviceLocationUpdates.push({
location: m.location,
time: measurementTime,
});
}
}

// Sort device location updates by time (oldest first) to process in order
deviceLocationUpdates.sort((a, b) => a.time.getTime() - b.time.getTime());
// Track measurements that update device location (those with explicit locations)
const deviceLocationUpdates = getLocationUpdates(measurements);
const locations = await findOrCreateLocations(deviceLocationUpdates);

// First, update device locations for all measurements with explicit locations
// This ensures the location history is complete before we infer locations
await addLocationUpdates(deviceLocationUpdates, device.id, locations);

// Note that the insertion of measurements and update of sensors need to be in one
// transaction, since otherwise other updates could get in between and the data would be
// inconsistent. This shouldn't be a problem for the updates above.
await drizzleClient.transaction(async (tx) => {
// First, update device locations for all measurements with explicit locations
// This ensures the location history is complete before we infer locations
for (const update of deviceLocationUpdates) {
const locationId = await findOrCreateLocation(
tx,
update.location.lng,
update.location.lat
);

// Check if we should add this to device location history
// Only add if it's newer than the current latest location
const currentLatestLocation = await tx
.select({ time: deviceToLocation.time })
.from(deviceToLocation)
.where(eq(deviceToLocation.deviceId, device.id))
.orderBy(desc(deviceToLocation.time))
.limit(1);

const shouldAdd =
currentLatestLocation.length === 0 ||
update.time >= currentLatestLocation[0].time;

if (shouldAdd) {
await tx
.insert(deviceToLocation)
.values({
deviceId: device.id,
locationId: locationId,
time: update.time,
})
.onConflictDoNothing();
}
}

// Now process each measurement and infer locations if needed
for (const m of measurements) {
const measurementTime = m.createdAt || new Date();
let locationId: bigint | null = null;

if (m.location) {
// Measurement has explicit location
locationId = await findOrCreateLocation(
tx,
m.location.lng,
m.location.lat
);
} else {
// No explicit location - infer from device location history
locationId = await getDeviceLocationAtTime(
tx,
device.id,
measurementTime
);
}

// Insert measurement with locationId (may be null for measurements
// without location and before any device location was set)
await tx.insert(measurement).values({
sensorId: m.sensor_id,
value: m.value,
time: measurementTime,
locationId: locationId,
}).onConflictDoNothing();

}

await insertMeasurementsWithLocation(measurements, locations, device.id, tx);
// Update sensor lastMeasurement values
const updatePromises = Object.entries(lastMeasurements).map(
([sensorId, lastMeasurement]) =>
tx
.update(sensor)
.set({ lastMeasurement })
.where(eq(sensor.id, sensorId))
);


await Promise.all(updatePromises);

await updateLastMeasurements(lastMeasurements, tx);
});
}


export async function insertMeasurements(measurements: any[]): Promise<void> {
const measurementInserts = measurements.map(measurement => ({
sensorId: measurement.sensor_id,
Expand Down
Loading
Loading