Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 33 additions & 28 deletions apps/orchestrator/src/activities/post.activity.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import {
Activity,
ActivityMethod,
Expand All @@ -16,7 +16,10 @@ import { AuthTokenDetails } from '@gitroom/nestjs-libraries/integrations/social/
import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integrations/refresh.integration.service';
import { timer } from '@gitroom/helpers/utils/timer';
import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service';
import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service';
import {
PostWebhookEvent,
WebhooksService,
} from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service';
import { TypedSearchAttributes } from '@temporalio/common';
import {
organizationId,
Expand All @@ -26,6 +29,8 @@ import {
@Injectable()
@Activity()
export class PostActivity {
private readonly logger = new Logger(PostActivity.name);

constructor(
private _postService: PostsService,
private _notificationService: NotificationService,
Expand Down Expand Up @@ -246,32 +251,32 @@ export class PostActivity {
}

@ActivityMethod()
async sendWebhooks(postId: string, orgId: string, integrationId: string) {
const webhooks = (await this._webhookService.getWebhooks(orgId)).filter(
(f) => {
return (
f.integrations.length === 0 ||
f.integrations.some((i) => i.integration.id === integrationId)
);
}
);

const post = await this._postService.getPostByForWebhookId(postId);
return Promise.all(
webhooks.map(async (webhook) => {
try {
await fetch(webhook.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(post),
});
} catch (e) {
/**empty**/
}
})
);
async sendWebhooks(
orgId: string,
integrationId: string,
event: PostWebhookEvent,
postId: string,
error?: string
) {
try {
const postRows = await this._postService.getPostByForWebhookId(postId);
const post = Array.isArray(postRows) ? postRows[0] : postRows;
const data = post
? { ...post, ...(error ? { error } : {}) }
: { postId, ...(error ? { error } : {}) };
await this._webhookService.sendPostEvent(
orgId,
integrationId,
event,
data as Record<string, unknown>
);
} catch (err) {
this.logger.error(
`sendWebhooks failed (event=${event}, postId=${postId}, orgId=${orgId}): ${err instanceof Error ? err.message : String(err)}`,
err instanceof Error ? err.stack : undefined
);
throw err;
}
}
@ActivityMethod()
async processPlug(data: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ export async function postWorkflowV101({
const refresh = await refreshToken(post.integration);
if (!refresh || !refresh.accessToken) {
await changeState(postsList[0].id, 'ERROR', err, postsList);
await sendWebhooks(
post.organizationId,
post.integration.id,
'post.failed',
postsList[0].id,
err?.message ?? String(err)
);
return false;
}

Expand All @@ -207,6 +214,13 @@ export async function postWorkflowV101({

// for other errors, change state and inform the user if needed
await changeState(postsList[0].id, 'ERROR', err, postsList);
await sendWebhooks(
post.organizationId,
post.integration.id,
'post.failed',
postsList[0].id,
err?.message ?? String(err)
);

// specific case for bad body errors
if (
Expand Down Expand Up @@ -237,11 +251,12 @@ export async function postWorkflowV101({
}
}

// send webhooks for the post
// send webhooks for the post (use internal post id)
await sendWebhooks(
postsResults[0].postId,
post.organizationId,
post.integration.id
post.integration.id,
'post.published',
postsList[0].id
);

// load internal plugs like repost by other users
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BadRequestException,
Injectable,
Logger,
ValidationPipe,
} from '@nestjs/common';
import { PostsRepository } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.repository';
Expand Down Expand Up @@ -35,6 +36,7 @@ import { timer } from '@gitroom/helpers/utils/timer';
import { ioRedis } from '@gitroom/nestjs-libraries/redis/redis.service';
import { RefreshToken } from '@gitroom/nestjs-libraries/integrations/social.abstract';
import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integrations/refresh.integration.service';
import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service';

type PostWithConditionals = Post & {
integration?: Integration;
Expand All @@ -43,6 +45,7 @@ type PostWithConditionals = Post & {

@Injectable()
export class PostsService {
private readonly logger = new Logger(PostsService.name);
private storage = UploadFactory.createStorage();
constructor(
private _postRepository: PostsRepository,
Expand All @@ -52,7 +55,8 @@ export class PostsService {
private _shortLinkService: ShortLinkService,
private _openaiService: OpenaiService,
private _temporalService: TemporalService,
private _refreshIntegrationService: RefreshIntegrationService
private _refreshIntegrationService: RefreshIntegrationService,
private _webhooksService: WebhooksService
) {}

searchForMissingThreeHoursPosts() {
Expand Down Expand Up @@ -503,6 +507,13 @@ export class PostsService {
}

async deletePost(orgId: string, group: string) {
// fetch post data before soft-delete for webhook payload
const postsInGroup = await this._postRepository.getPostsByGroup(
orgId,
group
);
const firstPost = postsInGroup?.[0];

const post = await this._postRepository.deletePost(orgId, group);

if (post?.id) {
Expand Down Expand Up @@ -530,6 +541,38 @@ export class PostsService {
} catch (err) {}
}

// send webhook for post.deleted
if (firstPost) {
try {
const data = {
id: firstPost.id,
content: firstPost.content,
publishDate: firstPost.publishDate,
releaseURL: firstPost.releaseURL,
state: firstPost.state,
integration: firstPost.integration
? {
id: firstPost.integration.id,
name: firstPost.integration.name,
providerIdentifier: firstPost.integration.providerIdentifier,
picture: firstPost.integration.picture,
type: firstPost.integration.type,
}
: undefined,
};
await this._webhooksService.sendPostEvent(
orgId,
firstPost.integrationId,
'post.deleted',
data as Record<string, unknown>
);
} catch (err) {
this.logger.warn(
`Webhook send failed for post.deleted (postId=${firstPost.id}, orgId=${orgId}): ${err instanceof Error ? err.message : String(err)}`
);
}
}

return { error: true };
}

Expand Down Expand Up @@ -636,6 +679,24 @@ export class PostsService {
).catch((err) => {});
}

// send webhook for post.created or post.updated
try {
const webhookData = await this.getPostByForWebhookId(posts[0].id);
const data = Array.isArray(webhookData) ? webhookData[0] : webhookData;
if (data) {
await this._webhooksService.sendPostEvent(
orgId,
post.integration.id,
body.type === 'update' ? 'post.updated' : 'post.created',
data as Record<string, unknown>
);
}
} catch (err) {
this.logger.warn(
`Webhook send failed for ${body.type === 'update' ? 'post.updated' : 'post.created'} (postId=${posts[0].id}, orgId=${orgId}): ${err instanceof Error ? err.message : String(err)}`
);
}

Sentry.metrics.count('post_created', 1);
postList.push({
postId: posts[0].id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { WebhooksRepository } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.repository';
import { WebhooksDto } from '@gitroom/nestjs-libraries/dtos/webhooks/webhooks.dto';

export type PostWebhookEvent =
| 'post.created'
| 'post.updated'
| 'post.deleted'
| 'post.published'
| 'post.failed';

@Injectable()
export class WebhooksService {
private readonly logger = new Logger(WebhooksService.name);

constructor(private _webhooksRepository: WebhooksRepository) {}

getTotal(orgId: string) {
Expand All @@ -21,4 +30,80 @@ export class WebhooksService {
deleteWebhook(orgId: string, id: string) {
return this._webhooksRepository.deleteWebhook(orgId, id);
}

/**
* Sends a post-related event to all configured webhooks for the org that match the integration.
* Webhooks with no integrations configured receive all events; otherwise only when integrationId matches.
*/
async sendPostEvent(
orgId: string,
integrationId: string,
event: PostWebhookEvent,
data: Record<string, unknown>
): Promise<void> {
let webhooks: Awaited<ReturnType<WebhooksRepository['getWebhooks']>>;
try {
webhooks = (await this._webhooksRepository.getWebhooks(orgId)).filter(
(w) =>
w.integrations.length === 0 ||
w.integrations.some((i) => i.integration.id === integrationId)
);
} catch (err) {
this.logger.error(
`Failed to load webhooks for org ${orgId}: ${err instanceof Error ? err.message : String(err)}`,
err instanceof Error ? err.stack : undefined
);
return;
}

if (webhooks.length === 0) {
this.logger.debug(
`No webhooks configured for org ${orgId} (integration ${integrationId}), event ${event}`
);
return;
}

const payload = {
event,
timestamp: new Date().toISOString(),
data,
};

this.logger.log(
`Sending webhook event ${event} to ${webhooks.length} endpoint(s) (orgId=${orgId})`
);

const results = await Promise.allSettled(
webhooks.map((webhook: (typeof webhooks)[number]) =>
fetch(webhook.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
})
)
);

results.forEach(
(result: PromiseSettledResult<Response>, index: number) => {
const webhook = webhooks[index];
if (result.status === 'fulfilled') {
const res = result.value;
if (!res.ok) {
this.logger.warn(
`Webhook ${webhook.url} returned ${res.status} for event ${event} (orgId=${orgId}, webhookId=${webhook.id})`
);
} else {
this.logger.debug(
`Webhook delivered to ${webhook.url} for event ${event} (webhookId=${webhook.id})`
);
}
} else {
this.logger.error(
`Webhook failed for ${webhook.url} (event=${event}, orgId=${orgId}, webhookId=${webhook.id}): ${result.reason instanceof Error ? result.reason.message : String(result.reason)}`,
result.reason instanceof Error ? result.reason.stack : undefined
);
}
}
);
}
}
Loading