Skip to content

Commit 3aabc74

Browse files
authored
feat: Publish idempotence (#1323)
Implements publish idempotence (default enabled), where the server will ensure that unique messages within a single publisher session are only stored once.
1 parent 4cadfba commit 3aabc74

20 files changed

+932
-239
lines changed

google-cloud-pubsublite/clirr-ignored-differences.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,27 @@
77
<className>com/google/cloud/pubsublite/AdminClient</className>
88
<method>*</method>
99
</difference>
10+
<difference>
11+
<differenceType>7013</differenceType>
12+
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings$Builder</className>
13+
<method>*</method>
14+
</difference>
15+
<difference>
16+
<differenceType>4001</differenceType>
17+
<className>com/google/cloud/pubsublite/internal/**</className>
18+
<to>**</to>
19+
</difference>
1020
<difference>
1121
<differenceType>7004</differenceType>
1222
<className>com/google/cloud/pubsublite/internal/**</className>
1323
<method>*</method>
1424
</difference>
25+
<difference>
26+
<differenceType>7005</differenceType>
27+
<className>com/google/cloud/pubsublite/internal/**</className>
28+
<method>*</method>
29+
<to>**</to>
30+
</difference>
1531
<difference>
1632
<differenceType>7013</differenceType>
1733
<className>com/google/cloud/pubsublite/internal/**</className>

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/MessageMetadata.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,15 @@ public abstract class MessageMetadata {
3333
/** The partition a message was published to. */
3434
public abstract Partition partition();
3535

36-
/** The offset a message was assigned. */
36+
/**
37+
* The offset a message was assigned.
38+
*
39+
* <p>If this MessageMetadata was returned for a publish result and publish idempotence was
40+
* enabled, the offset may be -1 when the message was identified as a duplicate of an already
41+
* successfully published message, but the server did not have sufficient information to return
42+
* the message's offset at publish time. Messages received by subscribers will always have the
43+
* correct offset.
44+
*/
3745
public abstract Offset offset();
3846

3947
/** Construct a MessageMetadata from a Partition and Offset. */

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@
4242
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
4343
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
4444
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
45+
import com.google.cloud.pubsublite.internal.wire.UuidBuilder;
4546
import com.google.cloud.pubsublite.v1.AdminServiceClient;
4647
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
4748
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
4849
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
4950
import com.google.common.annotations.VisibleForTesting;
51+
import com.google.protobuf.ByteString;
5052
import com.google.pubsub.v1.PubsubMessage;
5153
import java.util.Optional;
5254
import org.threeten.bp.Duration;
@@ -79,6 +81,12 @@ public abstract class PublisherSettings {
7981
/** Batching settings for this publisher to use. Apply per-partition. */
8082
abstract BatchingSettings batchingSettings();
8183

84+
/**
85+
* Whether to enable publish idempotence, where the server will ensure that unique messages within
86+
* a single publisher session are stored only once. Default true.
87+
*/
88+
abstract boolean enableIdempotence();
89+
8290
/** A provider for credentials. */
8391
abstract CredentialsProvider credentialsProvider();
8492

@@ -106,6 +114,7 @@ public static Builder newBuilder() {
106114
.setCredentialsProvider(
107115
PublisherServiceSettings.defaultCredentialsProviderBuilder().build())
108116
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
117+
.setEnableIdempotence(true)
109118
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
110119
}
111120

@@ -127,6 +136,12 @@ public abstract Builder setMessageTransformer(
127136
/** Batching settings for this publisher to use. Apply per-partition. */
128137
public abstract Builder setBatchingSettings(BatchingSettings batchingSettings);
129138

139+
/**
140+
* Whether to enable publish idempotence, where the server will ensure that unique messages
141+
* within a single publisher session are stored only once. Default true.
142+
*/
143+
public abstract Builder setEnableIdempotence(boolean enableIdempotence);
144+
130145
/** A provider for credentials. */
131146
public abstract Builder setCredentialsProvider(CredentialsProvider credentialsProvider);
132147

@@ -163,6 +178,7 @@ private PublisherServiceClient newServiceClient() throws ApiException {
163178

164179
private PartitionPublisherFactory getPartitionPublisherFactory() {
165180
PublisherServiceClient client = newServiceClient();
181+
ByteString publisherClientId = UuidBuilder.toByteString(UuidBuilder.generate());
166182
return new PartitionPublisherFactory() {
167183
@Override
168184
public com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> newPublisher(
@@ -180,6 +196,9 @@ public com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> newPublis
180196
RoutingMetadata.of(topicPath(), partition));
181197
return client.publishCallable().splitCall(responseStream, context);
182198
});
199+
if (enableIdempotence()) {
200+
singlePartitionBuilder.setClientId(publisherClientId);
201+
}
183202
return singlePartitionBuilder.build();
184203
}
185204

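google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/PublishSequenceNumber.java

Lines changed: 41 additions & 0 deletions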
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal;
18+
19+
import com.google.auto.value.AutoValue;
20+
import java.io.Serializable;
21+
22+
/** A sequence number assigned to a published message, used to implement publish idempotency. */
23+
@AutoValue
24+
public abstract class PublishSequenceNumber implements Serializable {
25+
26+
/** Creates a publish sequence number wrapping the given long value. */
27+
public static PublishSequenceNumber of(long sequenceNumber) {
28+
return new AutoValue_PublishSequenceNumber(sequenceNumber);
29+
}
30+
31+
/** The sequence number (0) that should be set for the first message in a publisher session. */
32+
public static final PublishSequenceNumber FIRST = PublishSequenceNumber.of(0);
33+
34+
/** Returns the sequence number following this one, i.e. {@code of(value() + 1)}. */
35+
public PublishSequenceNumber next() {
36+
return PublishSequenceNumber.of(value() + 1);
37+
}
38+
39+
/** The long value of this publish sequence number. */
40+
public abstract long value();
41+
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SequencedPublisher.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.internal;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiService;
21+
import com.google.cloud.pubsublite.Message;
22+
import java.io.Flushable;
23+
24+
/**
25+
* A PubSub Lite publisher that requires a sequence number to be supplied with every message, for
26+
* publish idempotency. Errors are handled out of band. Thread safe.
27+
*/
28+
public interface SequencedPublisher<ResponseT> extends ApiService, Flushable {
29+
/**
30+
* Publishes a new message with the given sequence number assigned to it.
31+
*
32+
* <p>Behavior is undefined if a call to {@code flush()} is outstanding or {@code close()} has
33+
* already been called. This method never blocks.
34+
*
35+
* <p>Guarantees that if a single publish future has an exception set, all publish calls made
36+
* after that will also have an exception set.
37+
*/
38+
ApiFuture<ResponseT> publish(Message message, PublishSequenceNumber sequenceNumber);
39+
40+
/** Attempts to cancel all outstanding publishes. */
41+
void cancelOutstandingPublishes();
42+
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
2222
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
2323
import com.google.common.flogger.GoogleLogger;
24-
import com.google.protobuf.ByteString;
25-
import java.nio.ByteBuffer;
2624
import java.util.UUID;
2725

2826
@AutoValue
@@ -39,7 +37,7 @@ public abstract class AssignerSettings {
3937
abstract UUID uuid();
4038

4139
public static Builder newBuilder() {
42-
return new AutoValue_AssignerSettings.Builder().setUuid(UUID.randomUUID());
40+
return new AutoValue_AssignerSettings.Builder().setUuid(UuidBuilder.generate());
4341
}
4442

4543
@AutoValue.Builder
@@ -58,16 +56,13 @@ public abstract static class Builder {
5856
}
5957

6058
public Assigner instantiate() {
61-
ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
62-
uuidBuffer.putLong(uuid().getMostSignificantBits());
63-
uuidBuffer.putLong(uuid().getLeastSignificantBits());
6459
logger.atInfo().log(
6560
"Subscription %s using UUID %s for assignment.", subscriptionPath(), uuid());
6661

6762
InitialPartitionAssignmentRequest initial =
6863
InitialPartitionAssignmentRequest.newBuilder()
6964
.setSubscription(subscriptionPath().toString())
70-
.setClientId(ByteString.copyFrom(uuidBuffer.array()))
65+
.setClientId(UuidBuilder.toByteString(uuid()))
7166
.build();
7267
return new AssignerImpl(serviceClient(), initial, receiver());
7368
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisher.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19+
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
1920
import com.google.cloud.pubsublite.proto.PubSubMessage;
2021
import java.util.Collection;
2122

2223
interface BatchPublisher extends AutoCloseable {
23-
/** Publish the batch of messages. Failures are communicated out of band. */
24-
void publish(Collection<PubSubMessage> messages);
24+
/**
25+
* Publish the batch of messages, with the given sequence number of the first message in the
26+
* batch. Failures are communicated out of band.
27+
*/
28+
void publish(Collection<PubSubMessage> messages, PublishSequenceNumber firstSequenceNumber);
2529
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19-
import com.google.cloud.pubsublite.Offset;
19+
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
2020
import com.google.cloud.pubsublite.proto.PublishRequest;
2121
import com.google.cloud.pubsublite.proto.PublishResponse;
2222

2323
interface BatchPublisherFactory
24-
extends SingleConnectionFactory<PublishRequest, PublishResponse, Offset, BatchPublisher> {}
24+
extends SingleConnectionFactory<
25+
PublishRequest, PublishResponse, MessagePublishResponse, BatchPublisher> {}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/BatchPublisherImpl.java

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,52 @@
1919
import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
2020

2121
import com.google.api.gax.rpc.ResponseObserver;
22-
import com.google.api.gax.rpc.StatusCode.Code;
23-
import com.google.cloud.pubsublite.Offset;
2422
import com.google.cloud.pubsublite.internal.CheckedApiException;
25-
import com.google.cloud.pubsublite.internal.CloseableMonitor;
23+
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
2624
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
25+
import com.google.cloud.pubsublite.proto.MessagePublishRequest;
2726
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
2827
import com.google.cloud.pubsublite.proto.PubSubMessage;
2928
import com.google.cloud.pubsublite.proto.PublishRequest;
3029
import com.google.cloud.pubsublite.proto.PublishResponse;
31-
import com.google.errorprone.annotations.concurrent.GuardedBy;
3230
import java.util.Collection;
33-
import java.util.Optional;
3431

35-
class BatchPublisherImpl extends SingleConnection<PublishRequest, PublishResponse, Offset>
32+
class BatchPublisherImpl
33+
extends SingleConnection<PublishRequest, PublishResponse, MessagePublishResponse>
3634
implements BatchPublisher {
37-
private final CloseableMonitor monitor = new CloseableMonitor();
38-
39-
@GuardedBy("monitor.monitor")
40-
private Optional<Offset> lastOffset = Optional.empty();
41-
4235
static class Factory implements BatchPublisherFactory {
4336
@Override
4437
public BatchPublisherImpl New(
4538
StreamFactory<PublishRequest, PublishResponse> streamFactory,
46-
ResponseObserver<Offset> clientStream,
39+
ResponseObserver<MessagePublishResponse> clientStream,
4740
PublishRequest initialRequest) {
4841
return new BatchPublisherImpl(streamFactory::New, clientStream, initialRequest);
4942
}
5043
}
5144

45+
private final boolean sendSequenceNumbers;
46+
5247
private BatchPublisherImpl(
5348
PublishStreamFactory streamFactory,
54-
ResponseObserver<Offset> publishCompleteStream,
49+
ResponseObserver<MessagePublishResponse> publishCompleteStream,
5550
PublishRequest initialRequest) {
5651
super(streamFactory, publishCompleteStream);
5752
initialize(initialRequest);
53+
54+
// Publish idempotency is enabled when a publisher client id is specified. Otherwise do not send
55+
// sequence numbers to the stream.
56+
this.sendSequenceNumbers = !initialRequest.getInitialRequest().getClientId().isEmpty();
5857
}
5958

6059
@Override
61-
public void publish(Collection<PubSubMessage> messages) {
60+
public void publish(
61+
Collection<PubSubMessage> messages, PublishSequenceNumber firstSequenceNumber) {
6262
PublishRequest.Builder builder = PublishRequest.newBuilder();
63-
builder.getMessagePublishRequestBuilder().addAllMessages(messages);
63+
MessagePublishRequest.Builder publishRequestBuilder = builder.getMessagePublishRequestBuilder();
64+
publishRequestBuilder.addAllMessages(messages);
65+
if (sendSequenceNumbers) {
66+
publishRequestBuilder.setFirstSequenceNumber(firstSequenceNumber.value());
67+
}
6468
sendToStream(builder.build());
6569
}
6670

@@ -77,18 +81,6 @@ protected void handleStreamResponse(PublishResponse response) throws CheckedApiE
7781
checkState(
7882
response.hasMessageResponse(),
7983
"Received response on stream which was neither a message or initial response.");
80-
onMessageResponse(response.getMessageResponse());
81-
}
82-
83-
private void onMessageResponse(MessagePublishResponse response) throws CheckedApiException {
84-
Offset offset = Offset.of(response.getStartCursor().getOffset());
85-
try (CloseableMonitor.Hold h = monitor.enter()) {
86-
if (lastOffset.isPresent() && offset.value() <= lastOffset.get().value()) {
87-
throw new CheckedApiException(
88-
"Received out of order offsets on stream.", Code.FAILED_PRECONDITION);
89-
}
90-
lastOffset = Optional.of(offset);
91-
}
92-
sendToClient(offset);
84+
sendToClient(response.getMessageResponse());
9385
}
9486
}

0 commit comments

Comments
 (0)