Commit 9ed550a

Add PubSubToBigTable blueprint/template (#3150)
* first draft
* generated pubsubtoBigTableYaml.java first draft
* updated blueprint with options
* add option files
* first draft of it file
* first draft of readme
* add bigtable options
* add new builder method
* add windowing option file
* update pipeline based on new windowing parameter
* generated new java template file
* updated it to handle pub sub messages better etc
1 parent 6b018bc commit 9ed550a

File tree: 9 files changed, +831 -0 lines changed

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/conditions/PubsubMessagesCheck.java

Lines changed: 7 additions & 0 deletions
```diff
@@ -21,6 +21,7 @@
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
 import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -86,6 +87,12 @@ public CheckResult check() {
         String.format("Expected at least %d messages and found %d", minMessages(), totalRows));
   }
 
+  public static Builder builder(
+      PubsubResourceManager resourceManager, TopicName topic, String subscription) {
+    SubscriptionName subscriptionName = resourceManager.createSubscription(topic, subscription);
+    return builder(resourceManager, subscriptionName);
+  }
+
   public static Builder builder(
       PubsubResourceManager resourceManager, SubscriptionName subscription) {
     return new AutoValue_PubsubMessagesCheck.Builder()
```
Lines changed: 244 additions & 0 deletions

PubSub to BigTable (YAML) template
---
The PubSub to BigTable template is a streaming pipeline that ingests data from a
Pub/Sub topic, executes a user-defined mapping, and writes the resulting records
to Bigtable. Any errors that occur while transforming the data are written to a
separate Pub/Sub topic.

:memo: This is a Google-provided template! Please
check [Provided templates documentation](https://cloud.google.com/dataflow/docs/guides/templates/provided-yaml/pubsub-to-bigtable)
on how to use it without having to build from sources using [Create job from template](https://console.cloud.google.com/dataflow/createjob?template=PubSub_To_BigTable_Yaml).

:bulb: This documentation was generated based
on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#metadata-annotations).
Do not change this file directly.

## Parameters

### Required parameters

* **topic**: The Pub/Sub topic to read the input from. For example, `projects/your-project-id/topics/your-topic-name`.
* **schema**: A schema is required if the data format is JSON, AVRO, or PROTO. For JSON, this is a JSON schema. For AVRO and PROTO, this is the full schema definition.
* **language**: The language used to define (and execute) the expressions and/or callables in **fields**. Defaults to generic.
* **fields**: The output fields to compute, each mapping to the expression or callable that creates them. See the hypothetical example after the parameter lists.
* **project_id**: The Google Cloud project ID of the Bigtable instance.
* **instance_id**: The Bigtable instance ID.
* **table_id**: The Bigtable table ID to write the output to.
* **outputDeadLetterPubSubTopic**: The Pub/Sub error topic for failed transformation messages. For example, `projects/your-project-id/topics/your-error-topic-name`.

### Optional parameters

* **format**: The message format. One of: AVRO, JSON, PROTO, RAW, or STRING. Defaults to: JSON.
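
As a purely hypothetical illustration of how these values might be shaped (none
of these names or expressions come from the template, and the exact
`schema`/`fields` syntax depends on the underlying Beam YAML mapping transform):

```shell
# Hypothetical values for illustration only; adapt them to your own data
# and to the expression syntax of the language you configure.
export TOPIC="projects/my-project/topics/events"
export SCHEMA='{"type": "object", "properties": {"user_id": {"type": "string"}, "score": {"type": "integer"}}}'
export LANGUAGE=python
export FIELDS='{"row_key": "user_id", "score_doubled": "score * 2"}'
```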

## Getting Started

### Requirements

* Java 17
* Maven
* [gcloud CLI](https://cloud.google.com/sdk/gcloud), and execution of the
  following commands:
  * `gcloud auth login`
  * `gcloud auth application-default login`

:star2: These dependencies are pre-installed if you use Google Cloud Shell!

[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https%3A%2F%2Fgithub.com%2FGoogleCloudPlatform%2FDataflowTemplates.git&cloudshell_open_in_editor=yaml/src/main/java/com/google/cloud/teleport/templates/yaml/PubSubToBigTableYaml.java)

### Templates Plugin

This README provides instructions using
the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#templates-plugin).

#### Validating the Template

This template has a validation command that is used to check code quality.

```shell
mvn clean install -PtemplatesValidate \
  -DskipTests -am \
  -pl yaml
```

### Building Template

This template is a Flex Template, meaning that the pipeline code is
containerized and the container is executed on Dataflow. Please
check [Use Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
and [Configure Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates)
for more information.

#### Staging the Template

If you only want to stage the template (i.e., make it available to use) via
the `gcloud` command or the Dataflow "Create job from template" UI, use
the `-PtemplatesStage` profile:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export ARTIFACT_REGISTRY_REPO=<region>-docker.pkg.dev/$PROJECT/<repo>

mvn clean package -PtemplatesStage \
  -DskipTests \
  -DprojectId="$PROJECT" \
  -DbucketName="$BUCKET_NAME" \
  -DartifactRegistry="$ARTIFACT_REGISTRY_REPO" \
  -DstagePrefix="templates" \
  -DtemplateName="PubSub_To_BigTable_Yaml" \
  -f yaml
```

The `-DartifactRegistry` parameter sets the Artifact Registry repository for
the Flex Template image. If not provided, it defaults to `gcr.io/<project>`.

The command should build and save the template to Google Cloud, and then print
the complete location on Cloud Storage:

```
Flex Template was staged! gs://<bucket-name>/templates/flex/PubSub_To_BigTable_Yaml
```

Copy this path, as it is used in the following steps.
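
For example, saved into the variable the run commands below expect:

```shell
# Save the staged location printed by the build; the run commands reference it.
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/PubSub_To_BigTable_Yaml"
```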

#### Running the Template

**Using the staged template**:

You can use the path above to run the template (or share it with others for
execution).

To start a job with the template at any time using `gcloud`, you need valid
resources for the required parameters. With those in place, the following
command can be used:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/PubSub_To_BigTable_Yaml"

### Required
export TOPIC=<topic>
export SCHEMA=<schema>
export LANGUAGE=<language>
export FIELDS=<fields>
export PROJECT_ID=<project_id>
export INSTANCE_ID=<instance_id>
export TABLE_ID=<table_id>
export OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC=<outputDeadLetterPubSubTopic>

### Optional
export FORMAT=JSON

gcloud dataflow flex-template run "pubsub-to-bigtable-yaml-job" \
  --project "$PROJECT" \
  --region "$REGION" \
  --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
  --parameters "topic=$TOPIC" \
  --parameters "format=$FORMAT" \
  --parameters "schema=$SCHEMA" \
  --parameters "language=$LANGUAGE" \
  --parameters "fields=$FIELDS" \
  --parameters "project_id=$PROJECT_ID" \
  --parameters "instance_id=$INSTANCE_ID" \
  --parameters "table_id=$TABLE_ID" \
  --parameters "outputDeadLetterPubSubTopic=$OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC"
```

For more information about the command, please check:
https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/run
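
Once the job has been submitted, a quick way to confirm it is running (a
generic Dataflow command, not specific to this template) is:

```shell
# List active Dataflow jobs in the region to verify the job started.
gcloud dataflow jobs list \
  --project "$PROJECT" \
  --region "$REGION" \
  --status active
```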

**Using the plugin**:

Instead of just generating the template in the folder, it is possible to stage
and run the template in a single command. This may be useful when testing
changes to the templates.

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1

### Required
export TOPIC=<topic>
export SCHEMA=<schema>
export LANGUAGE=<language>
export FIELDS=<fields>
export PROJECT_ID=<project_id>
export INSTANCE_ID=<instance_id>
export TABLE_ID=<table_id>
export OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC=<outputDeadLetterPubSubTopic>

### Optional
export FORMAT=JSON

mvn clean package -PtemplatesRun \
  -DskipTests \
  -DprojectId="$PROJECT" \
  -DbucketName="$BUCKET_NAME" \
  -Dregion="$REGION" \
  -DjobName="pubsub-to-bigtable-yaml-job" \
  -DtemplateName="PubSub_To_BigTable_Yaml" \
  -Dparameters="topic=$TOPIC,format=$FORMAT,schema=$SCHEMA,language=$LANGUAGE,fields=$FIELDS,project_id=$PROJECT_ID,instance_id=$INSTANCE_ID,table_id=$TABLE_ID,outputDeadLetterPubSubTopic=$OUTPUT_DEAD_LETTER_PUB_SUB_TOPIC" \
  -f yaml
```

## Terraform

Dataflow supports using Terraform to manage template jobs;
see [dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job).

Terraform modules have been generated for most templates in this repository,
including the parameters specific to each template. If available, they may be
used instead of
[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
directly.

To use the autogenerated module, execute the standard
[terraform workflow](https://developer.hashicorp.com/terraform/intro/core-workflow):

```shell
cd v2/yaml/terraform/PubSub_To_BigTable_Yaml
terraform init
terraform apply
```

To use
[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
directly:

```terraform
provider "google-beta" {
  project = var.project
}
variable "project" {
  default = "<my-project>"
}
variable "region" {
  default = "us-central1"
}

resource "google_dataflow_flex_template_job" "pubsub_to_bigtable_yaml" {

  provider                = google-beta
  container_spec_gcs_path = "gs://dataflow-templates-${var.region}/latest/flex/PubSub_To_BigTable_Yaml"
  name                    = "pubsub-to-bigtable-yaml"
  region                  = var.region
  parameters = {
    topic                       = "<topic>"
    schema                      = "<schema>"
    language                    = "<language>"
    fields                      = "<fields>"
    project_id                  = "<project_id>"
    instance_id                 = "<instance_id>"
    table_id                    = "<table_id>"
    outputDeadLetterPubSubTopic = "<outputDeadLetterPubSubTopic>"
    # format = "JSON"
  }
}
```
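
Because `project` and `region` are declared as variables, they can also be
overridden at apply time instead of editing the defaults:

```shell
# Override the Terraform variable defaults from the command line.
terraform apply \
  -var="project=my-project" \
  -var="region=us-central1"
```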
