Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ensure-labels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check one of required labels are set
uses: docker://agilepathway/label-checker:v1.6.65
uses: docker://agilepathway/pull-request-label-checker:v1.6.65
with:
one_of: major,minor,patch,skip-release
repo_token: ${{ secrets.GITHUB_TOKEN }}
5 changes: 4 additions & 1 deletion packages/gcp-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,14 @@ import { deletePubSub } from '@message-queue-toolkit/gcp-pubsub'

**Deletion Behavior:**
- Only deletes if both `deleteIfExists: true` and `creationConfig` are provided
- Deletes subscription first, then topic (proper order)
- **Consumers only delete subscriptions** (not topics) - topics may be shared with other consumers
- **Publishers delete both topic and subscription** (when applicable)
- Throws error if trying to delete in production without `forceDeleteInProduction: true`
- `waitForConfirmation: true`: Polls to confirm deletion completed (recommended)
- `waitForConfirmation: false`: Returns immediately after deletion request

**Note:** In Pub/Sub, topics can have multiple subscriptions (1:N relationship). When `deleteIfExists` is used on a consumer, only the subscription is deleted to avoid breaking other consumers sharing the same topic.

**Production Safety:**

The library checks `process.env.NODE_ENV` to determine if running in production:
Expand Down
5 changes: 4 additions & 1 deletion packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ export abstract class AbstractPubSubConsumer<

public override async init(): Promise<void> {
if (this.deletionConfig && this.creationConfig) {
await deletePubSub(this.pubSubClient, this.deletionConfig, this.creationConfig)
// Only delete subscription, not the topic (topic may be shared with other consumers)
await deletePubSub(this.pubSubClient, this.deletionConfig, this.creationConfig, {
deleteSubscriptionOnly: true,
})
}

const initResult = await initPubSub(
Expand Down
19 changes: 18 additions & 1 deletion packages/gcp-pubsub/lib/utils/pubSubInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,31 @@ export async function initPubSub(
}
}

export type PubSubDeletionOptions = {
/**
* When true, only deletes the subscription without deleting the topic.
* Use this for consumers to avoid deleting shared topics.
* Default: false (deletes both subscription and topic)
*/
deleteSubscriptionOnly?: boolean
}

/**
* Deletes Pub/Sub resources (topics and subscriptions).
*
* Deletion behavior:
* - Only deletes if deletionConfig.deleteIfExists is true and creationConfig is provided
* - Checks forceDeleteInProduction flag to prevent accidental deletion in production environments
* - Deletes subscription first (if exists), then topic
* - If deleteSubscriptionOnly is true, only the subscription is deleted (safe for consumers)
* - If deleteSubscriptionOnly is false (default), deletes subscription first, then topic
* - If waitForConfirmation is true (default), polls to confirm resources are actually deleted
* using the core waitAndRetry utility (similar to SQS implementation)
*/
export async function deletePubSub(
pubSubClient: PubSub,
deletionConfig: DeletionConfig,
creationConfig?: PubSubCreationConfig,
options?: PubSubDeletionOptions,
): Promise<void> {
if (!deletionConfig.deleteIfExists || !creationConfig) {
return
Expand All @@ -214,6 +225,7 @@ export async function deletePubSub(
}

const shouldWaitForConfirmation = deletionConfig.waitForConfirmation !== false
const deleteSubscriptionOnly = options?.deleteSubscriptionOnly ?? false

// Delete subscription first (if it exists)
if (creationConfig.subscription) {
Expand Down Expand Up @@ -242,6 +254,11 @@ export async function deletePubSub(
}
}

// Skip topic deletion if deleteSubscriptionOnly is true (consumer context)
if (deleteSubscriptionOnly) {
return
}

// Delete topic
const topicName = creationConfig.topic.name
const topic = pubSubClient.topic(topicName)
Expand Down
66 changes: 66 additions & 0 deletions packages/gcp-pubsub/test/utils/pubSubInitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,72 @@ describe('pubSubInitter', () => {
expect(topicExists).toBe(false)
expect(subscriptionExists).toBe(false)
})

it('only deletes subscription when deleteSubscriptionOnly is true', async () => {
process.env.NODE_ENV = 'development'
reloadConfig() // Reload config to pick up env change

const deletionConfig: DeletionConfig = {
deleteIfExists: true,
}

await deletePubSub(
pubSubClient,
deletionConfig,
{
topic: {
name: topicName,
},
subscription: {
name: subscriptionName,
},
},
{ deleteSubscriptionOnly: true },
)

// Verify subscription is deleted but topic still exists
const topic = pubSubClient.topic(topicName)
const subscription = pubSubClient.subscription(subscriptionName)

const [topicExists] = await topic.exists()
const [subscriptionExists] = await subscription.exists()

expect(topicExists).toBe(true) // Topic should still exist
expect(subscriptionExists).toBe(false) // Subscription should be deleted
})

it('deletes both topic and subscription when deleteSubscriptionOnly is false (default)', async () => {
process.env.NODE_ENV = 'development'
reloadConfig() // Reload config to pick up env change

const deletionConfig: DeletionConfig = {
deleteIfExists: true,
}

await deletePubSub(
pubSubClient,
deletionConfig,
{
topic: {
name: topicName,
},
subscription: {
name: subscriptionName,
},
},
{ deleteSubscriptionOnly: false },
)

// Verify both resources are deleted
const topic = pubSubClient.topic(topicName)
const subscription = pubSubClient.subscription(subscriptionName)

const [topicExists] = await topic.exists()
const [subscriptionExists] = await subscription.exists()

expect(topicExists).toBe(false)
expect(subscriptionExists).toBe(false)
})
})

describe('config validation', () => {
Expand Down
Loading