Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions spec/v2/providers/pubsub.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect } from "chai";
import { EventContext } from "../../../src/v1/cloud-functions";

import { CloudEvent } from "../../../src/v2/core";
import * as options from "../../../src/v2/options";
Expand Down Expand Up @@ -170,6 +171,51 @@
expect(json).to.deep.equal({ hello: "world" });
});

it("should construct a CloudEvent with the correct context and message", async () => {
const publishTime = new Date().toISOString();
const messagePayload = {
messageId: "uuid",
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
publishTime,
};
const data: pubsub.MessagePublishedData = {
message: messagePayload as any,
subscription: "projects/aProject/subscriptions/aSubscription",
};
const event: CloudEvent<pubsub.MessagePublishedData> = {
specversion: "1.0",
id: "uuid",
time: publishTime,
type: "google.cloud.pubsub.topic.v1.messagePublished",
source: "//pubsub.googleapis.com/projects/aProject/topics/topic",
data,
};

type PubSubCloudEvent = CloudEvent<pubsub.MessagePublishedData> & {
message: pubsub.Message<{ hello: "world" }>;
context: EventContext;
};

let destructuredMessage: pubsub.Message<{ hello: "world" }>;
let context: EventContext;
const func = pubsub.onMessagePublished("topic", (e) => {
const pubsubEvent = e as PubSubCloudEvent;
({ message: destructuredMessage, context } = pubsubEvent);
});

await func(event);

expect(destructuredMessage.json).to.deep.equal({ hello: "world" });
expect(context).to.exist;
expect(context.eventId).to.equal("uuid");
expect(context.timestamp).to.equal(publishTime);
expect(context.eventType).to.equal("google.cloud.pubsub.topic.v1.messagePublished");
expect(context.resource).to.deep.equal({
service: "pubsub.googleapis.com",
name: "projects/aProject/topics/topic",
});
});

// These tests pass if the transpiler works
it("allows desirable syntax", () => {
pubsub.onMessagePublished<string>(
Expand All @@ -193,4 +239,40 @@
(event: CloudEvent<pubsub.MessagePublishedData>) => undefined
);
});


Check failure on line 243 in spec/v2/providers/pubsub.spec.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎⏎⏎⏎⏎`




it("should use 'unknown-project' as fallback for resource name", async () => {
delete process.env.GCLOUD_PROJECT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think we should keep this. In case we receive a malformed event where the EventID is not parsed, this would ensure that the function does not crash. Also considering GCLOUD_PROJECT is an environmental variable, there could be a case when its not populated (?). What do you think?

const publishTime = new Date().toISOString();
const message = {
messageId: "uuid",
data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"),
publishTime,
};
const data: pubsub.MessagePublishedData = {
message: message as any,
subscription: "projects/aProject/subscriptions/aSubscription",
};
const event: CloudEvent<pubsub.MessagePublishedData> = {
specversion: "1.0",
id: "uuid",
time: publishTime,
type: "google.cloud.pubsub.topic.v1.messagePublished",
source: "//pubsub.googleapis.com/topics/topic", // Malformed source
data,
};

let receivedEvent: CloudEvent<pubsub.MessagePublishedData<any>>;
const func = pubsub.onMessagePublished("topic", (e) => {
receivedEvent = e;
});

await func(event);

expect(receivedEvent.context.resource.name).to.equal("projects/unknown-project/topics/topic");
});
});
8 changes: 8 additions & 0 deletions src/v2/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @packageDocumentation
*/

import type { EventContext } from "../v1/cloud-functions";
import { Change } from "../common/change";
import { ManifestEndpoint } from "../runtime/manifest";

Expand Down Expand Up @@ -91,6 +92,13 @@

/** Information about this specific event. */
data: T;

/** V1- compatible context of this event.
*

Check failure on line 97 in src/v2/core.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `·`
* This getter is added at runtime for V1 compatibility.
* May be undefined it not set by a provider
*/
readonly context?: EventContext;
}

/**
Expand Down
64 changes: 63 additions & 1 deletion src/v2/providers/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import * as options from "../options";
import { SecretParam } from "../../params/types";
import { withInit } from "../../common/onInit";
import type { EventContext, Resource } from "../../v1/cloud-functions";

/**
* Google Cloud Pub/Sub is a globally distributed message bus that automatically scales as you need it.
Expand Down Expand Up @@ -304,7 +305,9 @@
subscription: string;
};
messagePublishedData.message = new Message(messagePublishedData.message);
return wrapTraceContext(withInit(handler))(raw as CloudEvent<MessagePublishedData<T>>);
const event = raw as CloudEvent<MessagePublishedData<T>>;
addV1Compatibility(event, topic);
return wrapTraceContext(withInit(handler))(event);
};

func.run = handler;
Expand Down Expand Up @@ -353,3 +356,62 @@

return func;
}

/**
* Adds v1-style `context` and `message` properties to the event.
*
* @param event - The event to add the context to.
* @param topic - The topic the event is for.
*/
function addV1Compatibility<T>(event: CloudEvent<MessagePublishedData<T>>, topic: string) {
if ("context" in event && event.context) {
throw new Error("Unexpected context in event.");
}

const resourceName = getResourceName(event, topic);
const resource: Resource = {

Check failure on line 373 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎`
service: "pubsub.googleapis.com",
name: resourceName,

Check failure on line 375 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Delete `⏎`

};

const context: EventContext = {
eventId: event.id,
timestamp: event.time,
resource,
eventType: "google.cloud.pubsub.topic.v1.messagePublished",
params: {}

Check failure on line 384 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Replace `⏎` with `,`

};

Object.defineProperty(event, "context", {

Check failure on line 389 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Replace `⏎····get:·()·=>·context,⏎` with `····get:·()·=>·context,`
get: () => context,

});

Object.defineProperty(event, "message", {
get: () => (event.data as MessagePublishedData<T>).message,

Check failure on line 395 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

This assertion is unnecessary since it does not change the type of the expression
});
}

/**
* Extracts the resource name from the event source.
*
* @param event - The event to extract the resource name from.
* @param topic - The topic the event is for.
* @returns The resource name.
*/
function getResourceName(event: CloudEvent<MessagePublishedData<any>>, topic: string) {
const match = event.source?.match(/projects\/([^/]+)\/topics\/([^/]+)/);
const project = match?.[1];
const topicName = match?.[2] ?? topic;

if (!project) {
return `projects/unknown-project/topics/${topicName}`;
}

return `projects/${project}/topics/${topicName}`;

Check failure on line 416 in src/v2/providers/pubsub.ts

View workflow job for this annotation

GitHub Actions / lint (22.x)

Replace `⏎}` with `}⏎`
}
Loading