Skip to content

Commit de1ae17

Browse files
authored
fix(stac-loader): only ingest the last item/collection for a given id in a batch (#227)
* fix(stac-loader): only ingest the last item/collection for a given id in a batch * fix(stac-loader): reduce number of stac-loader redrives from 5 to 2 * chore(stac-loader): refactor stac-loader handler into component functions
1 parent f49852c commit de1ae17

File tree

6 files changed

+421
-140
lines changed

6 files changed

+421
-140
lines changed

lib/stac-loader/index.ts

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,29 @@ export interface StacLoaderProps {
195195
* This approach balances throughput (larger batches = fewer database connections)
196196
* with latency (time-based triggers prevent indefinite waiting).
197197
*
198+
* ## Message Ordering and Deduplication
199+
*
200+
* **Standard Queues**: This construct uses standard (non-FIFO) SNS topics and SQS queues,
201+
* which means messages are **not guaranteed to arrive in order**. Multiple messages
202+
* with the same STAC item or collection ID may arrive in any sequence.
203+
*
204+
* **Timestamp-Based Deduplication**: Within each batch, the loader uses SNS timestamps
205+
* to ensure only the newest version of each item/collection is ingested:
206+
*
207+
* - When multiple messages have the same item/collection ID in a batch, the loader
208+
* compares their SNS Timestamps (automatically set when messages are published)
209+
* - Only the message with the **newest timestamp** is kept for database insertion
210+
* - Older versions are discarded and logged at the debug level
211+
* - This guarantees that within a batch, the chronologically latest update wins
212+
*
213+
* **Important Limitations**:
214+
* - Deduplication only occurs **within a single batch** - messages in different batches
215+
* are not compared across batches
216+
* - The database upsert operation will update existing records, so later batches can
217+
* still overwrite earlier batches regardless of timestamps
218+
* - For guaranteed ordering across all messages, consider implementing version tracking
219+
* in your STAC metadata and database constraints
220+
*
198221
* ## Error Handling and Dead Letter Queue
199222
*
200223
* Failed messages are sent to the dead letter queue after 5 processing attempts.
@@ -396,7 +419,7 @@ export class StacLoader extends Construct {
396419
visibilityTimeout: Duration.seconds(timeoutSeconds + 10),
397420
encryption: sqs.QueueEncryption.SQS_MANAGED,
398421
deadLetterQueue: {
399-
maxReceiveCount: 5,
422+
maxReceiveCount: 2,
400423
queue: this.deadLetterQueue,
401424
},
402425
});
@@ -408,7 +431,7 @@ export class StacLoader extends Construct {
408431

409432
// Subscribe the queue to the topic
410433
this.topic.addSubscription(
411-
new snsSubscriptions.SqsSubscription(this.queue)
434+
new snsSubscriptions.SqsSubscription(this.queue),
412435
);
413436

414437
// Create the lambda function
@@ -448,11 +471,11 @@ export class StacLoader extends Construct {
448471
new lambdaEventSources.SqsEventSource(this.queue, {
449472
batchSize: props.batchSize ?? 500,
450473
maxBatchingWindow: Duration.minutes(
451-
props.maxBatchingWindowMinutes ?? 1
474+
props.maxBatchingWindowMinutes ?? 1,
452475
),
453476
maxConcurrency: maxConcurrency,
454477
reportBatchItemFailures: true,
455-
})
478+
}),
456479
);
457480

458481
// Create outputs
@@ -490,7 +513,7 @@ export class StacItemLoader extends StacLoader {
490513
constructor(scope: Construct, id: string, props: StacLoaderProps) {
491514
console.warn(
492515
`StacItemLoader is deprecated. Please use StacLoader instead. ` +
493-
`StacItemLoader will be removed in a future version.`
516+
`StacItemLoader will be removed in a future version.`,
494517
);
495518

496519
super(scope, id, props);

0 commit comments

Comments
 (0)