Skip to content

Commit 0311ce2

Browse files
committed
proper fix for path-based prefixing
1 parent b59b0f0 commit 0311ce2

File tree

3 files changed

+13
-11
lines changed

3 files changed

+13
-11
lines changed

commons/src/main/java/io/github/pastorgl/datacooker/data/DataContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ public ListOrderedMap<String, StreamInfo> createDataStreams(String adapter, Stri
134134
ia.initialize(sparkContext, config, path);
135135

136136
ListOrderedMap<String, StreamInfo> si = new ListOrderedMap<>();
137-
ListOrderedMap<String, DataStream> inputs = ia.load(partCount, partitioning);
137+
ListOrderedMap<String, DataStream> inputs = ia.load(inputName, partCount, partitioning);
138138
for (Map.Entry<String, DataStream> ie : inputs.entrySet()) {
139-
String dsName = ie.getKey().isEmpty() ? inputName : inputName + "/" + ie.getKey();
139+
String dsName = ie.getKey();
140140
if (store.containsKey(dsName)) {
141141
throw new RuntimeException("DS \"" + dsName + "\" requested to CREATE already exists");
142142
}

commons/src/main/java/io/github/pastorgl/datacooker/storage/InputAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@
1010
import org.apache.commons.collections4.map.ListOrderedMap;
1111

1212
public abstract class InputAdapter extends StorageAdapter<InputAdapterMeta> {
13-
public abstract ListOrderedMap<String, DataStream> load(int partCount, Partitioning partitioning) throws Exception;
13+
public abstract ListOrderedMap<String, DataStream> load(String prefix, int partCount, Partitioning partitioning) throws Exception;
1414
}

commons/src/main/java/io/github/pastorgl/datacooker/storage/hadoop/input/HadoopInput.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ protected void configure(Configuration params) throws InvalidConfigurationExcept
4343
}
4444

4545
@Override
46-
public ListOrderedMap<String, DataStream> load(int partCount, Partitioning partitioning) {
46+
public ListOrderedMap<String, DataStream> load(String prefix, int partCount, Partitioning partitioning) {
4747
if (partCount <= 0) {
4848
partCount = numOfExecutors;
4949
}
@@ -84,7 +84,7 @@ public ListOrderedMap<String, DataStream> load(int partCount, Partitioning parti
8484
System.out.println("Discovered " + discoveredFiles.size() + " Hadoop FileSystem file(s):");
8585
discoveredFiles.stream().map(Tuple2::_2).forEach(System.out::println);
8686

87-
ListOrderedMap<String, List<String>> prefixMap = new ListOrderedMap<>();
87+
ListOrderedMap<String, List<String>> subMap = new ListOrderedMap<>();
8888

8989
if (subs) {
9090
for (Tuple2<String, String> file : discoveredFiles) {
@@ -93,15 +93,15 @@ public ListOrderedMap<String, DataStream> load(int partCount, Partitioning parti
9393
prefixLen--;
9494
}
9595

96-
String ds = "";
96+
String sub = "";
9797
int p = file._2.substring(prefixLen).indexOf("/");
9898
if (p != -1) {
9999
int l = file._2.substring(prefixLen).lastIndexOf("/");
100100
if (l != p) {
101-
ds = file._2.substring(p + 1, l);
101+
sub = file._2.substring(p + 1, l);
102102
}
103103
}
104-
prefixMap.compute(ds, (k, v) -> {
104+
subMap.compute(sub, (k, v) -> {
105105
if (v == null) {
106106
v = new ArrayList<>();
107107
}
@@ -110,11 +110,11 @@ public ListOrderedMap<String, DataStream> load(int partCount, Partitioning parti
110110
});
111111
}
112112
} else {
113-
prefixMap.put("", discoveredFiles.stream().map(Tuple2::_2).collect(Collectors.toList()));
113+
subMap.put("", discoveredFiles.stream().map(Tuple2::_2).collect(Collectors.toList()));
114114
}
115115

116116
ListOrderedMap<String, DataStream> ret = new ListOrderedMap<>();
117-
for (Map.Entry<String, List<String>> ds : prefixMap.entrySet()) {
117+
for (Map.Entry<String, List<String>> ds : subMap.entrySet()) {
118118
List<String> files = ds.getValue();
119119

120120
int groupSize = files.size() / partCount;
@@ -125,7 +125,9 @@ public ListOrderedMap<String, DataStream> load(int partCount, Partitioning parti
125125
List<List<String>> partNum = new ArrayList<>();
126126
Lists.partition(files, groupSize).forEach(p -> partNum.add(new ArrayList<>(p)));
127127

128-
ret.put(ds.getKey(), callForFiles(ds.getKey(), partCount, partNum, partitioning));
128+
String sub = ds.getKey();
129+
String name = sub.isEmpty() ? prefix : prefix + "/" + sub;
130+
ret.put(name, callForFiles(name, partCount, partNum, partitioning));
129131
}
130132

131133
return ret;

0 commit comments

Comments
 (0)