
Commit 1f3416a

PLUGIN-1939: Adding nullable split support in DB plugins.
1 parent 91dd191 commit 1f3416a

2 files changed: +144 −0 lines changed
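For context on what "nullable split support" means here: when a data-driven DB source splits a table on a numeric column, each split selects a value range such as `id >= 0 AND id < 100`, and rows whose split-by column is NULL match none of those ranges, so they are silently skipped. The sketch below is illustrative only; the `users` table and `id` column are hypothetical names, not taken from this commit. It simply prints the per-split queries a range-based strategy would issue, plus the extra IS NULL split this change appends.

// Illustrative sketch: why range-based splits miss rows with a NULL split-by key.
import java.util.Arrays;
import java.util.List;

public class NullSplitExample {
  public static void main(String[] args) {
    // Range predicates of the kind a data-driven split produces for a numeric split-by column.
    List<String> rangeSplits = Arrays.asList(
        "id >= 0 AND id < 100",
        "id >= 100 AND id <= 200");

    // A row whose id is NULL satisfies none of the range predicates above,
    // because comparisons against NULL evaluate to UNKNOWN in SQL.
    // The dedicated split added by this commit covers exactly those rows.
    String nullSplit = "id IS NULL";

    rangeSplits.forEach(p -> System.out.println("SELECT * FROM users WHERE " + p));
    System.out.println("SELECT * FROM users WHERE " + nullSplit);
  }
}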

database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java

Lines changed: 48 additions & 0 deletions
@@ -16,14 +16,18 @@
 
 package io.cdap.plugin.db.source;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import io.cdap.plugin.db.ConnectionConfigAccessor;
 import io.cdap.plugin.db.JDBCDriverShim;
 import io.cdap.plugin.db.NoOpCommitConnection;
 import io.cdap.plugin.db.TransactionIsolationLevel;
 import io.cdap.plugin.util.DBUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -41,6 +45,7 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.List;
 import java.util.Properties;
 
 /**
@@ -64,6 +69,49 @@ public static void setInput(Configuration conf,
     new ConnectionConfigAccessor(conf).setAutoCommitEnabled(enableAutoCommit);
   }
 
+  /**
+   * A wrapper around the superclass getSplits method. This is required for testing,
+   * as mocking a method that calls its superclass implementation is challenging.
+   * This wrapper allows unit tests to override this method and mock the behavior of
+   * the superclass call, isolating the logic within the overridden getSplits method.
+   *
+   * @param job The job context.
+   * @return The list of input splits generated by the parent class.
+   * @throws IOException If an error occurs while getting the splits.
+   */
+  @VisibleForTesting
+  List<InputSplit> getBaseSplits(JobContext job) throws IOException {
+    return super.getSplits(job);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    List<InputSplit> splits = getBaseSplits(job);
+
+    // Check if a split for NULL values or an all-encompassing split ("1=1") already exists.
+    boolean shouldAddNullSplit = splits.stream()
+      .map(DataDrivenDBInputFormat.DataDrivenDBInputSplit.class::cast)
+      .map(DataDrivenDBInputFormat.DataDrivenDBInputSplit::getLowerClause)
+      .noneMatch(lowerClause ->
+                   !Strings.isNullOrEmpty(lowerClause)
+                     && (lowerClause.contains("IS NULL") || lowerClause.contains("1=1")));
+
+    if (shouldAddNullSplit) {
+      // Create a dedicated split to handle potential NULL values in the split-by column.
+      String splitByColumn = getDBConf().getInputOrderBy();
+      String clause = String.format("%s IS NULL", splitByColumn);
+      InputSplit nullSplit =
+        new DataDrivenDBInputFormat.DataDrivenDBInputSplit(clause, clause);
+
+      return ImmutableList.<InputSplit>builder()
+        .addAll(splits)
+        .add(nullSplit)
+        .build();
+    }
+
+    return splits;
+  }
+
   @Override
   public Connection getConnection() {
     if (this.connection == null) {
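To show the duplicate-check logic on its own, here is a minimal sketch that mirrors the noneMatch predicate above but works on plain lower-bound strings instead of DataDrivenDBInputSplit objects, and swaps Guava's Strings.isNullOrEmpty for an explicit null/empty check so it runs with no Hadoop or Guava on the classpath:

// Standalone sketch of the "should we add an IS NULL split?" check.
import java.util.Arrays;
import java.util.List;

public class ShouldAddNullSplitSketch {
  static boolean shouldAddNullSplit(List<String> lowerClauses) {
    // Add a NULL split only if no existing split already handles NULLs or selects everything.
    return lowerClauses.stream()
        .noneMatch(lowerClause -> lowerClause != null && !lowerClause.isEmpty()
            && (lowerClause.contains("IS NULL") || lowerClause.contains("1=1")));
  }

  public static void main(String[] args) {
    System.out.println(shouldAddNullSplit(Arrays.asList("id >= 0")));               // true: add "id IS NULL"
    System.out.println(shouldAddNullSplit(Arrays.asList("id >= 0", "id IS NULL"))); // false: NULLs already covered
    System.out.println(shouldAddNullSplit(Arrays.asList("1=1")));                   // false: split already selects every row
  }
}

A side note on the new split itself: the same "column IS NULL" clause is passed as both the lower and the upper clause because, as far as I recall, Hadoop's DataDrivenDBRecordReader ANDs the two clauses together when building the WHERE condition, so the effective predicate remains just the IS NULL filter.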
DataDrivenETLDBInputFormatTest.java (new file)

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DataDrivenETLDBInputFormatTest {
+
+  @Mock
+  private JobContext mockJobContext;
+  @Mock
+  private DBConfiguration mockDbConfiguration;
+
+  private DataDrivenETLDBInputFormat inputFormat;
+
+  @Before
+  public void setUp() {
+    inputFormat = Mockito.spy(new DataDrivenETLDBInputFormat());
+    Mockito.doReturn(mockDbConfiguration).when(inputFormat).getDBConf();
+    Mockito.doReturn("id").when(mockDbConfiguration).getInputOrderBy();
+  }
+
+  @Test
+  public void testGetSplitsAddsNullSplit() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("A new split for NULLs should be added", 2, finalSplits.size());
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
+      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(1);
+    Assert.assertEquals("id IS NULL", nullSplit.getLowerClause());
+    Assert.assertEquals("id IS NULL", nullSplit.getUpperClause());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfPresent() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id IS NULL", "id IS NULL");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit, nullSplit);
+
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("Should not add a duplicate NULL split", 2, finalSplits.size());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfSelectAllPresent() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
+
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
+  }
+}
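Design note on the tests: stubbing both getBaseSplits and getDBConf on a Mockito spy means no JDBC driver, live database, or Hadoop job setup is required; the tests exercise only the split-augmentation logic in getSplits, which is exactly why getBaseSplits was introduced as a package-private, @VisibleForTesting seam around the superclass call.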
