Skip to content

Commit fe2bc29

Browse files
ahmedabu98, sjvanrossum (Steven van Rossum)
authored
Support writing to Pubsub with ordering key; Add PubsubMessage SchemaCoder (#31608)
* support writing pubsub messages with ordering key * Add ordering key size validation to validatePubsubMessageSize * Refactor writeMessagesWithOrderingKey into withOrderingKey * Route to bad records if key is defined, but would be dropped silently * Add publishBatchWithOrderingKey to PubsubUnboundedSink * Abort override if PubsubUnboundedSink set publishBatchWithOrderingKey * Add support for ordering keys in PubsubBoundedWriter * Add support for ordering keys in PubsubUnboundedSink * Remove nullable ordering keys, null and empty are equivalent * Construct OutgoingMessage with Beam PubsubMessage to reduce repetition * Improve readability of PubsubUnboundedSink batch assignment * Add size validation TODOs * Replace auto-sharding sink comment with FR link, move to relevant place * Add links to Pub/Sub documentation * Refine comment about lack of ordering key support in Dataflow's sink Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> * Add TODO to remove ordering key check once all sinks support this * Add missing return statement Co-authored-by: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> * Remove duplicated statements * Apply Spotless * Add notable changes * address comments * allow messages with ordering keys even when the sink isn't configured to accept ordering keys * spotless * spotless * add warning log when ordering key is not configured * address comments * Distribute ordering keys across shards with consistent hashing * Drop the ordering key field if ordering key writes are disabled * Add more context to TODOs and remove one TODO occurrence * Add soft deprecation notice to coders for maintainers of PubsubIO * Use attachValues instead of constructing a map of named fields * Apply suggestions from code review * Add missing import of Nullable * Fix row builder * Add missing nullable annotation to attachValues * Remove unused field pubishBatchSize * Rewrite integration test for ordering key writes * Fix assertion 
failure on empty ordering key * Add comments * Add unit tests for ordering key writes * Unconditionally reset test client state to prevent global state corruption across tests * Shuffle input for multiple ordering key batches * Add comment above call to shuffle --------- Co-authored-by: Steven van Rossum <sjvanrossum@google.com> Co-authored-by: Steven van Rossum <stevenvanrossum@gmail.com>
1 parent e038ad3 commit fe2bc29

20 files changed

+678
-78
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969

7070
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7171
* Upgraded GoogleAdsAPI to v19 for GoogleAdsIO (Java) ([#34497](https://github.com/apache/beam/pull/34497)). Changed PTransform method from version-specified (`v17()`) to `current()` for better backward compatibility in the future.
72+
* Added support for writing to Pubsub with ordering keys (Java) ([#21162](https://github.com/apache/beam/issues/21162))
7273

7374
## New Features / Improvements
7475

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static java.nio.charset.StandardCharsets.UTF_8;
2121
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK;
2222
import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE;
23+
import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
2324
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
2425
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
2526
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -2150,6 +2151,15 @@ private static void translate(
21502151
PubsubUnboundedSink overriddenTransform,
21512152
StepTranslationContext stepContext,
21522153
PCollection input) {
2154+
if (overriddenTransform.getPublishBatchWithOrderingKey()) {
2155+
throw new UnsupportedOperationException(
2156+
String.format(
2157+
"The DataflowRunner does not currently support publishing to Pubsub with ordering keys. "
2158+
+ "%s is required to support publishing with ordering keys. "
2159+
+ "Set the pipeline option --experiments=%s to use this PTransform. "
2160+
+ "See https://issuetracker.google.com/issues/200955424 for current status.",
2161+
PubsubUnboundedSink.class.getSimpleName(), ENABLE_CUSTOM_PUBSUB_SINK));
2162+
}
21532163
stepContext.addInput(PropertyNames.FORMAT, "pubsub");
21542164
if (overriddenTransform.getTopicProvider() != null) {
21552165
if (overriddenTransform.getTopicProvider().isAccessible()) {

sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ public Row attachValues(List<@Nullable Object> attachedValues) {
829829
return new RowWithStorage(schema, attachedValues);
830830
}
831831

832-
public Row attachValues(Object... values) {
832+
public Row attachValues(@Nullable Object... values) {
833833
return attachValues(Arrays.asList(values));
834834
}
835835

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,25 @@
2929
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
3030
import org.apache.beam.sdk.values.TupleTag;
3131
import org.apache.beam.sdk.values.ValueInSingleWindow;
32+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
3233
import org.checkerframework.checker.nullness.qual.Nullable;
3334
import org.joda.time.Instant;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3437

3538
public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage> {
39+
private static final Logger LOG = LoggerFactory.getLogger(PreparePubsubWriteDoFn.class);
3640
// See https://cloud.google.com/pubsub/quotas#resource_limits.
3741
private static final int PUBSUB_MESSAGE_DATA_MAX_BYTES = 10 << 20;
3842
private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100;
3943
private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256;
4044
private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024;
45+
private static final int ORDERING_KEY_MAX_BYTE_SIZE = 1024;
4146
// The amount of bytes that each attribute entry adds up to the request
4247
private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6;
48+
private final boolean usesOrderingKey;
4349
private int maxPublishBatchSize;
44-
50+
private boolean logOrderingKeyUnconfigured = false;
4551
private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
4652
@Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;
4753
/** Last TopicPath that reported Lineage. */
@@ -66,6 +72,20 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
6672
}
6773
int totalSize = payloadSize;
6874

75+
@Nullable String orderingKey = message.getOrderingKey();
76+
if (orderingKey != null) {
77+
int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length;
78+
if (orderingKeySize > ORDERING_KEY_MAX_BYTE_SIZE) {
79+
throw new SizeLimitExceededException(
80+
"Pubsub message ordering key of length "
81+
+ orderingKeySize
82+
+ " exceeds maximum of "
83+
+ ORDERING_KEY_MAX_BYTE_SIZE
84+
+ " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits");
85+
}
86+
totalSize += orderingKeySize;
87+
}
88+
6989
@Nullable Map<String, String> attributes = message.getAttributeMap();
7090
if (attributes != null) {
7191
if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) {
@@ -125,12 +145,14 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
125145
SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
126146
@Nullable
127147
SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
148+
boolean usesOrderingKey,
128149
int maxPublishBatchSize,
129150
BadRecordRouter badRecordRouter,
130151
Coder<InputT> inputCoder,
131152
TupleTag<PubsubMessage> outputTag) {
132153
this.formatFunction = formatFunction;
133154
this.topicFunction = topicFunction;
155+
this.usesOrderingKey = usesOrderingKey;
134156
this.maxPublishBatchSize = maxPublishBatchSize;
135157
this.badRecordRouter = badRecordRouter;
136158
this.inputCoder = inputCoder;
@@ -179,6 +201,16 @@ public void process(
179201
null);
180202
reportedLineage = topic;
181203
}
204+
if (!usesOrderingKey && !Strings.isNullOrEmpty(message.getOrderingKey())) {
205+
if (!logOrderingKeyUnconfigured) {
206+
LOG.warn(
207+
"Encountered Pubsub message with ordering key but this sink was not configured to "
208+
+ "retain ordering keys, so they will be dropped. Please set #withOrderingKey().");
209+
210+
logOrderingKeyUnconfigured = true;
211+
}
212+
message = message.withOrderingKey(null);
213+
}
182214
try {
183215
validatePubsubMessageSize(message, maxPublishBatchSize);
184216
} catch (SizeLimitExceededException e) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import com.google.api.client.util.Clock;
2424
import com.google.auto.value.AutoValue;
25-
import com.google.protobuf.ByteString;
2625
import com.google.protobuf.Descriptors.Descriptor;
2726
import com.google.protobuf.DynamicMessage;
2827
import com.google.protobuf.InvalidProtocolBufferException;
@@ -1378,6 +1377,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>
13781377

13791378
abstract @Nullable String getPubsubRootUrl();
13801379

1380+
abstract boolean getPublishWithOrderingKey();
1381+
13811382
abstract BadRecordRouter getBadRecordRouter();
13821383

13831384
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
@@ -1393,6 +1394,7 @@ static <T> Builder<T> newBuilder(
13931394
builder.setFormatFn(formatFn);
13941395
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
13951396
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
1397+
builder.setPublishWithOrderingKey(false);
13961398
builder.setValidate(false);
13971399
return builder;
13981400
}
@@ -1425,6 +1427,8 @@ abstract Builder<T> setFormatFn(
14251427

14261428
abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);
14271429

1430+
abstract Builder<T> setPublishWithOrderingKey(boolean publishWithOrderingKey);
1431+
14281432
abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);
14291433

14301434
abstract Builder<T> setBadRecordErrorHandler(
@@ -1510,6 +1514,19 @@ public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
15101514
return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
15111515
}
15121516

1517+
/**
1518+
* Writes to Pub/Sub with each record's ordering key. A subscription with message ordering
1519+
* enabled will receive messages published in the same region with the same ordering key in the
1520+
* order in which they were received by the service. Note that the order in which Beam publishes
1521+
* records to the service remains unspecified.
1522+
*
1523+
* @see <a href="https://cloud.google.com/pubsub/docs/ordering">Pub/Sub documentation on message
1524+
* ordering</a>
1525+
*/
1526+
public Write<T> withOrderingKey() {
1527+
return toBuilder().setPublishWithOrderingKey(true).build();
1528+
}
1529+
15131530
/**
15141531
* Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
15151532
* with the specified name. The value of the attribute will be a number representing the number
@@ -1586,6 +1603,7 @@ public PDone expand(PCollection<T> input) {
15861603
new PreparePubsubWriteDoFn<>(
15871604
getFormatFn(),
15881605
topicFunction,
1606+
getPublishWithOrderingKey(),
15891607
maxMessageSize,
15901608
getBadRecordRouter(),
15911609
input.getCoder(),
@@ -1597,8 +1615,12 @@ public PDone expand(PCollection<T> input) {
15971615
pubsubMessageTuple
15981616
.get(BAD_RECORD_TAG)
15991617
.setCoder(BadRecord.getCoder(input.getPipeline())));
1600-
PCollection<PubsubMessage> pubsubMessages =
1601-
pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(PubsubMessageWithTopicCoder.of());
1618+
PCollection<PubsubMessage> pubsubMessages = pubsubMessageTuple.get(pubsubMessageTupleTag);
1619+
if (getPublishWithOrderingKey()) {
1620+
pubsubMessages.setCoder(PubsubMessageSchemaCoder.getSchemaCoder());
1621+
} else {
1622+
pubsubMessages.setCoder(PubsubMessageWithTopicCoder.of());
1623+
}
16021624
switch (input.isBounded()) {
16031625
case BOUNDED:
16041626
pubsubMessages.apply(
@@ -1618,6 +1640,7 @@ public PDone expand(PCollection<T> input) {
16181640
getTimestampAttribute(),
16191641
getIdAttribute(),
16201642
100 /* numShards */,
1643+
getPublishWithOrderingKey(),
16211644
MoreObjects.firstNonNull(
16221645
getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
16231646
MoreObjects.firstNonNull(
@@ -1679,7 +1702,9 @@ private class OutgoingData {
16791702
}
16801703
}
16811704

1682-
private transient Map<PubsubTopic, OutgoingData> output;
1705+
// NOTE: A single publish request may only write to one ordering key.
1706+
// See https://cloud.google.com/pubsub/docs/publisher#using-ordering-keys for details.
1707+
private transient Map<KV<PubsubTopic, String>, OutgoingData> output;
16831708

16841709
private transient PubsubClient pubsubClient;
16851710

@@ -1710,51 +1735,47 @@ public void startBundle(StartBundleContext c) throws IOException {
17101735
public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
17111736
throws IOException, SizeLimitExceededException {
17121737
// Validate again here just as a sanity check.
1738+
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
1739+
// - Size validation makes no distinction between JSON and Protobuf encoding
1740+
// - Accounting for HTTP to gRPC transcoding is non-trivial
17131741
PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
1714-
byte[] payload = message.getPayload();
1715-
int messageSize = payload.length;
1716-
1717-
PubsubTopic pubsubTopic;
1742+
// NOTE: The record id is always null since it will be assigned by Pub/Sub.
1743+
final OutgoingMessage msg =
1744+
OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic());
1745+
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
1746+
// - Size validation makes no distinction between JSON and Protobuf encoding
1747+
// - Accounting for HTTP to gRPC transcoding is non-trivial
1748+
final int messageSize = msg.getMessage().getData().size();
1749+
1750+
final PubsubTopic pubsubTopic;
17181751
if (getTopicProvider() != null) {
17191752
pubsubTopic = getTopicProvider().get();
17201753
} else {
1721-
pubsubTopic =
1722-
PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic()));
1723-
}
1724-
// Checking before adding the message stops us from violating max batch size or bytes
1725-
OutgoingData currentTopicOutput =
1726-
output.computeIfAbsent(pubsubTopic, t -> new OutgoingData());
1727-
if (currentTopicOutput.messages.size() >= maxPublishBatchSize
1728-
|| (!currentTopicOutput.messages.isEmpty()
1729-
&& (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) {
1730-
publish(pubsubTopic, currentTopicOutput.messages);
1731-
currentTopicOutput.messages.clear();
1732-
currentTopicOutput.bytes = 0;
1754+
pubsubTopic = PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(msg.topic()));
17331755
}
17341756

1735-
Map<String, String> attributes = message.getAttributeMap();
1736-
String orderingKey = message.getOrderingKey();
1737-
1738-
com.google.pubsub.v1.PubsubMessage.Builder msgBuilder =
1739-
com.google.pubsub.v1.PubsubMessage.newBuilder()
1740-
.setData(ByteString.copyFrom(payload))
1741-
.putAllAttributes(attributes);
1742-
1743-
if (orderingKey != null) {
1744-
msgBuilder.setOrderingKey(orderingKey);
1757+
// Checking before adding the message stops us from violating max batch size or bytes
1758+
String orderingKey = getPublishWithOrderingKey() ? msg.getMessage().getOrderingKey() : "";
1759+
final OutgoingData currentTopicAndOrderingKeyOutput =
1760+
output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData());
1761+
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
1762+
if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize
1763+
|| (!currentTopicAndOrderingKeyOutput.messages.isEmpty()
1764+
&& (currentTopicAndOrderingKeyOutput.bytes + messageSize)
1765+
>= maxPublishBatchByteSize)) {
1766+
publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages);
1767+
currentTopicAndOrderingKeyOutput.messages.clear();
1768+
currentTopicAndOrderingKeyOutput.bytes = 0;
17451769
}
17461770

1747-
// NOTE: The record id is always null.
1748-
currentTopicOutput.messages.add(
1749-
OutgoingMessage.of(
1750-
msgBuilder.build(), timestamp.getMillis(), null, message.getTopic()));
1751-
currentTopicOutput.bytes += messageSize;
1771+
currentTopicAndOrderingKeyOutput.messages.add(msg);
1772+
currentTopicAndOrderingKeyOutput.bytes += messageSize;
17521773
}
17531774

17541775
@FinishBundle
17551776
public void finishBundle() throws IOException {
1756-
for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
1757-
publish(entry.getKey(), entry.getValue().messages);
1777+
for (Map.Entry<KV<PubsubTopic, String>, OutgoingData> entry : output.entrySet()) {
1778+
publish(entry.getKey().getKey(), entry.getValue().messages);
17581779
}
17591780
output = null;
17601781
pubsubClient.close();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,11 @@ public List<IncomingMessage> pull(
226226
com.google.pubsub.v1.PubsubMessage.newBuilder();
227227
protoMessage.setData(ByteString.copyFrom(elementBytes));
228228
protoMessage.putAllAttributes(attributes);
229-
// PubsubMessage uses `null` to represent no ordering key where we want a default of "".
229+
// {@link PubsubMessage} uses `null` or empty string to represent no ordering key.
230+
// {@link com.google.pubsub.v1.PubsubMessage} does not track string field presence and uses
231+
// empty string as a default.
230232
if (pubsubMessage.getOrderingKey() != null) {
231233
protoMessage.setOrderingKey(pubsubMessage.getOrderingKey());
232-
} else {
233-
protoMessage.setOrderingKey("");
234234
}
235235
incomingMessages.add(
236236
IncomingMessage.of(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ public byte[] getPayload() {
112112
return impl.getMessageId();
113113
}
114114

115+
public PubsubMessage withOrderingKey(@Nullable String orderingKey) {
116+
return new PubsubMessage(
117+
Impl.create(
118+
impl.getTopic(),
119+
impl.getPayload(),
120+
impl.getAttributeMap(),
121+
impl.getMessageId(),
122+
orderingKey));
123+
}
124+
115125
/** Returns the ordering key of the message. */
116126
public @Nullable String getOrderingKey() {
117127
return impl.getOrderingKey();

0 commit comments

Comments (0)