Skip to content

Commit dc9e588

Browse files
committed
Implement support for Publish Many for GCPS Publisher
1 parent d02d032 commit dc9e588

File tree

4 files changed

+56
-40
lines changed

4 files changed

+56
-40
lines changed

src/queue/drivers/gcps/GCPSClient.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,19 @@ export class GCPSClient {
6161
})
6262
}
6363

64-
public async publish(topic: string, message: GCPSMessage): Promise<void> {
64+
public async publish(
65+
topic: string,
66+
messages: Array<GCPSMessage>
67+
): Promise<void> {
6568
await Axios.post(
6669
`https://pubsub.googleapis.com/v1/${topic}:publish`,
6770
{
68-
messages: [
69-
{
70-
attributes: message.attributes,
71-
data: Buffer.from(
72-
JSON.stringify(message.data)
73-
).toString("base64"),
74-
},
75-
],
71+
messages: messages.map((m) => ({
72+
attributes: m.attributes,
73+
data: Buffer.from(JSON.stringify(m.data)).toString(
74+
"base64"
75+
),
76+
})),
7677
},
7778
{
7879
headers: {

src/queue/drivers/gcps/GCPSConsumer.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { QueueHandler } from "../../abstract/QueueHandler"
33
import { SerializedTask } from "../../abstract/SerializedTask"
44
import { TransientError } from "../../../errors/TransientError"
55
import { Container } from "inversify"
6+
import { isEmpty } from "lodash"
67
import { Logger } from "../../../logging"
78
import { Process } from "../../../runtime"
89
import { ConstructorOf } from "../../../utils/types"
@@ -45,7 +46,7 @@ export class GCPSConsumer implements Process {
4546
this.subscriptionName
4647
)
4748

48-
if (subscription.pushConfig !== {}) {
49+
if (!isEmpty(subscription.pushConfig)) {
4950
throw new Error(
5051
"The Strontium GCPS Consumer does not support Push based GCPS subscriptions. " +
5152
"Please change the subscription inside Google Cloud Platform to operate on a Pull Based model if you wish " +
@@ -92,16 +93,17 @@ export class GCPSConsumer implements Process {
9293
)
9394

9495
await Promise.all(
95-
messages.map(async (m) =>
96-
this.executeTask(
97-
m.ackId,
98-
{
99-
eventName:
100-
m.message.attributes.STRONTIUM_EVENT_NAME,
101-
message: m.message.data,
102-
},
103-
container
104-
)
96+
messages.map(
97+
async (m) =>
98+
await this.executeTask(
99+
m.ackId,
100+
{
101+
eventName:
102+
m.message.attributes.STRONTIUM_EVENT_NAME,
103+
message: m.message.data,
104+
},
105+
container
106+
)
105107
)
106108
)
107109
}

src/queue/drivers/gcps/GCPSPublisher.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,21 @@ export class GCPSPublisher extends QueuePublisher implements Process {
2424
public async publish<Q extends {}>(
2525
queueName: string,
2626
eventName: string,
27-
message: Q
27+
messages: Q | Array<Q>
2828
): Promise<void> {
29-
return this.client.publish(queueName, {
30-
attributes: {
31-
STRONTIUM_EVENT_NAME: eventName,
32-
},
33-
data: message,
34-
})
29+
if (!Array.isArray(messages)) {
30+
messages = [messages]
31+
}
32+
33+
return this.client.publish(
34+
queueName,
35+
messages.map((m) => ({
36+
attributes: {
37+
STRONTIUM_EVENT_NAME: eventName,
38+
},
39+
data: m,
40+
}))
41+
)
3542
}
3643

3744
public async shutdown(container: Container): Promise<void> {

tests/queue/drivers/GCPSClient.spec.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ describe("GCPSClient", () => {
1010

1111
before(function() {
1212
// If the test suite is not running in CI then skip this suite - it's slow and requires credentials
13-
if (process.env.CI !== "true") {
13+
if (process.env.CI !== "true" || isNaN(Number(process.env.TRAVIS_PULL_REQUEST)) ) {
1414
this.skip()
1515
}
1616
})
@@ -36,10 +36,12 @@ describe("GCPSClient", () => {
3636
it("Should publish a message and correctly reconstruct it", async () => {
3737
await client.publish(
3838
"projects/strontium-tests/topics/integrationTestTopic",
39-
{
40-
data: "MY-INTEGRATION-TEST",
41-
attributes: {},
42-
}
39+
[
40+
{
41+
data: "MY-INTEGRATION-TEST",
42+
attributes: {},
43+
},
44+
]
4345
)
4446

4547
let messages = await client.pullTasks(
@@ -59,10 +61,12 @@ describe("GCPSClient", () => {
5961
it("Acking a message should remove it from the queue", async () => {
6062
await client.publish(
6163
"projects/strontium-tests/topics/integrationTestTopic",
62-
{
63-
data: "MY-INTEGRATION-TEST",
64-
attributes: {},
65-
}
64+
[
65+
{
66+
data: "MY-INTEGRATION-TEST",
67+
attributes: {},
68+
},
69+
]
6670
)
6771

6872
let messages = await client.pullTasks(
@@ -92,10 +96,12 @@ describe("GCPSClient", () => {
9296
it("Nacking a message should readd it to the queue", async () => {
9397
await client.publish(
9498
"projects/strontium-tests/topics/integrationTestTopic",
95-
{
96-
data: "NACKED-MESSAGE",
97-
attributes: {},
98-
}
99+
[
100+
{
101+
data: "NACKED-MESSAGE",
102+
attributes: {},
103+
},
104+
]
99105
)
100106

101107
let messages = await client.pullTasks(

0 commit comments

Comments
 (0)