|
1 | | -import _ from 'highland'; |
2 | 1 | import { |
3 | 2 | printStartPipeline, printEndPipeline, |
4 | 3 | faulty, faultyAsyncStream, faultify, |
5 | 4 | splitObject, encryptEvent, |
| 5 | + compact, |
6 | 6 | } from '../utils'; |
7 | 7 | import { |
8 | 8 | scanSplitDynamoDB, querySplitDynamoDB, queryAllDynamoDB, batchGetDynamoDB, |
@@ -132,35 +132,36 @@ export const toCursorUpdateRequest = (rule) => faulty((uow) => ({ |
132 | 132 | })); |
133 | 133 |
|
134 | 134 | export const flushCursor = (rule) => (s) => { |
135 | | - let lastUow; |
136 | | - |
137 | | - const cursorStream = () => _([lastUow]) |
138 | | - .map(toCursorUpdateRequest(rule)) |
139 | | - .through(updateDynamoDB({ |
140 | | - ...rule, |
141 | | - updateRequestField: 'cursorUpdateRequest', |
142 | | - updateResponseField: 'cursorUpdateResponse', |
143 | | - })); |
| 135 | + const { |
| 136 | + // By default group on a stringified version of the full key. If the key structure |
| 137 | + // differs in a users particular implementation or they want to group by something |
| 138 | + // else they can simply override this fn in their rule. |
| 139 | + cursorKeyFn = (uow) => `pk:${uow.event.raw.new.pk}|sk:${uow.event.raw.new.sk}`, |
| 140 | + } = rule; |
144 | 141 |
|
145 | 142 | /* istanbul ignore else */ |
146 | 143 | if (rule.toCursorUpdateRequest) { |
147 | 144 | return s |
148 | | - .consume((err, x, push, next) => { |
149 | | - /* istanbul ignore if */ |
150 | | - if (err) { |
151 | | - push(err); |
152 | | - next(); |
153 | | - } else if (x === _.nil) { |
154 | | - if (lastUow) { |
155 | | - next(cursorStream()); |
156 | | - } else { |
157 | | - push(null, x); |
158 | | - } |
159 | | - } else { |
160 | | - lastUow = x; |
161 | | - push(null, x); |
162 | | - next(); |
163 | | - } |
| 145 | + .through(compact({ |
| 146 | + ...rule, |
| 147 | + compact: { |
| 148 | + group: (uow) => cursorKeyFn(uow), |
| 149 | + }, |
| 150 | + })) |
| 151 | + .map(toCursorUpdateRequest(rule)) |
| 152 | + .through(updateDynamoDB({ |
| 153 | + ...rule, |
| 154 | + updateRequestField: 'cursorUpdateRequest', |
| 155 | + updateResponseField: 'cursorUpdateResponse', |
| 156 | + })) |
| 157 | + // Maintains backwards compatibility with how this used to manipulate the UOWs, |
| 158 | + // duping the last uow. |
| 159 | + .flatMap((uow) => { |
| 160 | + const { batch, ...lastUow } = uow; |
| 161 | + return [ |
| 162 | + ...batch, |
| 163 | + lastUow, |
| 164 | + ]; |
164 | 165 | }); |
165 | 166 | } else { |
166 | 167 | return s; |
|
0 commit comments