Skip to content

Commit bb5ecd1

Browse files
authored
Revert "Cherrypick #15418 and #15515"
1 parent 3b74c92 commit bb5ecd1

25 files changed

+297
-936
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ class BeamModulePlugin implements Plugin<Project> {
446446
def errorprone_version = "2.3.4"
447447
def google_clients_version = "1.31.0"
448448
def google_cloud_bigdataoss_version = "2.2.2"
449-
def google_cloud_pubsublite_version = "1.0.4"
449+
def google_cloud_pubsublite_version = "0.13.2"
450450
def google_code_gson_version = "2.8.6"
451451
def google_oauth_clients_version = "1.31.0"
452452
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,25 @@
2424
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
2525

2626
/** Common util functions for converting between PubsubMessage proto and {@link PubsubMessage}. */
27-
public final class PubsubMessages {
28-
private PubsubMessages() {}
29-
30-
public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) {
31-
Map<String, String> attributes = input.getAttributeMap();
32-
com.google.pubsub.v1.PubsubMessage.Builder message =
33-
com.google.pubsub.v1.PubsubMessage.newBuilder()
34-
.setData(ByteString.copyFrom(input.getPayload()));
35-
// TODO(BEAM-8085) this should not be null
36-
if (attributes != null) {
37-
message.putAllAttributes(attributes);
38-
}
39-
String messageId = input.getMessageId();
40-
if (messageId != null) {
41-
message.setMessageId(messageId);
42-
}
43-
return message.build();
44-
}
45-
46-
public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) {
47-
return new PubsubMessage(
48-
input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId());
49-
}
50-
27+
public class PubsubMessages {
5128
// Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation.
5229
public static class ParsePayloadAsPubsubMessageProto
5330
implements SerializableFunction<PubsubMessage, byte[]> {
5431
@Override
5532
public byte[] apply(PubsubMessage input) {
56-
return toProto(input).toByteArray();
33+
Map<String, String> attributes = input.getAttributeMap();
34+
com.google.pubsub.v1.PubsubMessage.Builder message =
35+
com.google.pubsub.v1.PubsubMessage.newBuilder()
36+
.setData(ByteString.copyFrom(input.getPayload()));
37+
// TODO(BEAM-8085) this should not be null
38+
if (attributes != null) {
39+
message.putAllAttributes(attributes);
40+
}
41+
String messageId = input.getMessageId();
42+
if (messageId != null) {
43+
message.setMessageId(messageId);
44+
}
45+
return message.build().toByteArray();
5746
}
5847
}
5948

@@ -65,7 +54,8 @@ public PubsubMessage apply(byte[] input) {
6554
try {
6655
com.google.pubsub.v1.PubsubMessage message =
6756
com.google.pubsub.v1.PubsubMessage.parseFrom(input);
68-
return fromProto(message);
57+
return new PubsubMessage(
58+
message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId());
6959
} catch (InvalidProtocolBufferException e) {
7060
throw new RuntimeException("Could not decode Pubsub message", e);
7161
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.pubsublite;
19+
20+
import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer;
21+
22+
import com.google.cloud.pubsublite.Message;
23+
import com.google.cloud.pubsublite.proto.PubSubMessage;
24+
import org.apache.beam.sdk.transforms.MapElements;
25+
import org.apache.beam.sdk.transforms.PTransform;
26+
import org.apache.beam.sdk.values.PCollection;
27+
import org.apache.beam.sdk.values.TypeDescriptor;
28+
29+
/**
30+
* A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message
31+
* types.
32+
*/
33+
public final class CloudPubsubChecks {
34+
private CloudPubsubChecks() {}
35+
36+
/**
37+
* Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the
38+
* standard transformation methods in the client library.
39+
*
40+
* <p>Will fail the pipeline if a message has multiple attributes per key.
41+
*/
42+
public static PTransform<PCollection<? extends PubSubMessage>, PCollection<PubSubMessage>>
43+
ensureUsableAsCloudPubsub() {
44+
return MapElements.into(TypeDescriptor.of(PubSubMessage.class))
45+
.via(
46+
message -> {
47+
Object unused = toCpsPublishTransformer().transform(Message.fromProto(message));
48+
return message;
49+
});
50+
}
51+
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java

Lines changed: 0 additions & 104 deletions
This file was deleted.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)