Skip to content

Commit aaeccfd

Browse files
error pushes
1 parent 6e6651b commit aaeccfd

File tree

1 file changed

+50
-46
lines changed

1 file changed

+50
-46
lines changed

packages/utils/src/core/destination.ts

Lines changed: 50 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,8 @@ export async function pushToDestinations(
4949
event?: WalkerOS.Event,
5050
): Promise<Elb.PushResult> {
5151
const { consent, globals, user } = instance;
52-
const results: Array<{
53-
id: string;
54-
destination: WalkerOSDestination.Destination;
55-
skipped?: boolean;
56-
queue?: WalkerOS.Events;
57-
error?: unknown;
58-
}> = [];
59-
60-
return Promise.all(
52+
53+
const results = await Promise.all(
6154
// Process all destinations in parallel
6255
Object.entries(destinations).map(async ([id, destination]) => {
6356
// Setup queue of events to be processed
@@ -115,64 +108,75 @@ export async function pushToDestinations(
115108
let error: unknown;
116109

117110
// Process allowed events and store failed ones in the dead letter queue (dlq)
118-
const dlq = await Promise.all(
119-
allowedEvents.filter(async (event) => {
111+
const pushes = await Promise.all(
112+
allowedEvents.map(async (event) => {
120113
if (error) {
121-
// Skip if an error occurred
122-
destination.queue?.push(event); // Add back to queue
114+
// Add back to queue
115+
destination.queue?.push(event);
116+
117+
// Skip further processing
118+
// @TODO do we really want to skip?
119+
return;
123120
}
124121

125122
// Merge event with instance state, prioritizing event properties
126123
event = assign({}, event);
127124
event.globals = assign(globals, event.globals);
128125
event.user = assign(user, event.user);
129126

130-
return !(await tryCatchAsync(destinationPush, (err) => {
127+
await tryCatchAsync(destinationPush, (err) => {
131128
// Call custom error handling if available
132129
if (instance.config.onError) instance.config.onError(err, instance);
133130
error = err; // Captured error from destination
134-
})(instance, destination, event));
131+
132+
return false;
133+
})(instance, destination, event);
134+
135+
return event;
135136
}),
136137
);
137138

139+
const dlq = pushes.filter(isDefined);
140+
138141
// Concatenate failed events with unprocessed ones in the queue
142+
// @TODO add to destination.dlq with error
139143
queue.concat(dlq);
140144

141145
return { id, destination, queue, error };
142146
}),
143-
).then((results) => {
144-
const successful: WalkerOSDestination.PushSuccess = [];
145-
const queued: WalkerOSDestination.PushSuccess = [];
146-
const failed: WalkerOSDestination.PushFailure = [];
147-
148-
for (const result of results) {
149-
if (result.skipped) continue;
150-
151-
const id = result.id;
152-
const destination = result.destination;
153-
154-
if (result.error) {
155-
failed.push({
156-
id,
157-
destination,
158-
error: String(result.error),
159-
});
160-
} else if (result.queue && result.queue.length) {
161-
// Merge queue with existing queue
162-
destination.queue = (destination.queue || []).concat(result.queue);
163-
queued.push({ id, destination });
164-
} else {
165-
successful.push({ id, destination });
166-
}
147+
);
148+
149+
const successful: WalkerOSDestination.PushSuccess = [];
150+
const queued: WalkerOSDestination.PushSuccess = [];
151+
const failed: WalkerOSDestination.PushFailure = [];
152+
153+
for (const result of results) {
154+
if (result.skipped) continue;
155+
156+
const id = result.id;
157+
const destination = result.destination;
158+
159+
if (result.error) {
160+
failed.push({
161+
id,
162+
destination,
163+
error: String(result.error),
164+
});
165+
} else if (result.queue && result.queue.length) {
166+
// Merge queue with existing queue
167+
destination.queue = (destination.queue || []).concat(result.queue);
168+
queued.push({ id, destination });
169+
} else {
170+
successful.push({ id, destination });
167171
}
172+
}
168173

169-
return {
170-
status: { ok: true },
171-
successful,
172-
queued,
173-
failed,
174-
};
175-
});
174+
return {
175+
status: { ok: true },
176+
successful,
177+
queued,
178+
failed,
179+
};
176180
}
177181

178182
export async function destinationInit<

0 commit comments

Comments
 (0)