Skip to content

Commit 9245031

Browse files
feat(pubsub)!: set default max ack extension period to 60 minutes (#3501)
* feat(pubsub)!: set max ack extension period to 60 minutes * update default max global threads to 5 * update tests * update tests ii * use defaults from client library this removes the global executor configuration since it relied on a hardcoded default value * deformat * reset executor thread changes * reset ITs * revert reset on unwanted code sections * fix units * update docs
1 parent d7c53e1 commit 9245031

File tree

8 files changed

+32
-16
lines changed

8 files changed

+32
-16
lines changed

docs/src/main/asciidoc/_configprops.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
|spring.cloud.gcp.pubsub.subscriber.flow-control.limit-exceeded-behavior | | The behavior when the specified limits are exceeded.
7474
|spring.cloud.gcp.pubsub.subscriber.flow-control.max-outstanding-element-count | | Maximum number of outstanding elements to keep in memory before enforcing flow control.
7575
|spring.cloud.gcp.pubsub.subscriber.flow-control.max-outstanding-request-bytes | | Maximum number of outstanding bytes to keep in memory before enforcing flow control.
76-
|spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period | 0 | The optional max ack extension period in seconds for the subscriber factory.
76+
|spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period | Library default (60 minutes) | The optional max ack extension period in seconds for the subscriber factory.
7777
|spring.cloud.gcp.pubsub.subscriber.max-acknowledgement-threads | 4 | Number of threads used for batch acknowledgement.
7878
|spring.cloud.gcp.pubsub.subscriber.parallel-pull-count | | The optional parallel pull count setting for the subscriber factory.
7979
|spring.cloud.gcp.pubsub.subscriber.pull-endpoint | | The optional pull endpoint setting for the subscriber factory.

docs/src/main/asciidoc/pubsub.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ However, if a per-subscription configuration is not set then the global or defau
6666
|===
6767
| Name | Description | Required | Default value
6868
| `spring.cloud.gcp.pubsub.subscriber.parallel-pull-count` | The number of pull workers | No | 1
69-
| `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds | No | 0
69+
| `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds | No | Library default (60 minutes)
7070
| `spring.cloud.gcp.pubsub.subscriber.min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds | No | 0
7171
| `spring.cloud.gcp.pubsub.subscriber.max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds | No | 0
7272
| `spring.cloud.gcp.pubsub.subscriber.pull-endpoint` | The endpoint for pulling messages | No | pubsub.googleapis.com:443
@@ -111,7 +111,7 @@ When true, replicates the default behavior before Spring 6.1.x. | No | false
111111
| Name | Description | Required | Default value
112112
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].fully-qualified-name` | The fully-qualified subscription name in the `projects/[PROJECT]/subscriptions/[SUBSCRIPTION]` format. When this property is present, the `[subscription-name]` key does not have to match any actual resources; it's used only for logical grouping. | No | 1
113113
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].parallel-pull-count` | The number of pull workers. | No | 1
114-
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds. | No | 0
114+
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-ack-extension-period` | The maximum period a message ack deadline will be extended, in seconds. | No | Library default (60 minutes)
115115
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].min-duration-per-ack-extension` | The lower bound for a single mod ack extension period, in seconds | No | 0
116116
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].max-duration-per-ack-extension` | The upper bound for a single mod ack extension period, in seconds | No | 0
117117
| `spring.cloud.gcp.pubsub.subscription.[subscription-name].pull-endpoint` | The endpoint for pulling messages. | No | pubsub.googleapis.com:443

docs/src/main/asciidoc/spring-integration-pubsub.adoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ When working with Cloud Pub/Sub, it is important to understand the concept of `a
8282
Each subscription has a default `ackDeadline` applied to all messages sent to it.
8383
Additionally, the Cloud Pub/Sub client library can extend each streamed message's `ackDeadline` until the message processing completes, fails or until the maximum extension period elapses.
8484

85-
NOTE: In the Pub/Sub client library, default maximum extension period is an hour. However, Spring Framework on Google Cloud disables this auto-extension behavior.
86-
Use the `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` property to re-enable it.
85+
NOTE: In the Pub/Sub client library, default maximum extension period is an hour.
86+
The Spring integration delegates the default value resolution to the underlying client library.
87+
If you wish to use a different value, use the `spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period` property.
8788

8889
Acknowledging (acking) a message removes it from Pub/Sub's known outstanding messages. Nacking a message resets its acknowledgement deadline to 0, forcing immediate redelivery.
8990
This could be useful in a load balanced architecture, where one of the subscribers is having issues but others are available to process messages.

