
Commit ab2c24c

Merge pull request #919 from data-integrations/dataplex-sink-plugin-code
Code Added for Dataplex sink plugin.
2 parents 0659ce3 + f30c618 commit ab2c24c

File tree

11 files changed, +3382 -1 lines


docs/Dataplex-batchsink.md

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
# Google Dataplex Sink

Description
-----------
This sink writes to a Dataplex asset, which is either a Google Cloud Storage bucket or a BigQuery dataset.

Credentials
-----------
If the plugin is run on a Google Cloud Dataproc cluster, the service account key does not need to be
provided and can be set to 'auto-detect'.
Credentials will be automatically read from the cluster environment.

If the plugin is not run on a Dataproc cluster, the path to a service account key must be provided.
The service account key can be found on the Dashboard in the Cloud Platform Console.
Make sure the account key has permission to access BigQuery and Google Cloud Storage.
The service account key file needs to be available on every node in your cluster and
must be readable by all users running the job.

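The 'auto-detect' behavior can be illustrated with the google-auth Python client. This is a sketch for orientation only, assuming the google-auth library; the plugin itself does not run this code.

```python
# Sketch only: how 'auto-detect' vs. an explicit key file typically resolve to
# credentials, using the google-auth client library.
import google.auth
from google.oauth2 import service_account

def resolve_credentials(service_account_setting: str):
    """Return Google credentials for 'auto-detect' or a key file path."""
    if service_account_setting == "auto-detect":
        # On a Dataproc cluster, the default credentials come from the
        # cluster environment, so no key file is needed.
        credentials, _project = google.auth.default()
        return credentials
    # Otherwise the key file must be present and readable on every node.
    return service_account.Credentials.from_service_account_file(service_account_setting)
```
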
Properties
----------
**Reference Name:** Name used to uniquely identify this sink for lineage, annotating metadata, etc.

**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the Dataplex job will run in. If a temporary bucket needs to be created, the service account
must have permission in this project to create buckets.

**Service Account**: Service account key used for authorization.

* **File Path**: Path on the local file system of the service account key used for
authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
When running on other clusters, the file must be present on every node in the cluster.

* **JSON**: Contents of the service account JSON file.

**Location ID**: ID of the location in which the Dataplex lake has been created. On the **Lake** page, click the name of the lake;
the lake detail page shows the location ID.

**Lake ID**: ID of the Dataplex lake. You can find it on the **Lake** detail page in Dataplex.

**Zone ID**: ID of the Dataplex zone. You can find it on the **Zone** detail page in Dataplex.

**Asset ID**: ID of the Dataplex asset. It represents a cloud resource that is managed within a lake as a member
of a zone.

**Asset Type**: Type of asset selected to ingest the data into Dataplex.
* BigQuery Dataset - the asset is a BigQuery dataset.
* Storage Bucket - the asset is a Cloud Storage bucket.

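For orientation, the sketch below groups example values for the properties above. The keys are the display names documented on this page, not the plugin's internal JSON property names, and every value is a placeholder.

```python
# Illustrative values only; keys are display names from this page, not the
# plugin's internal property names, and every value is a placeholder.
dataplex_sink_example = {
    "Reference Name": "dataplex-sink",
    "Project ID": "my-project",
    "Service Account": "auto-detect",      # or a key file path / JSON contents
    "Location ID": "us-central1",
    "Lake ID": "my-lake",
    "Zone ID": "my-zone",
    "Asset ID": "my-asset",
    "Asset Type": "Storage Bucket",        # or "BigQuery Dataset"
}
```
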
BigQuery Dataset specific fields
----------

**Table Name**: Table to write to. A table contains individual records organized in rows.
Each record is composed of columns (also called fields).
Every table is defined by a schema that describes the column names, data types, and other information.

**Operation**: Type of write operation to perform. This can be set to Insert, Update, or Upsert.
* Insert - all records will be inserted into the destination table.
* Update - records that match on Table Key will be updated in the table. Records that do not match
will be dropped.
* Upsert - records that match on Table Key will be updated. Records that do not match will be inserted.

**Table Key**: List of fields that determines the relation between tables during Update and Upsert operations.

**Dedupe By**: Column names and sort order used to choose which input record to update/upsert when there are
multiple input records with the same key. For example, if this is set to 'updated_time desc', then if there are
multiple input records with the same key, the one with the largest value for 'updated_time' will be applied.

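The interaction of Operation, Table Key, and Dedupe By can be pictured with a small sketch. This is plain Python for illustration only; the actual write is performed by BigQuery, not by code like this.

```python
# Upsert with Table Key ['id'] and Dedupe By 'updated_time desc' (illustration only).
def upsert(existing, incoming, table_key, dedupe_field):
    # Dedupe By: among incoming records with the same key, keep the one with
    # the largest value of the dedupe field.
    best = {}
    for rec in incoming:
        key = tuple(rec[f] for f in table_key)
        if key not in best or rec[dedupe_field] > best[key][dedupe_field]:
            best[key] = rec
    # Upsert: matching keys are updated, non-matching keys are inserted.
    table = {tuple(r[f] for f in table_key): r for r in existing}
    table.update(best)
    return list(table.values())

rows = upsert(
    existing=[{"id": 1, "name": "a", "updated_time": 1}],
    incoming=[{"id": 1, "name": "b", "updated_time": 3},
              {"id": 1, "name": "c", "updated_time": 2},
              {"id": 2, "name": "d", "updated_time": 1}],
    table_key=["id"],
    dedupe_field="updated_time",
)
# id 1 keeps name "b" (largest updated_time); id 2 is newly inserted.
```
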
**Partition Filter**: Partition filter that can be used for partition elimination during Update or
Upsert operations. Should only be used with Update or Upsert operations on tables where the
require-partition-filter option is enabled. For example, if the table is partitioned and the Partition Filter is
'_PARTITIONTIME > "2020-01-01" and _PARTITIONTIME < "2020-03-01"',
the update operation will be applied only to the partitions that meet the criteria.

**Truncate Table**: Whether or not to truncate the table before writing to it.
Should only be used with the Insert operation.

**Update Table Schema**: Whether the BigQuery table schema should be modified
when it does not match the schema expected by the pipeline.
* When this is set to false, any mismatches between the schema expected by the pipeline
and the schema in BigQuery will result in pipeline failure.
* When this is set to true, the schema in BigQuery will be updated to match the schema
expected by the pipeline, assuming the schemas are compatible.

**Partitioning Type**: Specifies the partitioning type. Can be either Integer, Time, or None. Defaults to Time.
This value is ignored if the table already exists.
* When this is set to Time, the table will be created with time partitioning.
* When this is set to Integer, the table will be created with range partitioning.
* When this is set to None, the table will be created without partitioning.

**Range Start**: For integer partitioning, specifies the start of the range. Only used when the table does not
already exist and Partitioning Type is set to Integer.
* The start value is inclusive.

**Range End**: For integer partitioning, specifies the end of the range. Only used when the table does not
already exist and Partitioning Type is set to Integer.
* The end value is exclusive.

**Range Interval**: For integer partitioning, specifies the partition interval (see the example below). Only used when the table does not
already exist and Partitioning Type is set to Integer.
* The interval value must be a positive integer.

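As a concrete example of how Range Start, Range End, and Range Interval define the partitions (a sketch; BigQuery itself creates the partitions):

```python
# Range Start is inclusive, Range End is exclusive, Range Interval must be positive.
def integer_partitions(start: int, end: int, interval: int):
    assert interval > 0, "Range Interval must be a positive integer"
    return [(lo, min(lo + interval, end)) for lo in range(start, end, interval)]

# Range Start=0, Range End=100, Range Interval=10
# -> [(0, 10), (10, 20), ..., (90, 100)]
print(integer_partitions(0, 100, 10))
```
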
**Partition Field**: Partitioning column for the BigQuery table. This should be left empty if the
BigQuery table is an ingestion-time partitioned table.

**Require Partition Filter**: Whether to create a table that requires a partition filter. This value
is ignored if the table already exists.
* When this is set to true, the table will be created with a required partition filter.
* When this is set to false, the table will be created without a required partition filter.

**Clustering Order**: List of fields that determines the sort order of the data. Fields must be of type
INT, LONG, STRING, DATE, TIMESTAMP, BOOLEAN, or DECIMAL. Tables cannot be clustered on more than 4 fields.
This value is only used when the BigQuery table is automatically created, and is ignored if the table
already exists.

Compatible changes (see **Update Table Schema**) fall under the following categories:
* the pipeline schema contains nullable fields that do not exist in the BigQuery schema.
In this case, the new fields will be added to the BigQuery schema.
* the pipeline schema contains nullable fields that are non-nullable in the BigQuery schema.
In this case, the fields will be modified to become nullable in the BigQuery schema.
* the pipeline schema does not contain fields that exist in the BigQuery schema.
In this case, those fields in the BigQuery schema will be modified to become nullable.

Incompatible schema changes will result in pipeline failure.

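A rough sketch of the compatibility rules above, with schemas modeled as `{field_name: (type, nullable)}`. Anything not covered by the listed rules is treated as incompatible here, which is an assumption of this sketch rather than an exhaustive statement of the plugin's behavior.

```python
def updated_bq_schema(pipeline_schema, bq_schema):
    """Apply the compatible-change rules above, or raise for anything else."""
    updated = dict(bq_schema)
    for name, (ftype, nullable) in pipeline_schema.items():
        if name not in bq_schema:
            if not nullable:
                raise ValueError(f"incompatible: new field '{name}' is not nullable")
            updated[name] = (ftype, True)        # new nullable field is added
        else:
            bq_type, bq_nullable = bq_schema[name]
            if bq_type != ftype:
                raise ValueError(f"incompatible: type change for '{name}'")
            if nullable and not bq_nullable:
                updated[name] = (bq_type, True)  # field is relaxed to nullable
    for name, (bq_type, _) in bq_schema.items():
        if name not in pipeline_schema:
            updated[name] = (bq_type, True)      # field absent from pipeline becomes nullable
    return updated
```
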
**Schema**: Schema of the data to write.
If a schema is provided, it must be compatible with the table schema in BigQuery.

Data Type Mappings from CDAP to BigQuery Asset
----------
The following table lists the different CDAP data types and the
corresponding BigQuery data type for each, for updates and upserts.

| CDAP type      | BigQuery type                                 |
|----------------|-----------------------------------------------|
| array          | repeated                                      |
| boolean        | bool                                          |
| bytes          | bytes                                         |
| date           | date                                          |
| datetime       | datetime, string                              |
| decimal        | numeric                                       |
| double / float | float64                                       |
| enum           | unsupported                                   |
| int / long     | int64                                         |
| map            | unsupported                                   |
| record         | struct                                        |
| string         | string, datetime (must be in ISO 8601 format) |
| time           | time                                          |
| timestamp      | timestamp                                     |
| union          | unsupported                                   |

For inserts, the type conversions are the same as those used in loading Avro
data to BigQuery; the table is available
[here](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions).

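For the string/datetime rows in the table above, the string value must be in ISO 8601 format, for example (Python, illustration only):

```python
from datetime import datetime

# A CDAP 'string' value written to a BigQuery DATETIME column must be ISO 8601,
# e.g. '2020-01-01T20:42:00'; datetime.isoformat() produces such a value.
value = datetime(2020, 1, 1, 20, 42).isoformat()   # '2020-01-01T20:42:00'
```
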
Storage Bucket specific fields
----------

**Table Name**: Table to write to. In the context of a GCS bucket, a table is a directory where the data is stored
and read by Dataplex discovery jobs.

**Path Suffix:** Time format for the output directory that will be appended to the path.
For example, the format 'yyyy-MM-dd-HH-mm' will result in a directory of the form '2015-01-01-20-42'.
If not specified, nothing will be appended to the path.

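The pattern above is a Java-style date format; the resulting directory name can be illustrated in Python with the equivalent strftime pattern (sketch only; the bucket and table names are placeholders):

```python
from datetime import datetime

# '%Y-%m-%d-%H-%M' is the strftime equivalent of the 'yyyy-MM-dd-HH-mm' pattern above.
suffix = datetime(2015, 1, 1, 20, 42).strftime("%Y-%m-%d-%H-%M")
output_dir = f"gs://my-bucket/my-table/{suffix}"   # .../my-table/2015-01-01-20-42
```
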
**Format:** Format to write the records in.
The format must be one of 'json', 'avro', 'parquet', 'csv', or 'orc' for a raw zone,
and one of 'avro', 'parquet', or 'orc' for a curated zone.
If the format is a macro, only the pre-packaged formats can be used.

**Content Type:** The Content Type entity is used to indicate the media type of the resource.
Defaults to 'application/octet-stream'. The following table shows valid content types for each format.

| Format type | Content type                                                     |
|-------------|------------------------------------------------------------------|
| avro        | application/avro, application/octet-stream                       |
| csv         | text/csv, application/csv, text/plain, application/octet-stream  |
| json        | application/json, text/plain, application/octet-stream           |
| orc         | application/octet-stream                                         |
| parquet     | application/octet-stream                                         |

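The Format and Content Type rules above can be summarized in a small validation sketch (illustration only; the zone type comes from the Dataplex zone the asset belongs to):

```python
# Allowed formats per zone type and allowed content types per format, as documented above.
FORMATS_BY_ZONE = {
    "raw": {"json", "avro", "parquet", "csv", "orc"},
    "curated": {"avro", "parquet", "orc"},
}
CONTENT_TYPES = {
    "avro": {"application/avro", "application/octet-stream"},
    "csv": {"text/csv", "application/csv", "text/plain", "application/octet-stream"},
    "json": {"application/json", "text/plain", "application/octet-stream"},
    "orc": {"application/octet-stream"},
    "parquet": {"application/octet-stream"},
}

def validate(zone_type: str, fmt: str, content_type: str = "application/octet-stream"):
    if fmt not in FORMATS_BY_ZONE[zone_type]:
        raise ValueError(f"format '{fmt}' is not allowed for a {zone_type} zone")
    if content_type not in CONTENT_TYPES[fmt]:
        raise ValueError(f"content type '{content_type}' is not valid for format '{fmt}'")

validate("raw", "csv", "text/csv")   # ok
validate("curated", "parquet")       # ok, default content type
```
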
**Schema:** Schema of the data to write.
The 'avro' and 'parquet' formats require a schema but other formats do not.

icons/Dataplex-batchsink.png

2.93 KB
