Skip to content

Commit 6671cf7

Browse files
committed
Update BQ Pushdown outer joins to only execute on 2 inputs. Outer joins with 3 or more inputs will be executed in Spark
1 parent fad4586 commit 6671cf7

File tree

3 files changed

+147
-0
lines changed

3 files changed

+147
-0
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,11 @@ protected static boolean isValidJoinDefinition(SQLJoinDefinition sqlJoinDefiniti
215215
validationProblems);
216216
}
217217

218+
// Validate join stages for join on keys
219+
if (joinDefinition.getCondition().getOp() == JoinCondition.Op.KEY_EQUALITY) {
220+
BigQuerySQLEngineUtils.validateJoinOnKeyStages(joinDefinition, validationProblems);
221+
}
222+
218223
if (!validationProblems.isEmpty()) {
219224
LOG.warn("Join operation for stage '{}' could not be executed in BigQuery. Issues found: {}.",
220225
sqlJoinDefinition.getDatasetName(),

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.cdap.cdap.api.data.schema.Schema;
2626
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
2727
import io.cdap.cdap.etl.api.join.JoinCondition;
28+
import io.cdap.cdap.etl.api.join.JoinDefinition;
2829
import io.cdap.cdap.etl.api.join.JoinStage;
2930
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
3031
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngineConfig;
@@ -214,6 +215,35 @@ public static void validateOnExpressionJoinCondition(JoinCondition.OnExpression
214215
}
215216
}
216217

218+
/**
219+
* Validates stages for a Join on Key operation
220+
*
221+
* TODO: Update logic once BQ SQL engine joins support multiple outer join tables
222+
*
223+
* @param joinDefinition Join Definition to validate
224+
* @param validationProblems List of validation problems to use to append messages
225+
*/
226+
public static void validateJoinOnKeyStages(JoinDefinition joinDefinition, List<String> validationProblems) {
227+
// 2 stages are not an issue
228+
if (joinDefinition.getStages().size() < 3) {
229+
return;
230+
}
231+
232+
// For 3 or more stages, we only support inner joins.
233+
boolean isInnerJoin = true;
234+
235+
// If any of the stages is not required, this is an outer join
236+
for (JoinStage stage : joinDefinition.getStages()) {
237+
isInnerJoin &= stage.isRequired();
238+
}
239+
240+
if (!isInnerJoin) {
241+
validationProblems.add(
242+
String.format("Only 2 input stages are supported for outer joins, %d stages supplied.",
243+
joinDefinition.getStages().size()));
244+
}
245+
}
246+
217247
/**
218248
* Ensure the Stage name is valid for execution in BQ pushdown.
219249
* <p>
@@ -230,6 +260,7 @@ public static boolean isValidIdentifier(String identifier) {
230260

231261
/**
232262
* Get tags for a BQ Pushdown job
263+
*
233264
* @param operation the current operation that is being executed
234265
* @return Map containing tags for a job.
235266
*/

src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineTest.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,49 @@ public void testIsValidJoinDefinitionOnKey() {
7575
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
7676
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));
7777

