diff --git a/packages/backend/src/__tests__/drivers/util.driver.ts b/packages/backend/src/__tests__/drivers/util.driver.ts index 37f39d109..211967588 100644 --- a/packages/backend/src/__tests__/drivers/util.driver.ts +++ b/packages/backend/src/__tests__/drivers/util.driver.ts @@ -1,8 +1,11 @@ -import { WithId } from "mongodb"; +import { ObjectId, WithId } from "mongodb"; +import { faker } from "@faker-js/faker"; +import { Schema_Sync } from "@core/types/sync.types"; import { Schema_User } from "@core/types/user.types"; import { SyncDriver } from "@backend/__tests__/drivers/sync.driver"; import { UserDriver } from "@backend/__tests__/drivers/user.driver"; import { WaitListDriver } from "@backend/__tests__/drivers/waitlist.driver"; +import mongoService from "@backend/common/services/mongo.service"; export class UtilDriver { static async setupTestUser(): Promise<{ user: WithId }> { @@ -17,4 +20,38 @@ export class UtilDriver { return { user }; } + + static async generateV0SyncData( + numUsers = 3, + ): Promise>>> { + const users = await Promise.all( + Array.from({ length: numUsers }, UserDriver.createUser), + ); + + const data = users.map((user) => ({ + _id: new ObjectId(), + user: user._id.toString(), + google: { + events: [ + { + resourceId: faker.string.ulid(), + gCalendarId: user.email, + lastSyncedAt: faker.date.past(), + nextSyncToken: faker.string.alphanumeric(32), + channelId: faker.string.uuid(), + expiration: faker.date.future().getTime().toString(), + }, + ], + calendarlist: [ + { + nextSyncToken: faker.string.alphanumeric(32), + gCalendarId: user.email, + lastSyncedAt: faker.date.past(), + }, + ], + }, + })); + + return mongoService.sync.insertMany(data).then(() => data); + } } diff --git a/packages/backend/src/common/constants/collections.ts b/packages/backend/src/common/constants/collections.ts index a4635b1a0..a2b54a30a 100644 --- a/packages/backend/src/common/constants/collections.ts +++ b/packages/backend/src/common/constants/collections.ts @@ -9,4 +9,5 @@ export const Collections = { SYNC: IS_DEV ? "_dev.sync" : "sync", USER: IS_DEV ? "_dev.user" : "user", WAITLIST: IS_DEV ? "_dev.waitlist" : "waitlist", + WATCH: IS_DEV ? "_dev.watch" : "watch", }; diff --git a/packages/backend/src/common/services/mongo.service.ts b/packages/backend/src/common/services/mongo.service.ts index d4ea9cc29..11c4d4bdc 100644 --- a/packages/backend/src/common/services/mongo.service.ts +++ b/packages/backend/src/common/services/mongo.service.ts @@ -11,13 +11,14 @@ import { } from "mongodb"; import { Logger } from "@core/logger/winston.logger"; import { - CompassCalendar, - Schema_CalendarList as Schema_Calendar, + Schema_CalendarList as Schema_CalList, + Schema_Calendar, } from "@core/types/calendar.types"; import { Schema_Event } from "@core/types/event.types"; import { Schema_Sync } from "@core/types/sync.types"; import { Schema_User } from "@core/types/user.types"; import { Schema_Waitlist } from "@core/types/waitlist/waitlist.types"; +import { Schema_Watch } from "@core/types/watch.types"; import { Collections } from "@backend/common/constants/collections"; import { ENV } from "@backend/common/constants/env.constants"; import { waitUntilEvent } from "@backend/common/helpers/common.util"; @@ -27,12 +28,13 @@ const logger = Logger("app:mongo.service"); interface InternalClient { db: Db; client: MongoClient; - calendar: Collection; - calendarList: Collection; + calendar: Collection; + calendarList: Collection; event: Collection>; sync: Collection; user: Collection; waitlist: Collection; + watch: Collection>; } class MongoService { @@ -96,6 +98,15 @@ class MongoService { return this.#accessInternalCollectionProps("waitlist"); } + /** + * watch + * + * mongo collection + */ + get watch(): InternalClient["watch"] { + return this.#accessInternalCollectionProps("watch"); + } + private onConnect(client: MongoClient, useDynamicDb = false) { this.#internalClient = this.createInternalClient(client, useDynamicDb); @@ -127,12 +138,13 @@ class MongoService { return { db, client, - calendar: db.collection(Collections.CALENDAR), - calendarList: db.collection(Collections.CALENDARLIST), + calendar: db.collection(Collections.CALENDAR), + calendarList: db.collection(Collections.CALENDARLIST), event: db.collection>(Collections.EVENT), sync: db.collection(Collections.SYNC), user: db.collection(Collections.USER), waitlist: db.collection(Collections.WAITLIST), + watch: db.collection>(Collections.WATCH), }; } diff --git a/packages/core/src/types/calendar.types.test.ts b/packages/core/src/types/calendar.types.test.ts index 9f805c2a6..271c610f4 100644 --- a/packages/core/src/types/calendar.types.test.ts +++ b/packages/core/src/types/calendar.types.test.ts @@ -1,3 +1,4 @@ +import { ObjectId } from "bson"; import { faker } from "@faker-js/faker"; import { CompassCalendarSchema, @@ -65,7 +66,7 @@ describe("Calendar Types", () => { describe("CompassCalendarSchema", () => { const compassCalendar = { - _id: faker.database.mongodbObjectId(), + _id: new ObjectId(), user: faker.database.mongodbObjectId(), backgroundColor: gCalendar.backgroundColor!, color: gCalendar.foregroundColor!, diff --git a/packages/core/src/types/calendar.types.ts b/packages/core/src/types/calendar.types.ts index 57d150ad9..c81a70250 100644 --- a/packages/core/src/types/calendar.types.ts +++ b/packages/core/src/types/calendar.types.ts @@ -5,6 +5,7 @@ import { IDSchemaV4, RGBHexSchema, TimezoneSchema, + zObjectId, } from "@core/types/type.utils"; // @deprecated - will be replaced by Schema_Calendar @@ -55,7 +56,7 @@ export const GoogleCalendarMetadataSchema = z.object({ }); export const CompassCalendarSchema = z.object({ - _id: IDSchemaV4, + _id: zObjectId, user: IDSchemaV4, backgroundColor: RGBHexSchema, color: RGBHexSchema, @@ -67,4 +68,4 @@ export const CompassCalendarSchema = z.object({ metadata: GoogleCalendarMetadataSchema, // use union when other providers present }); -export type CompassCalendar = z.infer; +export type Schema_Calendar = z.infer; diff --git a/packages/core/src/types/type.utils.ts b/packages/core/src/types/type.utils.ts index 2fc7d381d..b7c2a2d81 100644 --- a/packages/core/src/types/type.utils.ts +++ b/packages/core/src/types/type.utils.ts @@ -1,6 +1,7 @@ import { ObjectId } from "bson"; import { z } from "zod"; import { z as zod4 } from "zod/v4"; +import { z as zod4Mini } from "zod/v4-mini"; export type KeyOfType = keyof { [P in keyof T as T[P] extends V ? P : never]: unknown; @@ -19,6 +20,16 @@ export const IDSchemaV4 = zod4.string().refine(ObjectId.isValid, { message: "Invalid id", }); +export const zObjectIdMini = zod4Mini.pipe( + zod4Mini.custom(ObjectId.isValid), + zod4Mini.transform((v) => new ObjectId(v)), +); + +export const zObjectId = zod4.pipe( + zod4.custom((v) => ObjectId.isValid(v as string)), + zod4.transform((v) => new ObjectId(v)), +); + export const TimezoneSchema = zod4.string().refine( (timeZone) => { try { diff --git a/packages/core/src/types/watch.types.test.ts b/packages/core/src/types/watch.types.test.ts new file mode 100644 index 000000000..7d61e480c --- /dev/null +++ b/packages/core/src/types/watch.types.test.ts @@ -0,0 +1,67 @@ +import { ObjectId } from "bson"; +import { faker } from "@faker-js/faker"; +import { Schema_Watch, WatchSchema } from "@core/types/watch.types"; + +describe("Watch Types", () => { + const validWatch: Schema_Watch = { + _id: new ObjectId(), + user: faker.database.mongodbObjectId(), + resourceId: faker.string.alphanumeric(20), + expiration: faker.date.future(), + createdAt: new Date(), + }; + + describe("WatchSchema", () => { + it("parses valid watch data", () => { + expect(() => WatchSchema.parse(validWatch)).not.toThrow(); + }); + + it("defaults createdAt to current date when not provided", () => { + const watchWithoutCreatedAt = { + ...validWatch, + createdAt: undefined, + }; + + const parsed = WatchSchema.parse(watchWithoutCreatedAt); + expect(parsed.createdAt).toBeInstanceOf(Date); + }); + + it("accepts valid MongoDB ObjectId for user", () => { + const watchData = { + ...validWatch, + user: faker.database.mongodbObjectId(), + }; + + expect(() => WatchSchema.parse(watchData)).not.toThrow(); + }); + + it("rejects invalid MongoDB ObjectId for user", () => { + const watchData = { + ...validWatch, + user: "invalid-object-id", + }; + + expect(() => WatchSchema.parse(watchData)).toThrow(); + }); + + it("requires all mandatory fields", () => { + const requiredFields = ["_id", "user", "resourceId", "expiration"]; + + requiredFields.forEach((field) => { + const incompleteWatch = { ...validWatch }; + delete incompleteWatch[field as keyof Schema_Watch]; + + expect(() => WatchSchema.parse(incompleteWatch)).toThrow(); + }); + }); + + it("requires expiration to be a Date", () => { + const watchData = { + ...validWatch, + expiration: "2024-12-31T23:59:59Z", // string instead of Date + }; + + expect(() => WatchSchema.parse(watchData)).toThrow(); + }); + }); +}); diff --git a/packages/core/src/types/watch.types.ts b/packages/core/src/types/watch.types.ts new file mode 100644 index 000000000..0aa663d02 --- /dev/null +++ b/packages/core/src/types/watch.types.ts @@ -0,0 +1,22 @@ +import { z } from "zod/v4"; +import { IDSchemaV4, zObjectId } from "@core/types/type.utils"; + +/** + * Watch collection schema for Google Calendar push notification channels + * + * This schema stores channel metadata for Google Calendar push notifications + * to enable reliable lifecycle management of channels (creation, renewal, + * expiration, deletion) separately from sync data. + */ +export const WatchSchema = z.object({ + _id: zObjectId, // channel_id - unique identifier for the notification channel + user: IDSchemaV4, // user who owns this watch channel + resourceId: z.string(), // Google Calendar resource identifier + expiration: z.date(), // when the channel expires + createdAt: z + .date() + .optional() + .default(() => new Date()), // when this watch was created +}); + +export type Schema_Watch = z.infer; diff --git a/packages/scripts/src/__tests__/integration/2025.10.03T01.19.59.calendar-schema.test.ts b/packages/scripts/src/__tests__/integration/2025.10.03T01.19.59.calendar-schema.test.ts index 57794ed99..bc857e7fa 100644 --- a/packages/scripts/src/__tests__/integration/2025.10.03T01.19.59.calendar-schema.test.ts +++ b/packages/scripts/src/__tests__/integration/2025.10.03T01.19.59.calendar-schema.test.ts @@ -1,3 +1,4 @@ +import { ObjectId } from "bson"; import { faker } from "@faker-js/faker"; import { zodToMongoSchema } from "@scripts/common/zod-to-mongo-schema"; import Migration from "@scripts/migrations/2025.10.03T01.19.59.calendar-schema"; @@ -121,7 +122,7 @@ describe("2025.10.03T01.19.59.calendar-schema", () => { const gCalendar = GoogleCalendarMetadataSchema.parse(gCalendarEntry); return CompassCalendarSchema.parse({ - _id: faker.database.mongodbObjectId(), + _id: new ObjectId(), user: faker.database.mongodbObjectId(), backgroundColor: gCalendarEntry.backgroundColor!, color: gCalendarEntry.foregroundColor!, diff --git a/packages/scripts/src/__tests__/integration/2025.10.13T14.18.20.watch-collection.test.ts b/packages/scripts/src/__tests__/integration/2025.10.13T14.18.20.watch-collection.test.ts new file mode 100644 index 000000000..f660e9e4c --- /dev/null +++ b/packages/scripts/src/__tests__/integration/2025.10.13T14.18.20.watch-collection.test.ts @@ -0,0 +1,167 @@ +import { ObjectId, WithId } from "mongodb"; +import { faker } from "@faker-js/faker"; +import { zodToMongoSchema } from "@scripts/common/zod-to-mongo-schema"; +import Migration from "@scripts/migrations/2025.10.13T14.18.20.watch-collection"; +import { Schema_Watch, WatchSchema } from "@core/types/watch.types"; +import { + cleanupCollections, + cleanupTestDb, + setupTestDb, +} from "@backend/__tests__/helpers/mock.db.setup"; +import { Collections } from "@backend/common/constants/collections"; +import mongoService from "@backend/common/services/mongo.service"; + +type PartialWatch = Partial>>; + +describe("2025.10.13T14.18.20.watch-collection", () => { + const migration = new Migration(); + const collectionName = Collections.WATCH; + + beforeAll(setupTestDb); + beforeEach(cleanupCollections); + afterEach(() => mongoService.watch.drop()); + afterAll(cleanupTestDb); + + function generateWatch(): WithId> { + return { + _id: new ObjectId(), + user: faker.database.mongodbObjectId(), + resourceId: faker.string.alphanumeric(20), + expiration: faker.date.future(), + createdAt: faker.date.recent(), + }; + } + + async function validateUpMigration() { + const indexes = await mongoService.watch.indexes(); + const collectionInfo = await mongoService.watch.options(); + const $jsonSchema = zodToMongoSchema(WatchSchema); + + expect(collectionInfo["validationLevel"]).toBe("strict"); + expect(collectionInfo["validator"]).toBeDefined(); + expect(collectionInfo["validator"]).toHaveProperty("$jsonSchema"); + expect(collectionInfo["validator"]["$jsonSchema"]).toEqual($jsonSchema); + + expect(indexes).toEqual( + expect.arrayContaining([ + expect.objectContaining({ name: "_id_", key: { _id: 1 } }), + expect.objectContaining({ + name: `${collectionName}_user_index`, + key: { user: 1 }, + }), + expect.objectContaining({ + name: `${collectionName}_user_expiration_index`, + key: { user: 1, expiration: 1 }, + }), + ]), + ); + } + + it("up() creates collection when it doesn't exist", async () => { + const existsBefore = await mongoService.collectionExists(collectionName); + + expect(existsBefore).toBe(false); + + await migration.up(); + + const existsAfter = await mongoService.collectionExists(collectionName); + + expect(existsAfter).toBe(true); + + await validateUpMigration(); + }); + + it("up() modifies collection when it exists", async () => { + // Create collection first + await mongoService.db.createCollection(collectionName); + + const existsBefore = await mongoService.collectionExists(collectionName); + + expect(existsBefore).toBe(true); + + await migration.up(); + + await validateUpMigration(); + }); + + it("down() removes schema validation and indexes without dropping collection", async () => { + await migration.up(); + + const existsBefore = await mongoService.collectionExists(collectionName); + + expect(existsBefore).toBe(true); + + await validateUpMigration(); + + await migration.down(); + + const existsAfter = await mongoService.collectionExists(collectionName); + + expect(existsAfter).toBe(true); + + const indexes = await mongoService.watch.indexes(); + const collectionInfo = await mongoService.watch.options(); + + expect(indexes).toHaveLength(1); + expect(indexes).toEqual([ + expect.objectContaining({ name: "_id_", key: { _id: 1 } }), + ]); + + expect(collectionInfo["validationLevel"]).toBe("off"); + expect(collectionInfo["validationAction"]).toBe("error"); + expect(collectionInfo).not.toHaveProperty("validator"); + }); + + describe("mongo $jsonSchema validation", () => { + beforeEach(migration.up.bind(migration)); + + it("allows valid watch documents", async () => { + const watch = generateWatch(); + + await expect(mongoService.watch.insertOne(watch)).resolves.toMatchObject({ + acknowledged: true, + insertedId: watch._id, + }); + }); + + it("rejects documents with missing required fields", async () => { + const incompleteWatch = generateWatch(); + + delete (incompleteWatch as PartialWatch).resourceId; + delete (incompleteWatch as PartialWatch).expiration; + + await expect( + mongoService.watch.insertOne(incompleteWatch), + ).rejects.toThrow(); + }); + + it("rejects documents with missing user", async () => { + const watchWithoutUserId = generateWatch(); + + delete (watchWithoutUserId as PartialWatch).user; + + await expect( + mongoService.watch.insertOne(watchWithoutUserId), + ).rejects.toThrow(/Document failed validation/); + }); + + it("rejects documents with additional properties", async () => { + const watchWithExtra = { + ...generateWatch(), + extraProperty: "should-not-be-allowed", + }; + + await expect( + mongoService.watch.insertOne(watchWithExtra), + ).rejects.toThrow(); + }); + + it("enforces unique constraint on _id (channelId)", async () => { + const watch = generateWatch(); + + await mongoService.watch.insertOne(watch); + + await expect(mongoService.watch.insertOne(watch)).rejects.toThrow(); + }); + }); +}); diff --git a/packages/scripts/src/__tests__/integration/2025.10.13T14.22.21.migrate-sync-watch-data.test.ts b/packages/scripts/src/__tests__/integration/2025.10.13T14.22.21.migrate-sync-watch-data.test.ts new file mode 100644 index 000000000..a48f4d85a --- /dev/null +++ b/packages/scripts/src/__tests__/integration/2025.10.13T14.22.21.migrate-sync-watch-data.test.ts @@ -0,0 +1,99 @@ +import { ObjectId } from "mongodb"; +import { faker } from "@faker-js/faker"; +import { MigratorType } from "@scripts/common/cli.types"; +import WatchMigration from "@scripts/migrations/2025.10.13T14.18.20.watch-collection"; +import Migration from "@scripts/migrations/2025.10.13T14.22.21.migrate-sync-watch-data"; +import { Logger } from "@core/logger/winston.logger"; +import { UtilDriver } from "@backend/__tests__/drivers/util.driver"; +import { + cleanupCollections, + cleanupTestDb, + setupTestDb, +} from "@backend/__tests__/helpers/mock.db.setup"; +import mongoService from "@backend/common/services/mongo.service"; + +describe("2025.10.13T14.22.21.migrate-sync-watch-data", () => { + const migration = new Migration(); + const syncCount = faker.number.int({ min: 1, max: 5 }); + + const migrationContext = { + name: migration.name, + context: { + logger: Logger(""), + migratorType: MigratorType.MIGRATION, + unsafe: false, + }, + }; + + beforeAll(setupTestDb); + beforeEach(WatchMigration.prototype.up); + beforeEach(UtilDriver.generateV0SyncData.bind(null, syncCount)); + afterEach(cleanupCollections); + afterEach(() => mongoService.watch.drop()); + afterAll(cleanupTestDb); + + it("migrates events watch data from sync to watch collection", async () => { + const syncDocs = await mongoService.sync.find().toArray(); + + // Verify only exact sync data count exists initially + expect(syncDocs).toHaveLength(syncCount); + + // Run migration + await migration.up(migrationContext); + + // Verify watch data was created + const watchDocs = await mongoService.watch.find().toArray(); + + // Verify each watch document has correct data + // calendarlist will be absent since we do not currently store resourceId + expect(watchDocs).toEqual( + expect.arrayContaining( + syncDocs.flatMap(({ user, google }) => + google.events.map(() => + expect.objectContaining({ + _id: expect.any(ObjectId), + user, + resourceId: expect.any(String), + expiration: expect.any(Date), + createdAt: expect.any(Date), + }), + ), + ), + ), + ); + + // Verify original sync data is unchanged + const syncDocsAfter = await mongoService.sync.find().toArray(); + + expect(syncDocsAfter).toHaveLength(syncCount); + }); + + it("is non-destructive - does not modify watch collection on down", async () => { + // Setup some watch data + await migration.up(migrationContext); + + const syncDocs = await mongoService.sync.find().toArray(); + + // Run down migration + await migration.down(migrationContext); + + // Verify watch data is unchanged + const watchDocs = await mongoService.watch.find().toArray(); + + expect(watchDocs).toEqual( + expect.arrayContaining( + syncDocs.flatMap(({ user, google }) => + google.events.map(() => + expect.objectContaining({ + _id: expect.any(ObjectId), + user, + resourceId: expect.any(String), + expiration: expect.any(Date), + createdAt: expect.any(Date), + }), + ), + ), + ), + ); + }); +}); diff --git a/packages/scripts/src/common/zod-to-mongo-schema.test.ts b/packages/scripts/src/common/zod-to-mongo-schema.test.ts index a982d20a4..76d68bbff 100644 --- a/packages/scripts/src/common/zod-to-mongo-schema.test.ts +++ b/packages/scripts/src/common/zod-to-mongo-schema.test.ts @@ -1,11 +1,8 @@ // derived from https://github.com/mission-apprentissage/zod-to-mongodb-schema/blob/main/src/index.test.ts import { z } from "zod/v4"; import type { JSONSchema } from "zod/v4/core"; -import { - zObjectId, - zObjectIdMini, - zodToMongoSchema, -} from "@scripts/common/zod-to-mongo-schema"; +import { zodToMongoSchema } from "@scripts/common/zod-to-mongo-schema"; +import { zObjectId, zObjectIdMini } from "@core/types/type.utils"; describe("zodToMongoSchema", () => { it("should convert zod object properly", () => { diff --git a/packages/scripts/src/common/zod-to-mongo-schema.ts b/packages/scripts/src/common/zod-to-mongo-schema.ts index 1924c9a34..837f49b44 100644 --- a/packages/scripts/src/common/zod-to-mongo-schema.ts +++ b/packages/scripts/src/common/zod-to-mongo-schema.ts @@ -1,9 +1,7 @@ // derived from https://github.com/mission-apprentissage/zod-to-mongodb-schema/tree/main/index.ts -import { ObjectId } from "bson"; -import { z } from "zod/v4"; -import { z as zMini } from "zod/v4-mini"; import type { $ZodType, JSONSchema } from "zod/v4/core"; import { registry, toJSONSchema } from "zod/v4/core"; +import { zObjectId, zObjectIdMini } from "@core/types/type.utils"; type MongoType = "object" | "array" | "number" | "boolean" | "string" | "null"; @@ -124,16 +122,6 @@ function convertTypeToBsonType( } } -export const zObjectIdMini = zMini.pipe( - zMini.custom(ObjectId.isValid), - zMini.transform((v) => new ObjectId(v)), -); - -export const zObjectId = z.pipe( - z.custom((v) => ObjectId.isValid(v as string)), - z.transform((v) => new ObjectId(v)), -); - function resolveRef(root: JSONSchema.Schema, ref: string) { const parts: string[] = ref.split("/").slice(1); const schema = parts.reduce((acc, part) => { diff --git a/packages/scripts/src/migrations/2025.10.13T14.18.20.watch-collection.ts b/packages/scripts/src/migrations/2025.10.13T14.18.20.watch-collection.ts new file mode 100644 index 000000000..39e9eab03 --- /dev/null +++ b/packages/scripts/src/migrations/2025.10.13T14.18.20.watch-collection.ts @@ -0,0 +1,62 @@ +import type { RunnableMigration } from "umzug"; +import { MigrationContext } from "@scripts/common/cli.types"; +import { zodToMongoSchema } from "@scripts/common/zod-to-mongo-schema"; +import { WatchSchema } from "@core/types/watch.types"; +import mongoService from "@backend/common/services/mongo.service"; + +export default class Migration implements RunnableMigration { + readonly name: string = "2025.10.13T14.18.20.watch-collection"; + readonly path: string = "2025.10.13T14.18.20.watch-collection.ts"; + + async up(): Promise { + const { collectionName } = mongoService.watch; + const exists = await mongoService.collectionExists(collectionName); + const $jsonSchema = zodToMongoSchema(WatchSchema); + + if (exists) { + // do not run in session + await mongoService.db.command({ + collMod: collectionName, + validationLevel: "strict", + validator: { $jsonSchema }, + }); + } else { + await mongoService.db.createCollection(collectionName, { + validator: { $jsonSchema }, + validationLevel: "strict", + }); + } + + // Create index on user for efficient user-based queries + await mongoService.watch.createIndex( + { user: 1 }, + { name: `${collectionName}_user_index` }, + ); + + // Create compound index on user and expiration for cleanup operations + await mongoService.watch.createIndex( + { user: 1, expiration: 1 }, + { name: `${collectionName}_user_expiration_index` }, + ); + } + + async down(): Promise { + // do not drop table, just remove indexes and schema validation + const { collectionName } = mongoService.watch; + const exists = await mongoService.collectionExists(collectionName); + + if (!exists) return; + + await mongoService.db.command({ + collMod: collectionName, + validationLevel: "off", + validator: {}, + }); + + // _id index is built-in, no need to drop + await mongoService.watch.dropIndex(`${collectionName}_user_index`); + await mongoService.watch.dropIndex( + `${collectionName}_user_expiration_index`, + ); + } +} diff --git a/packages/scripts/src/migrations/2025.10.13T14.22.21.migrate-sync-watch-data.ts b/packages/scripts/src/migrations/2025.10.13T14.22.21.migrate-sync-watch-data.ts new file mode 100644 index 000000000..a97d2efd6 --- /dev/null +++ b/packages/scripts/src/migrations/2025.10.13T14.22.21.migrate-sync-watch-data.ts @@ -0,0 +1,85 @@ +import { ObjectId, WithId } from "mongodb"; +import type { MigrationParams, RunnableMigration } from "umzug"; +import { MigrationContext } from "@scripts/common/cli.types"; +import { Schema_Watch, WatchSchema } from "@core/types/watch.types"; +import { getGcalClient } from "@backend/auth/services/google.auth.service"; +import gcalService from "@backend/common/services/gcal/gcal.service"; +import mongoService from "@backend/common/services/mongo.service"; +import syncService from "@backend/sync/services/sync.service"; +import { getChannelExpiration } from "@backend/sync/util/sync.util"; + +export default class Migration implements RunnableMigration { + readonly name: string = "2025.10.13T14.22.21.migrate-sync-watch-data"; + readonly path: string = "2025.10.13T14.22.21.migrate-sync-watch-data.ts"; + + async up(params: MigrationParams): Promise { + const { logger } = params.context; + const session = await mongoService.startSession(); + // This is a non-destructive migration to copy events watch data + // from sync collection to watch collection + + const cursor = mongoService.sync.find( + { "google.events": { $exists: true, $ne: [] } }, + { batchSize: 100, session }, + ); + + let migratedCount = 0; + + for await (const syncDoc of cursor) { + if (!syncDoc.google?.events?.length) continue; + + const watchDocuments: Array>> = []; + // we will not migrate calendarlist watches as we do not store resourceId + // for them currently and they are unused + const syncDocs = syncDoc.google.events; + const gcal = await getGcalClient(syncDoc.user); + const expiration = getChannelExpiration(); + + await Promise.allSettled([ + ...syncDocs.map(async (s) => { + await syncService + .stopWatch(syncDoc.user, s.channelId, s.resourceId, gcal) + .catch(logger.error); + + const _id = new ObjectId(); + const channelId = _id.toString(); + + const { watch } = await gcalService.watchEvents(gcal, { + channelId, + expiration, + gCalendarId: s.gCalendarId, + nextSyncToken: s.nextSyncToken, + }); + + watchDocuments.push( + WatchSchema.parse({ + _id, + user: syncDoc.user, + resourceId: watch.resourceId!, + expiration: new Date(parseInt(watch.expiration!)), + createdAt: new Date(), // Set current time as creation time for migration + }), + ); + }), + ]); + + if (watchDocuments.length > 0) { + const result = await mongoService.watch.insertMany(watchDocuments, { + session, + }); + migratedCount += result.insertedCount; + } + } + + logger.info( + `Migrated ${migratedCount} events watch channels to watch collection`, + ); + } + + async down(params: MigrationParams): Promise { + const { logger } = params.context; + // This is a non-destructive migration, we don't remove the data from watch collection + // because it might have been updated or new watches might have been added + logger.info("Non-destructive migration: watch collection data left intact"); + } +}