Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.1.14",
"version": "1.1.15",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down
46 changes: 19 additions & 27 deletions src/flavors/job.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import _ from 'highland';
import {
printStartPipeline, printEndPipeline,
faulty, faultyAsyncStream, faultify,
splitObject, encryptEvent,
compact,
} from '../utils';
import {
scanSplitDynamoDB, querySplitDynamoDB, queryAllDynamoDB, batchGetDynamoDB,
Expand Down Expand Up @@ -132,35 +132,27 @@ export const toCursorUpdateRequest = (rule) => faulty((uow) => ({
}));

export const flushCursor = (rule) => (s) => {
let lastUow;

const cursorStream = () => _([lastUow])
.map(toCursorUpdateRequest(rule))
.through(updateDynamoDB({
...rule,
updateRequestField: 'cursorUpdateRequest',
updateResponseField: 'cursorUpdateResponse',
}));

/* istanbul ignore else */
if (rule.toCursorUpdateRequest) {
return s
.consume((err, x, push, next) => {
/* istanbul ignore if */
if (err) {
push(err);
next();
} else if (x === _.nil) {
if (lastUow) {
next(cursorStream());
} else {
push(null, x);
}
} else {
lastUow = x;
push(null, x);
next();
}
// Compact explicitly on PK here since we want to capture just the last event per PK in this
// invocation after the query split. This handles the case where multiple cursor events
// ended up in a single lambda invocation.
.through(compact({ ...rule, compact: true }))
.map(toCursorUpdateRequest(rule))
.through(updateDynamoDB({
...rule,
updateRequestField: 'cursorUpdateRequest',
updateResponseField: 'cursorUpdateResponse',
}))
// Maintains backwards compatibility with how this used to manipulate the UOWs,
// duping the last uow.
.flatMap((uow) => {
const { batch, ...lastUow } = uow;
return [
...batch,
lastUow,
];
});
} else {
return s;
Expand Down
Loading