Skip to content

Commit 66902e5

Browse files
PLUGIN-1939: Adding nullable split support in DB plugins.
1 parent 91dd191 commit 66902e5

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed

database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616

1717
package io.cdap.plugin.db.source;
1818

19+
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.base.Strings;
1921
import com.google.common.base.Throwables;
22+
import com.google.common.collect.ImmutableList;
2023
import io.cdap.plugin.db.ConnectionConfigAccessor;
2124
import io.cdap.plugin.db.JDBCDriverShim;
2225
import io.cdap.plugin.db.NoOpCommitConnection;
2326
import io.cdap.plugin.db.TransactionIsolationLevel;
2427
import io.cdap.plugin.util.DBUtils;
2528
import org.apache.hadoop.conf.Configuration;
2629
import org.apache.hadoop.mapreduce.InputSplit;
30+
import org.apache.hadoop.mapreduce.JobContext;
2731
import org.apache.hadoop.mapreduce.RecordReader;
2832
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2933
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -41,6 +45,7 @@
4145
import java.sql.SQLException;
4246
import java.sql.Statement;
4347
import java.sql.Types;
48+
import java.util.List;
4449
import java.util.Properties;
4550

4651
/**
@@ -64,6 +69,58 @@ public static void setInput(Configuration conf,
6469
new ConnectionConfigAccessor(conf).setAutoCommitEnabled(enableAutoCommit);
6570
}
6671

72+
/**
73+
* A wrapper around the superclass getSplits method. This is required for testing,
74+
* as mocking a method that calls its superclass implementation is challenging.
75+
* This wrapper allows unit tests to override this method and mock the behavior of
76+
* the superclass call, isolating the logic within the overridden getSplits method.
77+
*
78+
* @param job The job context.
79+
* @return The list of input splits generated by the parent class.
80+
* @throws IOException If an error occurs while getting the splits.
81+
*/
82+
@VisibleForTesting
83+
List<InputSplit> getBaseSplits(JobContext job) throws IOException {
84+
return super.getSplits(job);
85+
}
86+
87+
@Override
88+
public List<InputSplit> getSplits(JobContext job) throws IOException {
89+
List<InputSplit> splits = getBaseSplits(job);
90+
91+
// Handle the edge case where the base implementation returns no splits. In this scenario,
92+
// create a single, all-encompassing split to ensure the job can proceed.
93+
if (splits == null || splits.isEmpty()) {
94+
return ImmutableList.<InputSplit>builder()
95+
.add(new DataDrivenDBInputSplit("1=1", "1=1"))
96+
.build();
97+
}
98+
99+
// Check if a split for NULL values or an all-encompassing split ("1=1") already exists.
100+
boolean shouldAddNullSplit = splits.stream()
101+
.map(DataDrivenDBInputFormat.DataDrivenDBInputSplit.class::cast)
102+
.map(DataDrivenDBInputFormat.DataDrivenDBInputSplit::getLowerClause)
103+
.noneMatch(lowerClause ->
104+
!Strings.isNullOrEmpty(lowerClause)
105+
&& (lowerClause.contains("IS NULL") || lowerClause.contains("1=1")));
106+
107+
if (shouldAddNullSplit) {
108+
// Create a dedicated split to handle potential NULL values in the split-by column.
109+
String splitByColumn = getDBConf().getInputOrderBy();
110+
LOG.debug("No split found for NULL values. Adding a split for '{} IS NULL'.", splitByColumn);
111+
String clause = String.format("%s IS NULL", splitByColumn);
112+
InputSplit nullSplit =
113+
new DataDrivenDBInputFormat.DataDrivenDBInputSplit(clause, clause);
114+
115+
return ImmutableList.<InputSplit>builder()
116+
.addAll(splits)
117+
.add(nullSplit)
118+
.build();
119+
}
120+
121+
return splits;
122+
}
123+
67124
@Override
68125
public Connection getConnection() {
69126
if (this.connection == null) {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright © 2019 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.source;
18+
19+
import com.google.common.collect.ImmutableList;
20+
import org.apache.hadoop.mapreduce.InputSplit;
21+
import org.apache.hadoop.mapreduce.JobContext;
22+
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
23+
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
24+
import org.junit.Assert;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
import org.mockito.Mock;
29+
import org.mockito.Mockito;
30+
import org.mockito.runners.MockitoJUnitRunner;
31+
32+
import java.io.IOException;
33+
import java.util.Collections;
34+
import java.util.List;
35+
36+
@RunWith(MockitoJUnitRunner.class)
37+
public class DataDrivenETLDBInputFormatTest {
38+
39+
@Mock
40+
private JobContext mockJobContext;
41+
@Mock
42+
private DBConfiguration mockDbConfiguration;
43+
44+
private DataDrivenETLDBInputFormat inputFormat;
45+
46+
@Before
47+
public void setUp() {
48+
inputFormat = Mockito.spy(new DataDrivenETLDBInputFormat());
49+
Mockito.doReturn(mockDbConfiguration).when(inputFormat).getDBConf();
50+
Mockito.doReturn("id").when(mockDbConfiguration).getInputOrderBy();
51+
}
52+
53+
@Test
54+
public void testGetSplitsAddsNullSplit() throws IOException {
55+
DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
56+
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
57+
List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
58+
Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
59+
60+
List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
61+
62+
Assert.assertEquals("A new split for NULLs should be added", 2, finalSplits.size());
63+
64+
DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
65+
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(1);
66+
Assert.assertEquals("id IS NULL", nullSplit.getLowerClause());
67+
Assert.assertEquals("id IS NULL", nullSplit.getUpperClause());
68+
}
69+
70+
@Test
71+
public void testGetSplitsDoesNotAddNullSplitIfPresent() throws IOException {
72+
DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
73+
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
74+
DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
75+
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id IS NULL", "id IS NULL");
76+
List<InputSplit> initialSplits = ImmutableList.of(existingSplit, nullSplit);
77+
78+
Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
79+
80+
List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
81+
82+
Assert.assertEquals("Should not add a duplicate NULL split", 2, finalSplits.size());
83+
}
84+
85+
@Test
86+
public void testGetSplitsDoesNotAddNullSplitIfSelectAllPresent() throws IOException {
87+
DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
88+
new DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1");
89+
List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
90+
91+
Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
92+
93+
List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
94+
95+
Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
96+
}
97+
98+
@Test
99+
public void testGetSplitsDoesNotAddNullSplitIfBaseReturnsNull() throws IOException {
100+
Mockito.doReturn(null).when(inputFormat).getBaseSplits(mockJobContext);
101+
102+
List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
103+
Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
104+
105+
DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
106+
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
107+
Assert.assertEquals("1=1", split.getLowerClause());
108+
}
109+
110+
@Test
111+
public void testGetSplitsDoesNotAddNullSplitIfBaseReturnsEmptyList() throws IOException {
112+
Mockito.doReturn(Collections.emptyList()).when(inputFormat).getBaseSplits(mockJobContext);
113+
114+
List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
115+
Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
116+
117+
DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
118+
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
119+
Assert.assertEquals("1=1", split.getLowerClause());
120+
}
121+
}

0 commit comments

Comments
 (0)