78+
Schema outputSchema =
79+
Schema.recordOf("Join",
80+
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
81+
Schema.Field.of("from_zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));
82+
83+
// Both stages are required, so this is an inner join between 2 stages
84+
JoinStage shipments = JoinStage.builder("Shipments", shipmentSchema).setRequired(true).build();
85+
JoinStage fromAddresses = JoinStage.builder("FromAddress", fromAddressSchema).setRequired(true).build();
86+
87+
// not null safe
88+
JoinCondition condition = JoinCondition.onKeys()
89+
.addKey(new JoinKey("Shipments", Arrays.asList("id")))
90+
.addKey(new JoinKey("FromAddress", Arrays.asList("shipment_id")))
91+
.setNullSafe(false)
92+
.build();
93+
94+
JoinDefinition joinDefinition = JoinDefinition.builder()
95+
.select(new JoinField("Shipments", "id", "shipment_id"),
96+
new JoinField("FromAddress", "zip", "from_zip"))
97+
.from(shipments, fromAddresses)
98+
.on(condition)
99+
.setOutputSchemaName("Join")
100+
.setOutputSchema(outputSchema)
101+
.build();
102+
103+
SQLJoinDefinition sqlJoinDefinition = new SQLJoinDefinition("Join", joinDefinition);
104+
105+
Assert.assertTrue(BigQuerySQLEngine.isValidJoinDefinition(sqlJoinDefinition));
106+
verify(logger, times(0)).warn(anyString(), anyString(), anyString());
107+
}
108+
109+
@Test
110+
public void testInnerJoinFor3StagesIsSupported() {
111+
Schema shipmentSchema =
112+
Schema.recordOf("Shipments",
113+
Schema.Field.of("id", Schema.of(Schema.Type.INT)));
114+
115+
Schema fromAddressSchema =
116+
Schema.recordOf("FromAddress",
117+
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
118+
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
119+
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));
120+
78121
Schema toAddressSchema =
79122
Schema.recordOf("ToAddress",
80123
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
@@ -116,6 +159,74 @@ public void testIsValidJoinDefinitionOnKey() {
116159
verify(logger, times(0)).warn(anyString(), anyString(), anyString());
117160
}
118161

162+
@Test
163+
public void testOuterJoinFor3StagesIsNotSupported() {
164+
ArgumentCaptor<String> messageTemplateCaptor = ArgumentCaptor.forClass(String.class);
165+
ArgumentCaptor<String> stageNameCaptor = ArgumentCaptor.forClass(String.class);
166+
ArgumentCaptor<String> issuesCaptor = ArgumentCaptor.forClass(String.class);
167+
168+
Schema shipmentSchema =
169+
Schema.recordOf("Shipments",
170+
Schema.Field.of("id", Schema.of(Schema.Type.INT)));
171+
172+
Schema fromAddressSchema =
173+
Schema.recordOf("FromAddress",
174+
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
175+
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
176+
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));
177+
178+
Schema toAddressSchema =
179+
Schema.recordOf("ToAddress",
180+
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
181+
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
182+
Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));
183+
184+
Schema outputSchema =
185+
Schema.recordOf("Join",
186+
Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)),
187+
Schema.Field.of("from_zip", Schema.nullableOf(Schema.of(Schema.Type.INT))),
188+
Schema.Field.of("to_zip", Schema.nullableOf(Schema.of(Schema.Type.INT))));
189+
190+
// First join is a right join, second join is a left join
191+
JoinStage shipments = JoinStage.builder("Shipments", shipmentSchema).setRequired(true).build();
192+
JoinStage fromAddresses = JoinStage.builder("FromAddress", fromAddressSchema).setRequired(true).build();
193+
JoinStage toAddresses = JoinStage.builder("ToAddress", toAddressSchema).setRequired(false).build();
194+
195+
// null safe
196+
JoinCondition condition = JoinCondition.onKeys()
197+
.addKey(new JoinKey("Shipments", Arrays.asList("id")))
198+
.addKey(new JoinKey("FromAddress", Arrays.asList("shipment_id")))
199+
.addKey(new JoinKey("ToAddress", Arrays.asList("shipment_id")))
200+
.setNullSafe(false)
201+
.build();
202+
203+
JoinDefinition joinDefinition = JoinDefinition.builder()
204+
.select(new JoinField("Shipments", "id", "shipment_id"),
205+
new JoinField("FromAddress", "zip", "from_zip"),
206+
new JoinField("ToAddress", "zip", "to_zip"))
207+
.from(shipments, fromAddresses, toAddresses)
208+
.on(condition)
209+
.setOutputSchemaName("Join")
210+
.setOutputSchema(outputSchema)
211+
.build();
212+
213+
SQLJoinDefinition sqlJoinDefinition = new SQLJoinDefinition("Join", joinDefinition);
214+
215+
Assert.assertFalse(BigQuerySQLEngine.isValidJoinDefinition(sqlJoinDefinition));
216+
verify(logger).warn(messageTemplateCaptor.capture(), stageNameCaptor.capture(), issuesCaptor.capture());
217+
218+
String messageTemplate = messageTemplateCaptor.getValue();
219+
Assert.assertTrue(messageTemplate.contains(
220+
"Join operation for stage '{}' could not be executed in BigQuery. Issues found:"));
221+
222+
String stageName = stageNameCaptor.getValue();
223+
Assert.assertEquals("Join", stageName);
224+
225+
String issues = issuesCaptor.getValue();
226+
Assert.assertTrue(issues.contains(
227+
"Only 2 input stages are supported for outer joins, 3 stages supplied."));
228+
}
229+
119230
@Test
120231
public void testIsValidJoinDefinitionOnKeyWithErrors() {
121232
ArgumentCaptor<String> messageTemplateCaptor = ArgumentCaptor.forClass(String.class);

0 commit comments

Comments
 (0)