diff --git a/apps/queue/src/domain/handler.ts b/apps/queue/src/domain/handler.ts index 8822cfaff..3380e4eef 100644 --- a/apps/queue/src/domain/handler.ts +++ b/apps/queue/src/domain/handler.ts @@ -1,6 +1,8 @@ import type { MailJob } from "./model/mail-job"; +import type { ZapierJob } from "./model/zapier-job"; import notificationQueue from "./notification-queue"; import mailQueue from "./queue"; +import zapierQueue from "./zapier-queue"; export async function addMailJob({ to, subject, body, from }: MailJob) { for (const recipient of to) { @@ -16,3 +18,7 @@ export async function addMailJob({ to, subject, body, from }: MailJob) { export async function addNotificationJob(notification) { await notificationQueue.add("notification", notification); } + +export async function addZapierJob(job: ZapierJob) { + await zapierQueue.add("zapier", job); +} diff --git a/apps/queue/src/domain/model/zapier-job.ts b/apps/queue/src/domain/model/zapier-job.ts new file mode 100644 index 000000000..dbef140a3 --- /dev/null +++ b/apps/queue/src/domain/model/zapier-job.ts @@ -0,0 +1,9 @@ +import { z } from "zod"; + +export const ZapierJob = z.object({ + domainId: z.string(), + action: z.string(), + payload: z.any(), +}); + +export type ZapierJob = z.infer; diff --git a/apps/queue/src/domain/zapier-queue.ts b/apps/queue/src/domain/zapier-queue.ts new file mode 100644 index 000000000..4d36b7b9e --- /dev/null +++ b/apps/queue/src/domain/zapier-queue.ts @@ -0,0 +1,8 @@ +import { Queue } from "bullmq"; +import redis from "../redis"; + +const zapierQueue = new Queue("zapier", { + connection: redis, +}); + +export default zapierQueue; diff --git a/apps/queue/src/index.ts b/apps/queue/src/index.ts index 437e98a3d..3b9e7ece1 100644 --- a/apps/queue/src/index.ts +++ b/apps/queue/src/index.ts @@ -5,6 +5,7 @@ import sseRoutes from "./sse/routes"; // start workers import "./domain/worker"; import "./workers/notifications"; +import "./workers/zapier"; // start loops import { startEmailAutomation } from "./start-email-automation"; diff --git a/apps/queue/src/job/routes.ts b/apps/queue/src/job/routes.ts index 1cb131fe7..a0b601920 100644 --- a/apps/queue/src/job/routes.ts +++ b/apps/queue/src/job/routes.ts @@ -1,7 +1,8 @@ import express from "express"; -import { addMailJob, addNotificationJob } from "../domain/handler"; +import { addMailJob, addNotificationJob, addZapierJob } from "../domain/handler"; import { logger } from "../logger"; import { MailJob } from "../domain/model/mail-job"; +import { ZapierJob } from "../domain/model/zapier-job"; import NotificationModel from "../domain/model/notification"; import { ObjectId } from "mongodb"; import { User } from "@courselit/common-models"; @@ -56,4 +57,18 @@ router.post( }, ); +router.post("/zapier", async (req: express.Request, res: express.Response) => { + try { + const { domainId, action, payload } = req.body; + ZapierJob.parse({ domainId, action, payload }); + + await addZapierJob({ domainId, action, payload }); + + res.status(200).json({ message: "Success" }); + } catch (err: any) { + logger.error(err); + res.status(500).json({ error: err.message }); + } +}); + export default router; diff --git a/apps/queue/src/workers/zapier.ts b/apps/queue/src/workers/zapier.ts new file mode 100644 index 000000000..f330c459d --- /dev/null +++ b/apps/queue/src/workers/zapier.ts @@ -0,0 +1,26 @@ +import { Worker } from "bullmq"; +import redis from "../redis"; +import { logger } from "../logger"; +import { ZapierJob } from "../domain/model/zapier-job"; + +const worker = new Worker( + "zapier", + async (job) => { + const { domainId, action, payload } = job.data as ZapierJob; + logger.info(`Processing zapier job for domain ${domainId}`, { + action, + payload, + }); + }, + { + connection: redis, + }, +); + +worker.on("completed", (job) => { + logger.info(`Zapier job ${job.id} completed`); +}); + +worker.on("failed", (job, err) => { + logger.error(`Zapier job ${job.id} failed with error ${err.message}`); +}); diff --git a/apps/web/app/api/payment/webhook/route.ts b/apps/web/app/api/payment/webhook/route.ts index 4d1285fc9..03506b4ce 100644 --- a/apps/web/app/api/payment/webhook/route.ts +++ b/apps/web/app/api/payment/webhook/route.ts @@ -14,6 +14,9 @@ import { error } from "@/services/logger"; import mongoose from "mongoose"; import Payment from "@/payments-new/payment"; import { activateMembership } from "../helpers"; +import UserModel from "@models/User"; +import CourseModel from "@models/Course"; +import { addZapierJob } from "@/services/queue"; export async function POST(req: NextRequest) { try { @@ -56,7 +59,7 @@ export async function POST(req: NextRequest) { membership, ); - await handleInvoice( + const invoice = await handleInvoice( domain, invoiceId, membership, @@ -81,6 +84,19 @@ export async function POST(req: NextRequest) { await activateMembership(domain, membership, paymentPlan); + const user = await UserModel.findOne({ userId: membership.userId, domain: domain._id }); + const course = await CourseModel.findOne({ courseId: membership.entityId, domain: domain._id }); + + await addZapierJob({ + domainId: domain._id.toString(), + action: "product_purchased", + payload: { + user, + course, + invoice, + }, + }); + return Response.json({ message: "success" }); } catch (e) { error(`Error in payment webhook: ${e.message}`, { @@ -145,8 +161,8 @@ async function handleInvoice( paymentMethod: any, currencyISOCode: string, body: any, -) { - const invoice = await InvoiceModel.findOne({ +): Promise { + let invoice = await InvoiceModel.findOne({ domain: domain._id, invoiceId, status: Constants.InvoiceStatus.PENDING, @@ -155,9 +171,9 @@ async function handleInvoice( invoice.paymentProcessorTransactionId = paymentMethod.getPaymentIdentifier(body); invoice.status = Constants.InvoiceStatus.PAID; - await (invoice as any).save(); + invoice = await (invoice as any).save(); } else { - await InvoiceModel.create({ + invoice = await InvoiceModel.create({ domain: domain._id, membershipId: membership.membershipId, membershipSessionId: membership.sessionId, @@ -174,6 +190,8 @@ async function handleInvoice( currencyISOCode, }); } + + return invoice!; } async function handleEMICancellation( diff --git a/apps/web/graphql/communities/logic.ts b/apps/web/graphql/communities/logic.ts index a7a00356d..3f783aff4 100644 --- a/apps/web/graphql/communities/logic.ts +++ b/apps/web/graphql/communities/logic.ts @@ -45,7 +45,7 @@ import { } from "./helpers"; import { error } from "@/services/logger"; import NotificationModel from "@models/Notification"; -import { addNotification } from "@/services/queue"; +import { addNotification, addZapierJob } from "@/services/queue"; import { hasActiveSubscription } from "../users/logic"; import { internal } from "@config/strings"; import { hasCommunityPermission as hasPermission } from "@ui-lib/utils"; @@ -492,6 +492,17 @@ export async function joinCommunity({ forUserIds: communityManagers.map((m) => m.userId), userId: ctx.user.userId, }); + + if (member.status === Constants.MembershipStatus.ACTIVE) { + await addZapierJob({ + domainId: ctx.subdomain._id.toString(), + action: "community_joined", + payload: { + user: ctx.user, + community, + }, + }); + } } return member; diff --git a/apps/web/graphql/lessons/logic.ts b/apps/web/graphql/lessons/logic.ts index f5d4faf6b..b744de544 100644 --- a/apps/web/graphql/lessons/logic.ts +++ b/apps/web/graphql/lessons/logic.ts @@ -24,6 +24,7 @@ import LessonEvaluation from "../../models/LessonEvaluation"; import { checkPermission } from "@courselit/utils"; import { recordActivity } from "../../lib/record-activity"; import { InternalCourse } from "@courselit/common-logic"; +import { addZapierJob } from "@/services/queue"; const { permissions, quiz } = constants; @@ -314,6 +315,7 @@ export const markLessonCompleted = async ( lessonId, courseId: lesson.courseId, user: ctx.user, + domainId: ctx.subdomain._id.toString(), }); await recordActivity({ @@ -357,6 +359,15 @@ const recordCourseCompleted = async (courseId: string, ctx: GQLContext) => { type: Constants.ActivityType.COURSE_COMPLETED, entityId: courseId, }); + + await addZapierJob({ + domainId: ctx.subdomain._id.toString(), + action: "course_completed", + payload: { + user: ctx.user, + course, + }, + }); }; export const evaluateLesson = async ( diff --git a/apps/web/graphql/users/logic.ts b/apps/web/graphql/users/logic.ts index a5d113695..47261a6f9 100644 --- a/apps/web/graphql/users/logic.ts +++ b/apps/web/graphql/users/logic.ts @@ -30,7 +30,8 @@ import { generateEmailFrom } from "@/lib/utils"; import MembershipModel from "@models/Membership"; import CommunityModel from "@models/Community"; import CourseModel from "@models/Course"; -import { addMailJob } from "@/services/queue"; +import LessonModel from "@models/Lesson"; +import { addMailJob, addZapierJob } from "@/services/queue"; import { getPaymentMethodFromSettings } from "@/payments-new"; import { checkForInvalidPermissions } from "@/lib/check-invalid-permissions"; import { activateMembership } from "@/app/api/payment/helpers"; @@ -183,6 +184,15 @@ export const inviteCustomer = async ( await activateMembership(ctx.subdomain!, membership, paymentPlan); + await addZapierJob({ + domainId: ctx.subdomain._id.toString(), + action: "course_enrolled", + payload: { + user, + course, + }, + }); + try { const emailBody = pug.render(courseEnrollTemplate, { courseName: course.title, @@ -289,10 +299,12 @@ export const recordProgress = async ({ lessonId, courseId, user, + domainId, }: { lessonId: string; - courseId: string; + courseId:string; user: User; + domainId: string; }) => { const enrolledItemIndex = user.purchases.findIndex( (progress: Progress) => progress.courseId === courseId, @@ -308,6 +320,19 @@ export const recordProgress = async ({ ) { user.purchases[enrolledItemIndex].completedLessons.push(lessonId); await (user as any).save(); + + const course = await CourseModel.findOne({ courseId, domain: domainId }); + const lesson = await LessonModel.findOne({ lessonId, courseId }); + + await addZapierJob({ + domainId, + action: "lesson_completed", + payload: { + user, + course, + lesson, + }, + }); } }; @@ -398,6 +423,12 @@ export async function createUser({ type: "newsletter_subscribed", }); } + + await addZapierJob({ + domainId: domain._id.toString(), + action: "new_user", + payload: createdUser, + }); } return createdUser; diff --git a/apps/web/lib/trigger-sequences.ts b/apps/web/lib/trigger-sequences.ts index a22921b5c..4d13f75f2 100644 --- a/apps/web/lib/trigger-sequences.ts +++ b/apps/web/lib/trigger-sequences.ts @@ -10,6 +10,7 @@ import RuleModel from "@models/Rule"; import SequenceModel from "@models/Sequence"; import mongoose from "mongoose"; import { error } from "../services/logger"; +import { addZapierJob } from "@/services/queue"; export async function triggerSequences({ user, @@ -73,6 +74,15 @@ export async function triggerSequences({ { _id: sequence._id }, { $addToSet: { entrants: user.userId } }, ); + + await addZapierJob({ + domainId: user.domain.toString(), + action: "sequence_subscribed", + payload: { + user, + sequence, + }, + }); } } catch (err: any) { error(err.message, { diff --git a/apps/web/services/queue.ts b/apps/web/services/queue.ts index bab4249ae..cab4167e8 100644 --- a/apps/web/services/queue.ts +++ b/apps/web/services/queue.ts @@ -103,6 +103,30 @@ export async function addMailJob({ to, from, subject, body }: MailProps) { } } +export async function addZapierJob(payload: any) { + try { + const jwtSecret = getJwtSecret(); + const token = jwtUtils.generateToken({ service: "app" }, jwtSecret); + const response = await fetch(`${queueServer}/job/zapier`, { + method: "POST", + headers: { + "content-type": "application/json", + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify(payload), + }); + const jsonResponse = await response.json(); + + if (response.status !== 200) { + throw new Error(jsonResponse.error); + } + } catch (err) { + error(`Error adding zapier job: ${err.message}`, { + payload + }); + } +} + export async function addNotification({ domain, entityId,