spring-cloud-gcp-autoconfigure/src/test/java/com/google/cloud/spring/autoconfigure/pubsub/GcpPubSubAutoConfigurationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ void pullConfig_defaultConfigurationSet() {
507507
assertThat(
508508
gcpPubSubProperties.computeMaxAckExtensionPeriod(
509509
"subscription-name", projectIdProvider.getProjectId()))
510-
.isZero();
510+
.isNull();
511511
assertThat(
512512
gcpPubSubProperties.computeMinDurationPerAckExtension(
513513
"subscription-name", projectIdProvider.getProjectId()))

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/core/PubSubConfiguration.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ public class PubSubConfiguration {
3737
/** Default number of executor threads. */
3838
public static final int DEFAULT_EXECUTOR_THREADS = 4;
3939

40-
private static final Long DEFAULT_MAX_ACK_EXTENSION_PERIOD = 0L;
41-
4240
/**
4341
* Automatically extracted user-provided properties. Contains only short subscription keys
4442
* user-provided properties, therefore do not use except in initialize().
@@ -225,10 +223,7 @@ public Long computeMaxAckExtensionPeriod(String subscriptionName, String project
225223
if (maxAckExtensionPeriod != null) {
226224
return maxAckExtensionPeriod;
227225
}
228-
Long globalMaxAckExtensionPeriod = this.globalSubscriber.getMaxAckExtensionPeriod();
229-
return globalMaxAckExtensionPeriod != null
230-
? globalMaxAckExtensionPeriod
231-
: DEFAULT_MAX_ACK_EXTENSION_PERIOD;
226+
return this.globalSubscriber.getMaxAckExtensionPeriod();
232227
}
233228

234229
/**

spring-cloud-gcp-pubsub/src/main/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,8 +499,11 @@ Duration getMaxAckExtensionPeriod(String subscriptionName) {
499499
if (this.maxAckExtensionPeriod != null) {
500500
return this.maxAckExtensionPeriod;
501501
}
502-
return Duration.ofSeconds(
503-
this.pubSubConfiguration.computeMaxAckExtensionPeriod(subscriptionName, projectId));
502+
Long maxAckExtensionPeriod = this.pubSubConfiguration.computeMaxAckExtensionPeriod(subscriptionName, projectId);
503+
if (maxAckExtensionPeriod != null) {
504+
return Duration.ofSeconds(maxAckExtensionPeriod);
505+
}
506+
return null;
504507
}
505508

506509
@Nullable

spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/core/PubSubConfigurationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ void testComputeMaxAckExtensionPeriod_returnDefault() {
265265
Long result =
266266
pubSubConfiguration.computeMaxAckExtensionPeriod("subscription-name", "projectId");
267267

268-
assertThat(result).isZero();
268+
assertThat(result).isNull();
269269
}
270270

271271
@Test

spring-cloud-gcp-pubsub/src/test/java/com/google/cloud/spring/pubsub/support/DefaultSubscriberFactoryTests.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,23 @@ void testNewSubscriber() {
9393
.isEqualTo("projects/angeldust/subscriptions/midnight cowboy");
9494
}
9595

96+
@Test
97+
void testNewSubscriber_noMaxAckExtensionPeriodSet_usesClientDefault()
98+
throws NoSuchFieldException, IllegalAccessException {
99+
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(() -> "sealion", pubSubConfig);
100+
String subscriptionName = "deadbeef";
101+
102+
// By default, we build a subscriber with null max ack extension period
103+
Duration maxAckExtensionPeriodArgument = factory.getMaxAckExtensionPeriod(subscriptionName);
104+
assertThat(maxAckExtensionPeriodArgument).isNull();
105+
106+
// Now we check that the default in google-cloud-pubsub was used
107+
Subscriber subscriber = factory.createSubscriber(subscriptionName, (message, consumer) -> {});
108+
java.time.Duration effectiveMaxAckExtensionPeriod = (java.time.Duration) FieldUtils.readField(subscriber, "maxAckExtensionPeriod", true);
109+
java.time.Duration clientDefaultMaxAckExtensionPeriod = (java.time.Duration) FieldUtils.readField(subscriber, "DEFAULT_MAX_ACK_EXTENSION_PERIOD", true);
110+
assertThat(effectiveMaxAckExtensionPeriod).isEqualTo(clientDefaultMaxAckExtensionPeriod);
111+
}
112+
96113
@Test
97114
void testNewSubscriber_constructorWithPubSubConfiguration() {
98115
GcpProjectIdProvider projectIdProvider = () -> "angeldust";
@@ -534,7 +551,7 @@ void testGetMaxAckExtensionPeriod_newConfiguration() {
534551
new DefaultSubscriberFactory(projectIdProvider, this.pubSubConfig);
535552

536553
assertThat(factory.getMaxAckExtensionPeriod("subscription-name"))
537-
.isEqualTo(Duration.ofSeconds(0L));
554+
.isNull();
538555
}
539556

540557
@Test

0 commit comments

Comments
 (0)