Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ Topic ensureTopicExists(String topicName, boolean autoCreate) {
}

Subscription ensureSubscriptionExists(
String subscriptionName,
String topicName,
PubSubConsumerProperties properties) {
String subscriptionName, String topicName, PubSubConsumerProperties properties) {
Subscription subscription = this.pubSubAdmin.getSubscription(subscriptionName);
if (subscription == null) {
return createSubscription(subscriptionName, topicName, properties);
Expand All @@ -153,19 +151,45 @@ Subscription ensureSubscriptionExists(
}

private Subscription createSubscription(
String subscriptionName,
String topicName,
PubSubConsumerProperties properties) {
String subscriptionName, String topicName, PubSubConsumerProperties properties) {
Subscription.Builder builder =
Subscription.newBuilder().setName(subscriptionName).setTopic(topicName);

PubSubConsumerProperties.ExpirationPolicy expirationPolicy = properties.getExpirationPolicy();

PubSubConsumerProperties.DeadLetterPolicy deadLetterPolicy = properties.getDeadLetterPolicy();
if (deadLetterPolicy != null) {
String dlTopicName = deadLetterPolicy.getDeadLetterTopic();
Assert.hasText(dlTopicName, "Dead letter policy cannot have null or empty topic");

Topic dlTopic = ensureTopicExists(dlTopicName, properties.isAutoCreateResources());

String dltSubscriptionName = subscriptionName + ".DLT";
Subscription dltSubscription = this.pubSubAdmin.getSubscription(dltSubscriptionName);

if (dltSubscription == null) {
try {
Subscription.Builder dltSubBuilder =
Subscription.newBuilder().setName(dltSubscriptionName).setTopic(dlTopic.getName());

ExpirationPolicy builtExpirationPolicy = buildExpirationPolicy(expirationPolicy);
if (builtExpirationPolicy != null) {
dltSubBuilder.setExpirationPolicy(builtExpirationPolicy);
}

this.pubSubAdmin.createSubscription(dltSubBuilder);
LOGGER.info(
"Created DLT subscription '"
+ dltSubscriptionName
+ "' for dead-letter topic '"
+ dlTopic.getName()
+ "'");
} catch (AlreadyExistsException ex) {
// Safe race condition handling
LOGGER.debug("DLT subscription '" + dltSubscriptionName + "' already exists.");
}
}

DeadLetterPolicy.Builder dlpBuilder =
DeadLetterPolicy.newBuilder().setDeadLetterTopic(dlTopic.getName());

Expand All @@ -174,21 +198,35 @@ private Subscription createSubscription(
dlpBuilder.setMaxDeliveryAttempts(maxAttempts);
}
builder.setDeadLetterPolicy(dlpBuilder);
}

PubSubConsumerProperties.ExpirationPolicy expirationPolicy = properties.getExpirationPolicy();
if (expirationPolicy != null) {
ExpirationPolicy.Builder epBuilder = ExpirationPolicy.newBuilder();
LOGGER.warn(
"Dead Letter Topic is configured, but IAM roles are NOT validated or auto-provisioned. "
+ "Ensure the Pub/Sub service agent has roles/pubsub.publisher on '"
+ dlTopic.getName()
+ "' and consumers have roles/pubsub.subscriber on '"
+ dltSubscriptionName
+ "'. Missing IAM will cause runtime failures or endless retries.");
}

if (expirationPolicy.getTtl() != null) {
long desiredSeconds = expirationPolicy.getTtl().getSeconds();
epBuilder.setTtl(
com.google.protobuf.Duration.newBuilder().setSeconds(desiredSeconds).build());
}
ExpirationPolicy builtExpirationPolicy = buildExpirationPolicy(expirationPolicy);
if (builtExpirationPolicy != null) {
builder.setExpirationPolicy(builtExpirationPolicy);
}
return this.pubSubAdmin.createSubscription(builder);
}

builder.setExpirationPolicy(epBuilder);
private ExpirationPolicy buildExpirationPolicy(
PubSubConsumerProperties.ExpirationPolicy expirationPolicy) {
if (expirationPolicy == null) {
return null;
}
ExpirationPolicy.Builder epBuilder = ExpirationPolicy.newBuilder();

return this.pubSubAdmin.createSubscription(builder);
if (expirationPolicy.getTtl() != null) {
long desiredSeconds = expirationPolicy.getTtl().getSeconds();
epBuilder.setTtl(
com.google.protobuf.Duration.newBuilder().setSeconds(desiredSeconds).build());
}
return epBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -35,6 +37,7 @@
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import java.util.List;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -188,6 +191,89 @@ void testProvisionConsumerDestination_deadLetterQueue() {
this.pubSubChannelProvisioner.provisionConsumerDestination(
"topic_A", "group_A", this.extendedConsumerProperties);

ArgumentCaptor<Subscription.Builder> argCaptor =
ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock, times(2)).createSubscription(argCaptor.capture());
List<Subscription.Builder> captured = argCaptor.getAllValues();
Subscription.Builder sb1 = captured.get(0);
assertThat(sb1.getName()).isEqualTo("topic_A.group_A.DLT");
assertThat(sb1.getTopic()).isEqualTo("projects/test-project/topics/deadLetterTopic");
assertThat(sb1.getDeadLetterPolicy()).isNotNull();
DeadLetterPolicy policy1 = sb1.getDeadLetterPolicy();
assertThat(policy1.getDeadLetterTopic()).isBlank();
assertThat(policy1.getMaxDeliveryAttempts()).isEqualTo(0);

Subscription.Builder sb2 = captured.get(1);
assertThat(sb2.getName()).isEqualTo("topic_A.group_A");
assertThat(sb2.getTopic()).isEqualTo("topic_A");
assertThat(sb2.getDeadLetterPolicy()).isNotNull();
DeadLetterPolicy policy2 = sb2.getDeadLetterPolicy();
assertThat(policy2.getDeadLetterTopic())
.isEqualTo("projects/test-project/topics/deadLetterTopic");
assertThat(policy2.getMaxDeliveryAttempts()).isEqualTo(12);
}


@Test
void testProvisionConsumerDestination_subscriptionRaceCondition() {
PubSubConsumerProperties.DeadLetterPolicy dlp = new PubSubConsumerProperties.DeadLetterPolicy();
dlp.setDeadLetterTopic("deadLetterTopic");
when(this.pubSubConsumerProperties.getDeadLetterPolicy()).thenReturn(dlp);
when(this.pubSubConsumerProperties.getExpirationPolicy()).thenReturn(new PubSubConsumerProperties.ExpirationPolicy());

when(this.pubSubAdminMock.getTopic("deadLetterTopic")).thenReturn(null);
when(this.pubSubAdminMock.createTopic("deadLetterTopic"))
.thenReturn(
Topic.newBuilder().setName("projects/test-project/topics/deadLetterTopic").build());
when(this.pubSubAdminMock.getSubscription("topic_A.group_A.DLT")).thenReturn(null);

doThrow(AlreadyExistsException.class)
.when(pubSubAdminMock)
.createSubscription(argThat(sb -> sb.getName().equals("topic_A.group_A.DLT")));

this.pubSubChannelProvisioner.provisionConsumerDestination(
"topic_A", "group_A", this.extendedConsumerProperties);

ArgumentCaptor<Subscription.Builder> argCaptor =
ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock, times(2)).createSubscription(argCaptor.capture());
List<Subscription.Builder> captured = argCaptor.getAllValues();
Subscription.Builder sb1 = captured.get(0);
assertThat(sb1.getName()).isEqualTo("topic_A.group_A.DLT");
assertThat(sb1.getTopic()).isEqualTo("projects/test-project/topics/deadLetterTopic");
assertThat(sb1.getDeadLetterPolicy()).isNotNull();
DeadLetterPolicy policy1 = sb1.getDeadLetterPolicy();
assertThat(policy1.getDeadLetterTopic()).isBlank();
assertThat(policy1.getMaxDeliveryAttempts()).isEqualTo(0);

Subscription.Builder sb2 = captured.get(1);
assertThat(sb2.getName()).isEqualTo("topic_A.group_A");
assertThat(sb2.getTopic()).isEqualTo("topic_A");
assertThat(sb2.getDeadLetterPolicy()).isNotNull();
DeadLetterPolicy policy2 = sb2.getDeadLetterPolicy();
assertThat(policy2.getDeadLetterTopic())
.isEqualTo("projects/test-project/topics/deadLetterTopic");
assertThat(policy2.getMaxDeliveryAttempts()).isEqualTo(0);
}

@Test
void testProvisionConsumerDestination_deadLetterQueue_subscriptionExists() {
PubSubConsumerProperties.DeadLetterPolicy dlp = new PubSubConsumerProperties.DeadLetterPolicy();
dlp.setDeadLetterTopic("deadLetterTopic");
dlp.setMaxDeliveryAttempts(12);
when(this.pubSubConsumerProperties.getDeadLetterPolicy()).thenReturn(dlp);

when(this.pubSubAdminMock.getTopic("deadLetterTopic")).thenReturn(null);
when(this.pubSubAdminMock.createTopic("deadLetterTopic"))
.thenReturn(
Topic.newBuilder().setName("projects/test-project/topics/deadLetterTopic").build());
when(this.pubSubAdminMock.getSubscription("topic_A.group_A.DLT"))
.thenReturn(
Subscription.newBuilder().setName("topic_A.group_A.DLT").setTopic("topic_A").build());

this.pubSubChannelProvisioner.provisionConsumerDestination(
"topic_A", "group_A", this.extendedConsumerProperties);

ArgumentCaptor<Subscription.Builder> argCaptor =
ArgumentCaptor.forClass(Subscription.Builder.class);
verify(this.pubSubAdminMock).createSubscription(argCaptor.capture());
Expand Down Expand Up @@ -349,8 +435,7 @@ void testProvisionConsumerDestination_createSubscription() {
Subscription.newBuilder().setTopic("topic_A").setName("subscription_A").build());

Subscription subscription =
this.pubSubChannelProvisioner.ensureSubscriptionExists(
"subscription_A", "topic_A", null);
this.pubSubChannelProvisioner.ensureSubscriptionExists("subscription_A", "topic_A", null);

assertThat(subscription.getName()).isEqualTo("subscription_A");
assertThat(subscription.getTopic()).isEqualTo("topic_A");
Expand Down