Skip to content

Commit 39e7f2e

Browse files
authored
Merge pull request #678 from complexdatacollective/feature/export-progress
Add real-time progress feedback to interview data export
2 parents 1f9f2d2 + d5e131f commit 39e7f2e

File tree

24 files changed

+2579
-318
lines changed

24 files changed

+2579
-318
lines changed

actions/interviews.ts

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,12 @@
11
'use server';
22

33
import { createId } from '@paralleldrive/cuid2';
4-
import { Effect } from 'effect';
54
import { after } from 'next/server';
65
import { safeRevalidateTag, safeUpdateTag } from '~/lib/cache';
76
import { prisma } from '~/lib/db';
87
import { type Interview } from '~/lib/db/generated/client';
9-
import { ExportLayer } from '~/lib/export/layers/ExportLayer';
10-
import { exportPipeline } from '~/lib/export/pipeline';
118
import { createInitialNetwork } from '~/lib/interviewer/ducks/modules/session';
12-
import type {
13-
ExportOptions,
14-
ExportReturn,
15-
} from '~/lib/network-exporters/utils/types';
16-
import {
17-
captureEvent,
18-
captureException,
19-
shutdownPostHog,
20-
} from '~/lib/posthog-server';
9+
import { captureException, shutdownPostHog } from '~/lib/posthog-server';
2110
import { getAppSetting } from '~/queries/appSettings';
2211
import type { CreateInterview, DeleteInterviews } from '~/schemas/interviews';
2312
import { requireApiAuth } from '~/utils/auth';
@@ -81,44 +70,6 @@ export const updateExportTime = async (interviewIds: Interview['id'][]) => {
8170
}
8271
};
8372

84-
export const exportInterviews = async (
85-
interviewIds: Interview['id'][],
86-
exportOptions: ExportOptions,
87-
): Promise<ExportReturn> => {
88-
await requireApiAuth();
89-
90-
const result = await exportPipeline(interviewIds, exportOptions).pipe(
91-
Effect.catchAll((error) =>
92-
Effect.succeed({
93-
status: 'error' as const,
94-
error: error.userMessage,
95-
} satisfies ExportReturn),
96-
),
97-
Effect.provide(ExportLayer),
98-
Effect.runPromise,
99-
);
100-
101-
after(async () => {
102-
if (result.status === 'error') {
103-
await captureException(new Error(result.error ?? 'Unknown error'), {
104-
interviewCount: interviewIds.length,
105-
exportOptions,
106-
});
107-
} else {
108-
await captureEvent('DataExported', {
109-
status: result.status,
110-
sessions: interviewIds.length,
111-
exportOptions,
112-
result,
113-
});
114-
}
115-
await shutdownPostHog();
116-
});
117-
118-
safeUpdateTag('getInterviews');
119-
return result;
120-
};
121-
12273
export async function createInterview(data: CreateInterview) {
12374
const { participantIdentifier, protocolId } = data;
12475

actions/synthetic-interviews.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,19 @@ import { safeUpdateTag } from '~/lib/cache';
55
import { prisma } from '~/lib/db';
66
import { requireApiAuth } from '~/utils/auth';
77

8+
export async function revalidateSyntheticData() {
9+
await requireApiAuth();
10+
11+
safeUpdateTag([
12+
'getInterviews',
13+
'getParticipants',
14+
'interviewCount',
15+
'participantCount',
16+
'summaryStatistics',
17+
'activityFeed',
18+
]);
19+
}
20+
821
export async function deleteSyntheticData() {
922
await requireApiAuth();
1023

app/api/export-interviews/route.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { Effect, Queue, Stream } from 'effect';
2+
import { addEvent } from '~/actions/activityFeed';
3+
import { safeRevalidateTag } from '~/lib/cache';
4+
import { type ExportEvent, formatSSE } from '~/lib/export/exportEvents';
5+
import { ExportLayer } from '~/lib/export/layers/ExportLayer';
6+
import { exportPipeline } from '~/lib/export/pipeline';
7+
import {
8+
captureEvent,
9+
captureException,
10+
shutdownPostHog,
11+
} from '~/lib/posthog-server';
12+
import { exportInterviewsSchema } from '~/schemas/export';
13+
import { requireApiAuth } from '~/utils/auth';
14+
15+
export async function POST(request: Request) {
16+
try {
17+
await requireApiAuth();
18+
} catch {
19+
return new Response(JSON.stringify({ error: 'Unauthorized' }), {
20+
status: 401,
21+
});
22+
}
23+
24+
let body: unknown;
25+
try {
26+
body = await request.json();
27+
} catch {
28+
return new Response(JSON.stringify({ error: 'Invalid JSON body' }), {
29+
status: 400,
30+
});
31+
}
32+
33+
const parsed = exportInterviewsSchema.safeParse(body);
34+
35+
if (!parsed.success) {
36+
return new Response(JSON.stringify({ error: 'Invalid request body' }), {
37+
status: 400,
38+
});
39+
}
40+
41+
const { interviewIds, exportOptions } = parsed.data;
42+
43+
const program = Effect.gen(function* () {
44+
const queue = yield* Queue.unbounded<ExportEvent>();
45+
46+
yield* exportPipeline(interviewIds, exportOptions, queue).pipe(
47+
Effect.tap((result) =>
48+
Effect.sync(() => {
49+
safeRevalidateTag(['getInterviews', 'activityFeed']);
50+
void addEvent(
51+
'Data Exported',
52+
`Exported data for ${String(interviewIds.length)} interview(s)`,
53+
);
54+
void captureEvent('Data Exported', {
55+
interviewCount: interviewIds.length,
56+
}).then(() => shutdownPostHog());
57+
}).pipe(
58+
Effect.andThen(
59+
Queue.offer(queue, {
60+
type: 'complete',
61+
zipUrl: result.zipUrl ?? '',
62+
zipKey: result.zipKey ?? '',
63+
}),
64+
),
65+
),
66+
),
67+
Effect.tapError((error) =>
68+
Effect.sync(() => {
69+
void captureException(error).then(() => shutdownPostHog());
70+
}).pipe(
71+
Effect.andThen(
72+
Queue.offer(queue, {
73+
type: 'error',
74+
message: error.userMessage,
75+
}),
76+
),
77+
),
78+
),
79+
Effect.catchAll(() => Effect.void),
80+
Effect.ensuring(Queue.shutdown(queue)),
81+
Effect.provide(ExportLayer),
82+
Effect.forkDaemon,
83+
);
84+
85+
const encoder = new TextEncoder();
86+
const sseStream = Stream.fromQueue(queue).pipe(
87+
Stream.map((event) => encoder.encode(formatSSE(event))),
88+
);
89+
90+
return Stream.toReadableStream(sseStream);
91+
});
92+
93+
const readableStream = await Effect.runPromise(program);
94+
95+
return new Response(readableStream, {
96+
headers: {
97+
'Content-Type': 'text/event-stream',
98+
'Cache-Control': 'no-cache',
99+
'Connection': 'keep-alive',
100+
},
101+
});
102+
}

app/api/generate-test-interviews/route.ts

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { createId } from '@paralleldrive/cuid2';
22
import { addEvent } from '~/actions/activityFeed';
3-
import { safeRevalidateTag } from '~/lib/cache';
43
import { prisma } from '~/lib/db';
54
import { generateNetwork } from '~/lib/synthetic-interviews/generateNetwork';
65
import { generateSyntheticInterviewsSchema } from '~/schemas/synthetic-interviews';
@@ -15,7 +14,15 @@ export async function POST(request: Request) {
1514
});
1615
}
1716

