Skip to content

Commit d2ce9fe

Browse files
committed
processing flow, rss fixing
1 parent 8aead85 commit d2ce9fe

File tree

12 files changed

+2183
-42
lines changed

12 files changed

+2183
-42
lines changed

apps/app/src/components/processing/ProcessingHistory.tsx

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,20 @@ export function ProcessingHistory({
6060
}),
6161
jobColumnHelper.accessor("status", {
6262
header: "Status",
63-
cell: (info) => (
64-
<Badge
65-
variant={
66-
info.getValue() === "completed"
67-
? "secondary"
68-
: info.getValue() === "failed"
69-
? "destructive"
70-
: info.getValue() === "processing"
71-
? "default"
72-
: "outline"
73-
}
74-
>
75-
{info.getValue()}
76-
</Badge>
77-
),
63+
cell: (info) => {
64+
const status = info.getValue();
65+
const variant =
66+
status === "completed"
67+
? "secondary"
68+
: status === "failed"
69+
? "destructive"
70+
: status === "processing"
71+
? "default"
72+
: status === "completed_with_errors"
73+
? "warning"
74+
: "outline";
75+
return <Badge variant={variant}>{status}</Badge>;
76+
},
7877
}),
7978
jobColumnHelper.accessor("startedAt", {
8079
header: "Started",
@@ -175,7 +174,6 @@ export function ProcessingHistory({
175174
return (
176175
<div className="flex items-center gap-2">
177176
<Button
178-
variant="ghost"
179177
size="sm"
180178
onClick={() => {
181179
setSelectedStep(step);

apps/app/src/components/processing/ProcessingStepDetails.tsx

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { useTweakAndReprocessStep } from "@/lib/api";
22
import { ProcessingStep } from "@curatedotfun/types";
33
import { format } from "date-fns";
4-
import { Loader2 } from "lucide-react";
4+
import { ClipboardCopy, Loader2 } from "lucide-react";
55
import { useEffect, useState } from "react";
66
import { Badge } from "../ui/badge";
77
import { Button } from "../ui/button";
@@ -126,12 +126,22 @@ export function ProcessingStepDetails({
126126
{step.error && <TabsTrigger value="error">Error</TabsTrigger>}
127127
</TabsList>
128128
<TabsContent value="input" className="flex-1 flex flex-col gap-4">
129-
<Textarea
130-
value={editableInput}
131-
onChange={(e) => setEditableInput(e.target.value)}
132-
className="flex-1 font-mono text-sm"
133-
rows={15}
134-
/>
129+
<div className="relative flex-1">
130+
<Textarea
131+
value={editableInput}
132+
onChange={(e) => setEditableInput(e.target.value)}
133+
className="flex-1 font-mono text-sm h-full"
134+
rows={15}
135+
/>
136+
<Button
137+
variant="ghost"
138+
size="icon"
139+
className="absolute top-2 right-2"
140+
onClick={() => navigator.clipboard.writeText(editableInput)}
141+
>
142+
<ClipboardCopy className="h-4 w-4" />
143+
</Button>
144+
</div>
135145
<Button
136146
onClick={() => {
137147
tweakAndReprocess({
@@ -150,25 +160,57 @@ export function ProcessingStepDetails({
150160
</Button>
151161
</TabsContent>
152162
<TabsContent value="output" className="flex-1 overflow-hidden">
153-
<div className="h-[300px] w-full rounded-md border p-4 overflow-auto">
163+
<div className="h-[300px] w-full rounded-md border p-4 overflow-auto relative">
154164
<pre className="text-sm">
155165
{step.output ? formatJson(step.output) : "No output data"}
156166
</pre>
167+
{step.output && (
168+
<Button
169+
variant="ghost"
170+
size="icon"
171+
className="absolute top-2 right-2"
172+
onClick={() =>
173+
navigator.clipboard.writeText(formatJson(step.output))
174+
}
175+
>
176+
<ClipboardCopy className="h-4 w-4" />
177+
</Button>
178+
)}
157179
</div>
158180
</TabsContent>
159181
{step.config && (
160182
<TabsContent value="config" className="flex-1 overflow-hidden">
161-
<div className="h-[300px] w-full rounded-md border p-4 overflow-auto">
183+
<div className="h-[300px] w-full rounded-md border p-4 overflow-auto relative">
162184
<pre className="text-sm">{formatJson(step.config)}</pre>
185+
<Button
186+
variant="ghost"
187+
size="icon"
188+
className="absolute top-2 right-2"
189+
onClick={() =>
190+
navigator.clipboard.writeText(formatJson(step.config))
191+
}
192+
>
193+
<ClipboardCopy className="h-4 w-4" />
194+
</Button>
163195
</div>
164196
</TabsContent>
165197
)}
166198
{step.error && (
167199
<TabsContent value="error" className="flex-1 overflow-hidden">
168-
<div className="h-[300px] w-full rounded-md border p-4 bg-red-50 dark:bg-red-950 overflow-auto">
200+
<div className="h-[300px] w-full rounded-md border p-4 bg-red-50 dark:bg-red-950 overflow-auto relative">
169201
<pre className="text-sm text-red-600 dark:text-red-400">
170202
{formatJson(step.error)}
171203
</pre>
204+
<Button
205+
variant="ghost"
206+
size="icon"
207+
className="absolute top-2 right-2"
208+
onClick={() =>
209+
navigator.clipboard.writeText(formatJson(step.error))
210+
}
211+
>
212+
<ClipboardCopy className="h-4 w-4" />
213+
</Button>
172214
</div>
173215
</TabsContent>
174216
)}

apps/app/src/components/ui/badge.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ const badgeVariants = cva(
2121
"py-[4px] rounded-md border-none text-sm bg-red-100 min-w-[80px] text-red-600 leading-6 capitalize font-medium",
2222
pending:
2323
"py-[4px] rounded-md border-none text-center text-sm bg-yellow-100 min-w-[80px] text-yellow-600 leading-6 capitalize font-medium",
24+
warning:
25+
"border-transparent bg-yellow-500 text-neutral-50 shadow hover:bg-yellow-500/80 dark:bg-yellow-900 dark:text-neutral-50 dark:hover:bg-yellow-900/80",
2426
},
2527
},
2628
defaultVariants: {

apps/app/src/routes/_layout/feed/$feedId/_tabs/processing.tsx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ function ProcessingTab() {
141141
</TableCell>
142142
<TableCell className="text-right">
143143
<Button
144-
variant="ghost"
145144
size="sm"
146145
onClick={() => toggleExpand(submission.tweetId)}
147146
>

apps/app/src/store/feed-creation-store.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ const feedCreationStateCreator: StateCreator<
101101
plugin: "@curatedotfun/rss",
102102
config: {
103103
apiSecret: "{{RSS_SERVICE_API_SECRET}}",
104+
feedId: "",
104105
feedConfig: {
105106
title: "",
106107
description: "",
@@ -133,8 +134,10 @@ const feedCreationStateCreator: StateCreator<
133134
};
134135
state.feedConfig.outputs.stream.distribute.push(rssDistributor);
135136
}
137+
rssDistributor.config.feedId = values.id;
136138
rssDistributor.config.feedConfig.title = values.name ?? "";
137139
rssDistributor.config.feedConfig.description = values.description ?? "";
140+
rssDistributor.config.feedConfig.siteUrl = `https://app.curate.fun/feed/${values.id}`;
138141
rssDistributor.config.serviceUrl = `https://rss.curate.fun/${values.id}`;
139142
const objectTransform = rssDistributor.transform?.find(
140143
(t) => t.plugin === "@curatedotfun/object-transform",

packages/core-services/src/services/distribution.service.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { DistributionResult, RichSubmission } from "@curatedotfun/shared-db";
22
import type { ActionArgs, DistributorConfig } from "@curatedotfun/types";
33
import { PluginError, PluginErrorCode } from "@curatedotfun/utils";
44
import type { Logger } from "pino";
5+
import { ZodError } from "zod";
56
import type { ConfigService } from "./config.service";
67
import { logPluginError } from "../utils/error";
78
import { sanitizeJson } from "../utils/sanitize";
@@ -49,8 +50,16 @@ export class DistributionService implements IBaseService {
4950
await plugin.distribute(args);
5051
return { success: true };
5152
} catch (error) {
53+
let errorMessage =
54+
error instanceof Error ? error.message : "Distribution failed";
55+
if (error instanceof ZodError) {
56+
errorMessage = error.errors
57+
.map((e) => ` - [${e.path.join(".")}]: ${e.message} (${e.code})`)
58+
.join("\n");
59+
errorMessage = `Configuration validation failed:\n${errorMessage}`;
60+
}
5261
const pluginError = new PluginError(
53-
error instanceof Error ? error.message : "Distribution failed",
62+
errorMessage,
5463
{
5564
pluginName,
5665
operation: "distribute",

packages/core-services/src/services/processing.service.ts

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,48 +81,58 @@ export class ProcessingService implements IBaseService {
8181
);
8282

8383
let currentContent: unknown = content;
84+
const executedSteps: SelectProcessingStep[] = [];
8485

8586
try {
8687
for (let i = 0; i < plan.length; i++) {
8788
const planStep = plan[i];
8889
const stepOrder = i + 1;
8990

9091
if (planStep.type === "transformation") {
91-
currentContent = await this.executeTransform(
92+
const result = await this.executeTransform(
9293
job.id,
9394
currentContent,
9495
planStep,
9596
stepOrder,
9697
);
98+
executedSteps.push(result.step);
99+
currentContent = result.output;
97100
} else if (planStep.type === "distribution") {
98-
await this.executeDistribution(
101+
const step = await this.executeDistribution(
99102
job.id,
100103
currentContent,
101104
planStep,
102105
stepOrder,
103106
);
107+
executedSteps.push(step);
104108
}
105109
}
106110

107-
await this.processingRepository.updateJob(job.id, {
108-
status: "completed",
111+
const hasFailures = executedSteps.some((s) => s.status === "failed");
112+
const finalStatus = hasFailures ? "completed_with_errors" : "completed";
113+
114+
const updatedJob = await this.processingRepository.updateJob(job.id, {
115+
status: finalStatus,
109116
completedAt: new Date(),
110117
});
111-
this.logger.info({ jobId: job.id }, "Processing job completed.");
118+
this.logger.info(
119+
{ jobId: job.id, finalStatus },
120+
"Processing job finished.",
121+
);
122+
return updatedJob;
112123
} catch (error) {
113124
await this.handleProcessingError(job.id, error);
114125
throw error;
115126
}
116-
return job;
117127
}
118128

119129
private async executeTransform(
120130
jobId: string,
121131
input: unknown,
122132
planStep: ProcessingPlanStep,
123133
stepOrder: number,
124-
): Promise<unknown> {
125-
const step = await this.processingRepository.createStep({
134+
): Promise<{ output: unknown; step: SelectProcessingStep }> {
135+
let step = await this.processingRepository.createStep({
126136
jobId,
127137
stepOrder,
128138
type: "transformation",
@@ -139,12 +149,12 @@ export class ProcessingService implements IBaseService {
139149
const output = await this.transformationService.applyTransforms(input, [
140150
planStep as TransformConfig,
141151
]);
142-
await this.processingRepository.updateStep(step.id, {
152+
const updatedStep = await this.processingRepository.updateStep(step.id, {
143153
status: "success",
144154
output: output as Json,
145155
completedAt: new Date(),
146156
});
147-
return output;
157+
return { output, step: updatedStep };
148158
} catch (error) {
149159
const stepError: StepError = {
150160
message: error instanceof Error ? error.message : String(error),
@@ -172,7 +182,7 @@ export class ProcessingService implements IBaseService {
172182
input: unknown,
173183
planStep: ProcessingPlanStep,
174184
stepOrder: number,
175-
) {
185+
): Promise<SelectProcessingStep> {
176186
const step = await this.processingRepository.createStep({
177187
jobId,
178188
stepOrder,
@@ -191,7 +201,7 @@ export class ProcessingService implements IBaseService {
191201
planStep as DistributorConfig,
192202
input,
193203
);
194-
await this.processingRepository.updateStep(step.id, {
204+
return this.processingRepository.updateStep(step.id, {
195205
status: "success",
196206
output: output as Json,
197207
completedAt: new Date(),
@@ -202,12 +212,15 @@ export class ProcessingService implements IBaseService {
202212
stack: error instanceof Error ? error.stack : undefined,
203213
stepName: planStep.plugin,
204214
};
205-
await this.processingRepository.updateStep(step.id, {
215+
this.logger.warn(
216+
{ jobId, stepId: step.id, plugin: planStep.plugin, error },
217+
"Distribution step failed. Continuing processing.",
218+
);
219+
return this.processingRepository.updateStep(step.id, {
206220
status: "failed",
207221
error: stepError as Json,
208222
completedAt: new Date(),
209223
});
210-
throw error;
211224
}
212225
}
213226

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ALTER TYPE "public"."processing_job_status" ADD VALUE 'completed_with_errors';

0 commit comments

Comments
 (0)