Skip to content

Commit 4535ff9

Browse files
author
Ulada Butsenka
committed
Add filter parameter to Iceberg read configuration

- Added filter (str) parameter to ReadFromIceberg transform in IcebergToPostgres.yaml
- Updated IcebergToPostgresYaml.java interface with filter parameter between drop and keep
- Adjusted order numbers for subsequent parameters
- Added filter parameter to iceberg_options.yaml with example
1 parent e1eee9a commit 4535ff9

File tree

4 files changed

+35
-22
lines changed

4 files changed

+35
-22
lines changed

yaml/src/main/java/com/google/cloud/teleport/templates/yaml/IcebergToPostgresYaml.java

Lines changed: 22 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,16 @@ public interface IcebergToPostgresYaml {
9999

100100
@TemplateParameter.Text(
101101
order = 6,
102+
name = "filter",
103+
optional = true,
104+
description =
105+
"An optional filter expression to apply to the input records from the Iceberg table.",
106+
helpText = "A filter expression to apply to records from the Iceberg table.",
107+
example = "age > 18")
108+
String getFilter();
109+
110+
@TemplateParameter.Text(
111+
order = 7,
102112
name = "keep",
103113
optional = true,
104114
description =
@@ -109,7 +119,7 @@ public interface IcebergToPostgresYaml {
109119
String getKeep();
110120

111121
@TemplateParameter.Text(
112-
order = 7,
122+
order = 8,
113123
name = "jdbcUrl",
114124
optional = false,
115125
description = "Connection URL for the JDBC source/sink.",
@@ -119,7 +129,7 @@ public interface IcebergToPostgresYaml {
119129
String getJdbcUrl();
120130

121131
@TemplateParameter.Text(
122-
order = 8,
132+
order = 9,
123133
name = "username",
124134
optional = true,
125135
description = "Username for the JDBC connection.",
@@ -128,7 +138,7 @@ public interface IcebergToPostgresYaml {
128138
String getUsername();
129139

130140
@TemplateParameter.Password(
131-
order = 9,
141+
order = 10,
132142
name = "password",
133143
optional = true,
134144
description = "Password for the JDBC connection.",
@@ -137,7 +147,7 @@ public interface IcebergToPostgresYaml {
137147
String getPassword();
138148

139149
@TemplateParameter.Text(
140-
order = 10,
150+
order = 11,
141151
name = "driverClassName",
142152
optional = true,
143153
description =
@@ -148,7 +158,7 @@ public interface IcebergToPostgresYaml {
148158
String getDriverClassName();
149159

150160
@TemplateParameter.Text(
151-
order = 11,
161+
order = 12,
152162
name = "driverJars",
153163
optional = true,
154164
description = "Comma-separated GCS paths of the JDBC driver jars.",
@@ -157,7 +167,7 @@ public interface IcebergToPostgresYaml {
157167
String getDriverJars();
158168

159169
@TemplateParameter.Text(
160-
order = 12,
170+
order = 13,
161171
name = "connectionProperties",
162172
optional = true,
163173
description = "JDBC connection properties.",
@@ -166,7 +176,7 @@ public interface IcebergToPostgresYaml {
166176
String getConnectionProperties();
167177

168178
@TemplateParameter.Text(
169-
order = 13,
179+
order = 14,
170180
name = "connectionInitSql",
171181
optional = true,
172182
description = "A list of SQL statements to execute upon connection initialization.",
@@ -175,7 +185,7 @@ public interface IcebergToPostgresYaml {
175185
String getConnectionInitSql();
176186

177187
@TemplateParameter.Text(
178-
order = 14,
188+
order = 15,
179189
name = "jdbcType",
180190
optional = true,
181191
description = "Type of JDBC source. Default: postgres.",
@@ -186,7 +196,7 @@ public interface IcebergToPostgresYaml {
186196
String getJdbcType();
187197

188198
@TemplateParameter.Text(
189-
order = 15,
199+
order = 16,
190200
name = "location",
191201
optional = false,
192202
description = "The name of the table to write to.",
@@ -195,7 +205,7 @@ public interface IcebergToPostgresYaml {
195205
String getLocation();
196206

197207
@TemplateParameter.Text(
198-
order = 16,
208+
order = 17,
199209
name = "writeStatement",
200210
optional = true,
201211
description = "The SQL statement to use for inserting records.",
@@ -204,7 +214,7 @@ public interface IcebergToPostgresYaml {
204214
String getWriteStatement();
205215

206216
@TemplateParameter.Integer(
207-
order = 17,
217+
order = 18,
208218
name = "batchSize",
209219
optional = true,
210220
description = "The number of records to group for each write operation.",
@@ -213,7 +223,7 @@ public interface IcebergToPostgresYaml {
213223
Integer getBatchSize();
214224

215225
@TemplateParameter.Boolean(
216-
order = 18,
226+
order = 19,
217227
name = "autosharding",
218228
optional = true,
219229
description = "If true, enables using a dynamically determined number of shards to write.",

yaml/src/main/python/options/iceberg_options.yaml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,13 @@ options:
3737
required: false
3838
type: text
3939
- order: 6
40+
name: "filter"
41+
description: "An optional filter expression to apply to the input records."
42+
help: "A filter expression to apply to records from the Iceberg table."
43+
example: "age > 18"
44+
required: false
45+
type: text
46+
- order: 7
4047
name: "keep"
4148
description: "A list of field names to keep in the input record."
4249
help: "A list of field names to keep. Mutually exclusive with 'drop' and 'only'."

yaml/src/main/yaml/IcebergToPostgres.yaml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ pipeline:
4040
catalog_properties: {{ catalogProperties }}
4141
config_properties: {{ configProperties }}
4242
drop: {{ drop }}
43+
filter: {{ filter }}
4344
keep: {{ keep }}
4445

4546
- type: WriteToPostgres

yaml/src/test/java/com/google/cloud/teleport/templates/yaml/IcebergToPostgresYamlIT.java

Lines changed: 5 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -56,15 +56,16 @@ public class IcebergToPostgresYamlIT extends TemplateTestBase {
5656
private static final Logger LOG = LoggerFactory.getLogger(IcebergToPostgresYamlIT.class);
5757

5858
// Iceberg Setup
59-
private static final String CATALOG_NAME = "hadoop_catalog";
59+
private static final String CATALOG_NAME = "gcs_catalog";
6060
private static final String NAMESPACE = "iceberg_namespace";
6161
private static final String ICEBERG_TABLE_NAME = "source_table";
6262
private static final String ICEBERG_TABLE_IDENTIFIER = NAMESPACE + "." + ICEBERG_TABLE_NAME;
6363

6464
// Postgres Setup
6565
private static final String POSTGRES_TABLE_NAME = "target_table";
66-
private static final String WRITE_STATEMENT =
67-
"INSERT INTO " + POSTGRES_TABLE_NAME + " (id, name, active) VALUES (?, ?, ?)";
66+
67+
// private static final String WRITE_STATEMENT =
68+
// "INSERT INTO " + POSTGRES_TABLE_NAME + " (id, name, active) VALUES (?, ?, ?)";
6869

6970
private PostgresResourceManager postgresResourceManager;
7071
private IcebergResourceManager icebergResourceManager;
@@ -135,6 +136,7 @@ public void testIcebergToPostgres() throws IOException {
135136
.addParameter("jdbcUrl", postgresResourceManager.getUri())
136137
.addParameter("username", postgresResourceManager.getUsername())
137138
.addParameter("password", postgresResourceManager.getPassword())
139+
// .addParameter("writeStatement", WRITE_STATEMENT)
138140
.addParameter("location", POSTGRES_TABLE_NAME);
139141

140142
// Act
@@ -188,13 +190,6 @@ protected PipelineOperator.Config createConfig(LaunchInfo info) {
188190
.build();
189191
}
190192

191-
/**
192-
* Build catalog properties for Iceberg warehouse backed by GCS.
193-
*
194-
* <p>Uses BigLake REST catalog for Iceberg metadata and GCS for warehouse storage.
195-
*
196-
* @return Map of catalog properties required by Iceberg
197-
*/
198193
private Map<String, String> getCatalogProperties() {
199194
return Map.of(
200195
"type", "rest",

0 commit comments

Comments
 (0)