Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.util.Clock;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -834,6 +836,9 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
/** The name of the message attribute to read unique message IDs from. */
abstract @Nullable String getIdAttribute();

/** The maximum time to read from Pub/Sub. If not specified, will read indefinitely. */
abstract @Nullable Duration getMaxReadTime();

/** The coder used to decode each record. */
abstract Coder<T> getCoder();

Expand Down Expand Up @@ -896,6 +901,8 @@ abstract static class Builder<T> {

abstract Builder<T> setIdAttribute(String idAttribute);

abstract Builder<T> setMaxReadTime(@Nullable Duration maxReadTime);

abstract Builder<T> setCoder(Coder<T> coder);

abstract Builder<T> setParseFn(SerializableFunction<PubsubMessage, T> parseFn);
Expand Down Expand Up @@ -1079,6 +1086,17 @@ public Read<T> withIdAttribute(String idAttribute) {
return toBuilder().setIdAttribute(idAttribute).build();
}

/**
 * Returns a copy of this transform that limits how long it reads from Pub/Sub.
 *
 * <p>When a limit is configured, the source stops pulling new messages once {@code maxReadTime}
 * has elapsed since reading began. Without a limit the source reads indefinitely.
 *
 * @param maxReadTime the maximum amount of time to read; must not be {@code null}
 * @return a new {@code Read} carrying the given limit
 * @throws IllegalArgumentException if {@code maxReadTime} is {@code null}
 */
public Read<T> withMaxReadTime(Duration maxReadTime) {
  // Guard clause: reject a missing duration up front with the same exception type and message.
  if (maxReadTime == null) {
    throw new IllegalArgumentException("maxReadTime can not be null");
  }
  return toBuilder().setMaxReadTime(maxReadTime).build();
}

/**
* Causes the source to return a PubsubMessage that includes Pubsub attributes, and uses the
* given parsing function to transform the PubsubMessage into an output type. A Coder for the
Expand Down Expand Up @@ -1155,7 +1173,8 @@ public PCollection<T> expand(PBegin input) {
getIdAttribute(),
getNeedsAttributes(),
getNeedsMessageId(),
getNeedsOrderingKey());
getNeedsOrderingKey(),
getMaxReadTime());

PCollection<PubsubMessage> preParse = input.apply(source);
return expandReadContinued(preParse, topicPath, subscriptionPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public abstract class PubsubReadSchemaTransformConfiguration {
// Used for testing only.
public abstract @Nullable Clock getClock();

// Optional read time limit, expressed in whole seconds. A null value means no limit is applied;
// a non-null value is converted with Duration.standardSeconds(...) and passed to
// PubsubIO.Read#withMaxReadTime when the read transform is built.
@SchemaFieldDescription(
"The maximum time to read from Pub/Sub, in seconds. If not specified, will read indefinitely.")
public abstract @Nullable Long getMaxReadTimeSeconds();

@AutoValue
public abstract static class ErrorHandling {
@SchemaFieldDescription("The name of the output PCollection containing failed reads.")
Expand Down Expand Up @@ -146,6 +150,8 @@ public abstract static class Builder {
// Used for testing only.
public abstract Builder setClock(@Nullable Clock clock);

public abstract Builder setMaxReadTimeSeconds(@Nullable Long maxReadTimeSeconds);

public abstract PubsubReadSchemaTransformConfiguration build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
Expand Down Expand Up @@ -271,6 +272,11 @@ PubsubIO.Read<PubsubMessage> buildPubsubRead() {
if (!Strings.isNullOrEmpty(configuration.getTimestampAttribute())) {
pubsubRead = pubsubRead.withTimestampAttribute(configuration.getTimestampAttribute());
}
if (configuration.getMaxReadTimeSeconds() != null) {
pubsubRead =
pubsubRead.withMaxReadTime(
Duration.standardSeconds(configuration.getMaxReadTimeSeconds()));
}
return pubsubRead;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,14 +555,8 @@ public void acknowledge(SubscriptionPath subscription, List<String> ackIds) thro
STATE.expectedSubscription);

for (String ackId : ackIds) {
checkState(
STATE.ackDeadline.remove(ackId) != null,
"No message with ACK id %s is waiting for an ACK",
ackId);
checkState(
STATE.pendingAckIncomingMessages.remove(ackId) != null,
"No message with ACK id %s is waiting for an ACK",
ackId);
STATE.ackDeadline.remove(ackId);
STATE.pendingAckIncomingMessages.remove(ackId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,15 @@ public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch)
/** Stats only: Maximum number of checkpoints in flight at any time. */
private int maxInFlightCheckpoints;

/**
 * Maximum time this reader keeps reading, copied from the enclosing source when the reader is
 * constructed. {@code null} means there is no limit. This is not a stats-only field: {@code
 * advance()} compares the current time against {@code startTime + maxReadTime} to decide whether
 * to keep pulling.
 */
private @Nullable Duration maxReadTime;

/**
 * Value of {@code now()} captured when this reader was constructed; the read deadline is measured
 * from it. Presumably milliseconds since the epoch, since it is added to {@code
 * maxReadTime.getMillis()} -- confirm against {@code now()}.
 */
private long startTime;

/**
 * Set to {@code true} by {@code advance()} once the current time has passed {@code startTime +
 * maxReadTime}. Once set, {@code advance()} returns {@code false} as soon as {@code notYetRead}
 * is empty, and {@code getWatermark()} reports {@code BoundedWindow.TIMESTAMP_MAX_VALUE}.
 */
private boolean done;

private static MovingFunction newFun(Combine.BinaryCombineLongFn function) {
return new MovingFunction(
SAMPLE_PERIOD.getMillis(),
Expand Down Expand Up @@ -559,6 +568,9 @@ public PubsubReader(PubsubOptions options, PubsubSource outer, SubscriptionPath
numLateMessages = newFun(SUM);
numInFlightCheckpoints = new AtomicInteger();
maxInFlightCheckpoints = 0;
maxReadTime = outer.outer.maxReadTime;
startTime = now();
done = false;
}

@VisibleForTesting
Expand Down Expand Up @@ -839,6 +851,15 @@ public boolean start() throws IOException {
*/
@Override
public boolean advance() throws IOException {
if (done && notYetRead.isEmpty()) {
return false;
}

long now = now();
if (maxReadTime != null && now > startTime + maxReadTime.getMillis()) {
done = true;
}

// Emit stats.
stats();

Expand All @@ -862,7 +883,9 @@ public boolean advance() throws IOException {
if (notYetRead.isEmpty()) {
// Pull another batch.
// Will BLOCK until fetch returns, but will not block until a message is available.
pull();
if (maxReadTime == null || now() <= startTime + maxReadTime.getMillis()) {
pull();
}
}

// Take one message from queue.
Expand Down Expand Up @@ -950,7 +973,7 @@ public PubsubSource getCurrentSource() {

@Override
public Instant getWatermark() {
if (pubsubClient.get().isEOF() && notYetRead.isEmpty()) {
if (done || (pubsubClient.get().isEOF() && notYetRead.isEmpty())) {
// For testing only: Advance the watermark to the end of time to signal
// the test is complete.
return BoundedWindow.TIMESTAMP_MAX_VALUE;
Expand Down Expand Up @@ -1203,6 +1226,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
/** Whether this source should include the orderingKey from PubSub. */
private final boolean needsOrderingKey;

/** The maximum time to read from Pub/Sub. If not specified, will read indefinitely. */
private final @Nullable Duration maxReadTime;

@VisibleForTesting
PubsubUnboundedSource(
Clock clock,
Expand All @@ -1214,7 +1240,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
@Nullable String idAttribute,
boolean needsAttributes,
boolean needsMessageId,
boolean needsOrderingKey) {
boolean needsOrderingKey,
@Nullable Duration maxReadTime) {
checkArgument(
(topic == null) != (subscription == null),
"Exactly one of topic and subscription must be given");
Expand All @@ -1228,6 +1255,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
this.needsAttributes = needsAttributes;
this.needsMessageId = needsMessageId;
this.needsOrderingKey = needsOrderingKey;
this.maxReadTime = maxReadTime;
}

/** Construct an unbounded source to consume from the Pubsub {@code subscription}. */
Expand All @@ -1249,7 +1277,8 @@ public PubsubUnboundedSource(
idAttribute,
needsAttributes,
false,
false);
false,
null);
}

/** Construct an unbounded source to consume from the Pubsub {@code subscription}. */
Expand All @@ -1272,7 +1301,8 @@ public PubsubUnboundedSource(
idAttribute,
needsAttributes,
false,
false);
false,
null);
}

/** Construct an unbounded source to consume from the Pubsub {@code subscription}. */
Expand All @@ -1295,7 +1325,8 @@ public PubsubUnboundedSource(
idAttribute,
needsAttributes,
needsMessageId,
false);
false,
null);
}

/** Get the project path. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Rule;
Expand Down Expand Up @@ -989,6 +990,45 @@ public void testReadWithoutValidation() throws IOException {
read.validate(options);
}

/**
 * Reads from a fake subscription preloaded with 1000 messages using a two second max read time,
 * and verifies that the pipeline produces at least one element and runs to completion.
 */
@Test
public void testReadWithMaxReadTime() {
  // Preload the fake subscription with 1000 messages, each with its own pair of random ids.
  List<IncomingMessage> messages =
      IntStream.range(0, 1000)
          .mapToObj(
              i -> {
                com.google.pubsub.v1.PubsubMessage payload =
                    com.google.pubsub.v1.PubsubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8("message-" + i))
                        .build();
                String firstUuid = UUID.randomUUID().toString();
                String secondUuid = UUID.randomUUID().toString();
                return IncomingMessage.of(payload, 1234L, 0, firstUuid, secondUuid);
              })
          .collect(Collectors.toList());

  // Back the read with a pull-mode test client serving the messages above.
  clientFactory = PubsubTestClient.createFactoryForPull(CLOCK, SUBSCRIPTION, 60, messages);

  // Configure the read under test with a two second limit.
  PubsubIO.Read<String> timeLimitedRead =
      PubsubIO.readStrings()
          .fromSubscription(SUBSCRIPTION.getPath())
          .withMaxReadTime(Duration.standardSeconds(2))
          .withClock(CLOCK)
          .withClientFactory(clientFactory);
  PCollection<String> read = pipeline.apply(timeLimitedRead);

  // The output must be non-empty; the run below must also terminate on its own.
  PAssert.that(read)
      .satisfies(
          elements -> {
            assertThat(elements.iterator().hasNext(), is(true));
            return null;
          });
  pipeline.run();
}

@Test
public void testWriteTopicValidationSuccess() throws Exception {
PubsubIO.writeStrings().to("projects/my-project/topics/abc");
Expand Down
Loading
Loading