diff --git a/package.json b/package.json index 567f456..f0de1e3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.20", + "version": "1.1.21", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/sinks/eventbridge.js b/src/sinks/eventbridge.js index ccd2ad1..ed23a52 100644 --- a/src/sinks/eventbridge.js +++ b/src/sinks/eventbridge.js @@ -28,6 +28,10 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-defa step = 'publish', ...opt } = {}) => { + // EB Batchsize can not exceed 10 - since BATCH_SIZE is used in many places, we should reduce it to 10 here. + if (batchSize > 10) { + batchSize = 10; + } if (endpointId) import('@aws-sdk/signature-v4-crt'); const connector = new Connector({ diff --git a/test/unit/sinks/eventbridge.test.js b/test/unit/sinks/eventbridge.test.js index b826ded..5231e68 100644 --- a/test/unit/sinks/eventbridge.test.js +++ b/test/unit/sinks/eventbridge.test.js @@ -2,12 +2,17 @@ import 'mocha'; import { expect } from 'chai'; import sinon from 'sinon'; import _ from 'highland'; - +import { v4 } from 'uuid'; import { publishToEventBridge as publish } from '../../../src/sinks/eventbridge'; import Connector from '../../../src/connectors/eventbridge'; describe('sinks/eventbridge.js', () => { + const origBatchSize = process.env.BATCH_SIZE; + afterEach(() => { + // set back to state specified in package.json + process.env.BATCH_SIZE = origBatchSize; + }); afterEach(sinon.restore); it('should batch and publish', (done) => { @@ -89,6 +94,36 @@ describe('sinks/eventbridge.js', () => { .done(done); }); + it('should batch and publish, multiple', (done) => { + process.env.BATCH_SIZE = 100; + sinon.stub(Connector.prototype, 'putEvents').resolves({ FailedEntryCount: 0 }); + + const uows = []; + for (let i = 1; i <= 15; i += 1) { + const id = v4(); + uows.push({ + event: { + id, + type: `p${i}`, + partitionKey: id, + }, + }); + } + + _(uows) + .through(publish({ busName: 'b1', debug: (msg, v) => console.log(msg, v), metricsEnabled: true })) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected.length).to.equal(15); + collected.forEach((c) => { + expect(c.publishRequest.Entries.length < 11).to.be.true; + }); + }) + .done(done); + }); + it('should not publish', (done) => { const uows = [{ }];