18-
const body: unknown = await request.json();
17+
let body: unknown;
18+
try {
19+
body = await request.json();
20+
} catch {
21+
return new Response(JSON.stringify({ error: 'Invalid JSON body' }), {
22+
status: 400,
23+
});
24+
}
25+
1926
const parsed = generateSyntheticInterviewsSchema.safeParse(body);
2027

2128
if (!parsed.success) {
@@ -126,15 +133,6 @@ export async function POST(request: Request) {
126133
send({ type: 'progress', current: i + 1, total: count });
127134
}
128135

129-
safeRevalidateTag([
130-
'getInterviews',
131-
'getParticipants',
132-
'interviewCount',
133-
'participantCount',
134-
'summaryStatistics',
135-
'activityFeed',
136-
]);
137-
138136
void addEvent(
139137
'Synthetic Data Generated',
140138
`Generated ${String(count)} synthetic interviews for protocol "${protocol.name}"`,

app/dashboard/_components/InterviewsTable/ActionsDropdown.tsx

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,22 @@ import {
1616
DropdownMenuLabel,
1717
DropdownMenuTrigger,
1818
} from '~/components/ui/dropdown-menu';
19-
import type { Interview } from '~/lib/db/generated/client';
19+
import type { GetInterviewsQuery } from '~/queries/interviews';
2020

21-
export const ActionsDropdown = ({ row }: { row: Row<Interview> }) => {
21+
type InterviewRow = GetInterviewsQuery[number];
22+
23+
export const ActionsDropdown = ({ row }: { row: Row<InterviewRow> }) => {
2224
const [showDeleteModal, setShowDeleteModal] = useState(false);
2325
const [showExportModal, setShowExportModal] = useState(false);
24-
const [selectedInterviews, setSelectedInterviews] = useState<Interview[]>();
26+
const [selectedInterviews, setSelectedInterviews] =
27+
useState<InterviewRow[]>();
2528

26-
const handleDelete = (data: Interview) => {
29+
const handleDelete = (data: InterviewRow) => {
2730
setSelectedInterviews([data]);
2831
setShowDeleteModal(true);
2932
};
3033

31-
const handleExport = (data: Interview) => {
34+
const handleExport = (data: InterviewRow) => {
3235
setSelectedInterviews([data]);
3336
setShowExportModal(true);
3437
};

app/dashboard/_components/InterviewsTable/Columns.tsx

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,16 @@ export const InterviewColumns = (): ColumnDef<
130130
{
131131
id: 'progress',
132132
accessorFn: (row) => {
133-
const stages = row.protocol.stages;
134-
return Array.isArray(stages)
135-
? (row.currentStep / stages.length) * 100
136-
: 0;
133+
const stageCount = row.protocol.stageCount;
134+
return stageCount > 0 ? (row.currentStep / stageCount) * 100 : 0;
137135
},
138136
header: ({ column }) => {
139137
return <DataTableColumnHeader column={column} title="Progress" />;
140138
},
141139
cell: ({ row }) => {
142-
const stages = row.original.protocol.stages;
143-
const progress = (row.original.currentStep / stages.length) * 100;
140+
const stageCount = row.original.protocol.stageCount;
141+
const progress =
142+
stageCount > 0 ? (row.original.currentStep / stageCount) * 100 : 0;
144143
return (
145144
<div className="flex items-center whitespace-nowrap">
146145
<ProgressBar
@@ -158,18 +157,15 @@ export const InterviewColumns = (): ColumnDef<
158157
enableSorting: false,
159158
accessorFn: (row) => {
160159
const network = row.network;
161-
const nodeCount = network?.nodes?.length ?? 0;
162-
const edgeCount = network?.edges?.length ?? 0;
160+
const nodeCount = network.nodes.reduce((sum, n) => sum + n.count, 0);
161+
const edgeCount = network.edges.reduce((sum, e) => sum + e.count, 0);
163162
return nodeCount + edgeCount;
164163
},
165164
header: ({ column }) => {
166165
return <DataTableColumnHeader column={column} title="Network" />;
167166
},
168167
cell: ({ row }) => {
169-
const network = row.original.network;
170-
const codebook = row.original.protocol.codebook;
171-
172-
return <NetworkSummary network={network} codebook={codebook} />;
168+
return <NetworkSummary network={row.original.network} />;
173169
},
174170
},
175171
{

app/dashboard/_components/InterviewsTable/NetworkSummary.tsx

Lines changed: 22 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { type Codebook } from '@codaco/protocol-validation';
2-
import type { NcNetwork } from '@codaco/shared-consts';
3-
import Node from '~/components/Node';
1+
import Node, { type NodeColorSequence } from '~/components/Node';
2+
import type { GetInterviewsQuery } from '~/queries/interviews';
43
import { cx } from '~/utils/cva';
54

65
// TODO: Move to shared-consts or protocol-validation
@@ -109,53 +108,36 @@ function EdgeSummary({ color, count, typeName }: EdgeSummaryProps) {
109108

110109
const NetworkSummary = ({
111110
network,
112-
codebook,
113111
}: {
114-
network: NcNetwork | null;
115-
codebook: Codebook | null;
112+
network: GetInterviewsQuery[number]['network'];
116113
}) => {
117-
if (!network || !codebook) {
118-
return <div className="text-xs">No interview data</div>;
119-
}
120-
const nodeSummaries = Object.entries(
121-
network.nodes?.reduce<Record<string, number>>((acc, node) => {
122-
acc[node.type] = (acc[node.type] ?? 0) + 1;
123-
return acc;
124-
}, {}) ?? {},
125-
).map(([nodeType, count]) => {
126-
const nodeInfo = codebook.node?.[nodeType];
127-
128-
return (
114+
const nodeSummaries = network.nodes.map(
115+
({ type: nodeType, count, name, color }) => (
129116
<div className="flex flex-col items-center" key={nodeType}>
130117
<Node
131118
size="xxs"
132-
color={nodeInfo?.color}
119+
color={color as NodeColorSequence}
133120
label={count.toLocaleString()}
134121
/>
135-
<span className="pt-1 text-xs">{nodeInfo?.name ?? 'Unknown'}</span>
122+
<span className="pt-1 text-xs">{name}</span>
136123
</div>
137-
);
138-
});
139-
140-
const edgeSummaries = Object.entries(
141-
network.edges?.reduce<Record<string, number>>((acc, edge) => {
142-
acc[edge.type] = (acc[edge.type] ?? 0) + 1;
143-
return acc;
144-
}, {}) ?? {},
145-
).map(([edgeType, count]) => {
146-
const edgeInfo = codebook.edge?.[edgeType];
124+
),
125+
);
147126

148-
if (!edgeInfo) return null;
127+
const edgeSummaries = network.edges
128+
.map(({ type: edgeType, count, name, color }) => {
129+
if (!color) return null;
149130

150-
return (
151-
<EdgeSummary
152-
key={edgeType}
153-
color={edgeInfo.color as EdgeColorSequence}
154-
count={count}
155-
typeName={edgeInfo.name}
156-
/>
157-
);
158-
});
131+
return (
132+
<EdgeSummary
133+
key={edgeType}
134+
color={color as EdgeColorSequence}
135+
count={count}
136+
typeName={name}
137+
/>
138+
);
139+
})
140+
.filter(Boolean);
159141

160142
if (nodeSummaries.length === 0 && edgeSummaries.length === 0) {
161143
return <div className="text-xs">No nodes or edges</div>;

0 commit comments

Comments
 (0)