
Commit f51420e

Last-ditch optimization
1 parent bb418e4 commit f51420e

16 files changed: +421 additions, −273 deletions

build.gradle

Lines changed: 4 additions & 3 deletions

@@ -22,9 +22,9 @@ ext {
 
     avroVersion = '1.8.2'
     jacksonVersion = '2.9.6'
-    hadoopVersion = '2.7.6'
+    hadoopVersion = '3.0.3'
     jCommanderVersion = '1.72'
-
+    almworksVersion = '1.1.1'
 }
 
 repositories {
@@ -36,11 +36,12 @@ dependencies {
     implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: jacksonVersion
     implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-csv', version: jacksonVersion
     implementation group: 'com.beust', name: 'jcommander', version: jCommanderVersion
+    implementation group: 'com.almworks.integers', name: 'integers', version: almworksVersion
 
     implementation group: 'org.apache.avro', name: 'avro-mapred', version: avroVersion
     implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: hadoopVersion
 
-    runtimeOnly group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: hadoopVersion
+    runtimeOnly group: 'org.apache.hadoop', name: 'hadoop-hdfs-client', version: hadoopVersion
 
     testImplementation group: 'junit', name: 'junit', version: '4.12'
 }

src/main/java/org/radarcns/hdfs/RadarHdfsRestructure.java

Lines changed: 24 additions & 19 deletions

@@ -29,8 +29,10 @@
 import org.radarcns.hdfs.accounting.Bin;
 import org.radarcns.hdfs.accounting.OffsetRange;
 import org.radarcns.hdfs.accounting.OffsetRangeSet;
+import org.radarcns.hdfs.accounting.TopicPartition;
 import org.radarcns.hdfs.data.FileCacheStore;
 import org.radarcns.hdfs.util.ProgressBar;
+import org.radarcns.hdfs.util.ReadOnlyFunctionalValue;
 import org.radarcns.hdfs.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,12 +113,9 @@ public void start(String directoryName) throws IOException {
 
     private TopicFileList getTopicPaths(FileSystem fs, Path path, OffsetRangeSet seenFiles) {
         return new TopicFileList(walk(fs, path)
-                .filter(f -> {
-                    String name = f.getName();
-                    return name.endsWith(".avro")
-                            && !seenFiles.contains(OffsetRange.parseFilename(name));
-                })
+                .filter(f -> f.getName().endsWith(".avro"))
                 .map(f -> new TopicFile(f.getParent().getParent().getName(), f))
+                .filter(f -> !seenFiles.contains(f.range))
                 .collect(Collectors.toList()));
     }
 
@@ -147,6 +146,8 @@ private void processPaths(TopicFileList topicPaths, Accountant accountant) throw
 
         processedFileCount = new LongAdder();
         processedRecordsCount = new LongAdder();
+        OffsetRangeSet seenOffsets = accountant.getOffsets()
+                .withFactory(ReadOnlyFunctionalValue::new);
 
         ExecutorService executor = Executors.newWorkStealingPool(pathFactory.isTopicPartitioned() ? this.numThreads : 1);
 
@@ -168,7 +169,7 @@ private void processPaths(TopicFileList topicPaths, Accountant accountant) throw
             try (FileCacheStore cache = fileStoreFactory.newFileCacheStore(accountant)) {
                 for (TopicFile file : paths.files) {
                     try {
-                        this.processFile(file, cache, progressBar);
+                        this.processFile(file, cache, progressBar, seenOffsets);
                     } catch (JsonMappingException exc) {
                         logger.error("Cannot map values", exc);
                     }
@@ -196,7 +197,7 @@ private void processPaths(TopicFileList topicPaths, Accountant accountant) throw
     }
 
     private void processFile(TopicFile file, FileCacheStore cache,
-            ProgressBar progressBar) throws IOException {
+            ProgressBar progressBar, OffsetRangeSet seenOffsets) throws IOException {
         logger.debug("Reading {}", file.path);
 
         // Read and parseFilename avro file
@@ -216,45 +217,49 @@ private void processFile(TopicFile file, FileCacheStore cache,
                 new GenericDatumReader<>());
 
         GenericRecord record = null;
-        int i = 0;
+        long offset = file.range.getOffsetFrom();
         while (dataFileReader.hasNext()) {
             record = dataFileReader.next(record);
             timer.add("read", timeRead);
 
             long timeAccount = System.nanoTime();
-            OffsetRange singleOffset = file.range.createSingleOffset(i++);
+            boolean alreadyContains = seenOffsets.contains(file.range.getTopicPartition(), offset);
             timer.add("accounting.create", timeAccount);
-            // Get the fields
-            this.writeRecord(record, file.topic, cache, singleOffset, 0);
-
+            if (!alreadyContains) {
+                // Get the fields
+                this.writeRecord(file.range.getTopicPartition(), record, cache, offset, 0);
+            }
             processedRecordsCount.increment();
             progressBar.update(processedRecordsCount.sum());
+
+            offset++;
             timeRead = System.nanoTime();
         }
     }
 
-    private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache,
-            OffsetRange offset, int suffix) throws IOException {
-        RecordPathFactory.RecordOrganization metadata = pathFactory.getRecordOrganization(topicName, record, suffix);
+    private void writeRecord(TopicPartition topicPartition, GenericRecord record,
+            FileCacheStore cache, long offset, int suffix) throws IOException {
+        RecordPathFactory.RecordOrganization metadata = pathFactory.getRecordOrganization(
+                topicPartition.topic, record, suffix);
 
         Timer timer = Timer.getInstance();
 
         long timeAccount = System.nanoTime();
         String timeBin = pathFactory.getTimeBin(metadata.getTime());
-        Accountant.Transaction transaction = new Accountant.Transaction(offset,
-                new Bin(topicName, metadata.getCategory(), timeBin));
+        Accountant.Transaction transaction = new Accountant.Transaction(topicPartition, offset,
+                new Bin(topicPartition.topic, metadata.getCategory(), timeBin));
         timer.add("accounting.create", timeAccount);
 
         // Write data
         long timeWrite = System.nanoTime();
         FileCacheStore.WriteResponse response = cache.writeRecord(
-                topicName, metadata.getPath(), record, transaction);
+                metadata.getPath(), record, transaction);
         timer.add("write", timeWrite);
 
         if (!response.isSuccessful()) {
             // Write was unsuccessful due to different number of columns,
             // try again with new file name
-            writeRecord(record, topicName, cache, offset, ++suffix);
+            writeRecord(topicPartition, record, cache, offset, ++suffix);
         }
     }
 

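Note on the change above: deduplication moves from the file level to the record level. Files are no longer skipped wholesale when their offset range was seen; instead each record's offset, counted up from file.range.getOffsetFrom(), is checked against a read-only snapshot of the already-committed offsets, so partially processed files can be resumed without writing duplicates. Below is a minimal, self-contained sketch of that loop pattern; it uses a plain Map/Set stand-in for OffsetRangeSet and a hypothetical writeRecord sink, not the actual RADAR-HDFS classes.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified model of the per-record offset deduplication used in processFile. */
public class OffsetDedupSketch {
    // Stand-in for OffsetRangeSet: offsets already written, keyed by "topic+partition".
    private final Map<String, Set<Long>> seenOffsets = new HashMap<>();

    /** Returns the number of records actually written (i.e. not previously seen). */
    public int processFile(String topicPartition, long offsetFrom, List<String> records) {
        int written = 0;
        long offset = offsetFrom;  // offset of the first record in the file
        for (String record : records) {
            boolean alreadyContains = seenOffsets
                    .getOrDefault(topicPartition, Collections.emptySet())
                    .contains(offset);
            if (!alreadyContains) {
                writeRecord(topicPartition, offset, record);  // hypothetical sink
                seenOffsets.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(offset);
                written++;
            }
            offset++;  // offsets are consecutive within one file
        }
        return written;
    }

    private void writeRecord(String topicPartition, long offset, String record) {
        System.out.printf("%s@%d -> %s%n", topicPartition, offset, record);
    }

    public static void main(String[] args) {
        OffsetDedupSketch sketch = new OffsetDedupSketch();
        List<String> records = Arrays.asList("a", "b", "c");
        System.out.println(sketch.processFile("topic+0", 100, records)); // 3
        System.out.println(sketch.processFile("topic+0", 100, records)); // 0: all offsets seen
    }
}

In the actual code the check is seenOffsets.contains(file.range.getTopicPartition(), offset) against a snapshot wrapped in ReadOnlyFunctionalValue, and the Accountant, not the reader, records newly written offsets.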
src/main/java/org/radarcns/hdfs/accounting/Accountant.java

Lines changed: 15 additions & 21 deletions

@@ -3,8 +3,8 @@
 import org.radarcns.hdfs.FileStoreFactory;
 import org.radarcns.hdfs.config.RestructureSettings;
 import org.radarcns.hdfs.data.StorageDriver;
-import org.radarcns.hdfs.util.TemporaryDirectory;
 import org.radarcns.hdfs.util.DirectFunctionalValue;
+import org.radarcns.hdfs.util.TemporaryDirectory;
 import org.radarcns.hdfs.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,7 +16,6 @@
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.TreeSet;
 
 public class Accountant implements Flushable, Closeable {
     private static final Logger logger = LoggerFactory.getLogger(Accountant.class);
@@ -80,25 +79,14 @@ public OffsetRangeSet getOffsets() {
     @Override
     public void flush() throws IOException {
         long timeFlush = System.nanoTime();
-        IOException exception = null;
-        try {
-            binFile.flush();
-        } catch (IOException ex) {
-            logger.error("Failed to close bins", ex);
-            exception = ex;
-        }
+
+        binFile.triggerWrite();
 
         try {
             offsetFile.flush();
-        } catch (IOException ex) {
-            logger.error("Failed to close offsets", ex);
-            exception = ex;
+        } finally {
+            Timer.getInstance().add("accounting.flush", timeFlush);
         }
-
-        if (exception != null) {
-            throw exception;
-        }
-        Timer.getInstance().add("accounting.flush", timeFlush);
     }
 
     public BinFile getBins() {
@@ -110,25 +98,31 @@ public static class Ledger {
         private final Map<Bin, Long> bins;
 
         public Ledger() {
-            offsets = new OffsetRangeSet(() -> new DirectFunctionalValue<>(new TreeSet<>()));
+            offsets = new OffsetRangeSet(DirectFunctionalValue::new);
             bins = new HashMap<>();
         }
 
         public void add(Transaction transaction) {
             long timeAdd = System.nanoTime();
-            offsets.add(transaction.offset);
+            offsets.add(transaction.topicPartition, transaction.offset);
            bins.compute(transaction.bin, (b, vOld) -> vOld == null ? 1L : vOld + 1L);
             Timer.getInstance().add("accounting.add", timeAdd);
         }
     }
 
     public static class Transaction {
-        private final OffsetRange offset;
+        private final TopicPartition topicPartition;
+        private final long offset;
         private final Bin bin;
 
-        public Transaction(OffsetRange offset, Bin bin) {
+        public Transaction(TopicPartition topicPartition, long offset, Bin bin) {
+            this.topicPartition = topicPartition;
             this.offset = offset;
             this.bin = bin;
         }
+
+        public TopicPartition getTopicPartition() {
+            return topicPartition;
+        }
     }
 }

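Note on the Accountant changes: Transaction now carries a TopicPartition plus a primitive long offset instead of a whole OffsetRange, so the Ledger can register a single offset without allocating a range object per record; and flush() drops the catch-and-rethrow bookkeeping, since binFile.triggerWrite() no longer throws here and the timer update sits in a finally block so it is recorded even when flushing the offsets fails. A minimal sketch of that flush pattern, with stand-in types for BinFile and the offsets file (names hypothetical):

import java.io.Flushable;
import java.io.IOException;

/** Minimal model of the reworked flush(): time it in a finally block and let any
 *  IOException from the offsets file propagate unchanged. */
public class FlushTimingSketch {
    interface AsyncFile { void triggerWrite(); }  // stand-in for BinFile

    private final AsyncFile binFile;
    private final Flushable offsetFile;  // stand-in for the offsets file

    FlushTimingSketch(AsyncFile binFile, Flushable offsetFile) {
        this.binFile = binFile;
        this.offsetFile = offsetFile;
    }

    public void flush() throws IOException {
        long timeFlush = System.nanoTime();

        binFile.triggerWrite();  // asynchronous write trigger, no checked exception

        try {
            offsetFile.flush();  // may throw; the exception propagates as-is
        } finally {
            // recorded whether or not the flush succeeded
            System.out.printf("accounting.flush took %d ns%n", System.nanoTime() - timeFlush);
        }
    }

    public static void main(String[] args) throws IOException {
        new FlushTimingSketch(() -> { }, () -> { }).flush();
    }
}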
src/main/java/org/radarcns/hdfs/accounting/OffsetRange.java

Lines changed: 19 additions & 49 deletions

@@ -16,17 +16,11 @@
 
 package org.radarcns.hdfs.accounting;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import javax.annotation.Nonnull;
-
 /** POJO class for storing offsets. */
-public class OffsetRange implements Comparable<OffsetRange> {
-    private final String topic;
-    private final int partition;
-    private long offsetFrom;
-    private long offsetTo;
+public class OffsetRange {
+    private final TopicPartition topicPartition;
+    private final long offsetFrom;
+    private final long offsetTo;
 
     public static OffsetRange parseFilename(String filename) throws NumberFormatException, IndexOutOfBoundsException {
         String[] fileNameParts = filename.split("[+.]");
@@ -39,61 +33,50 @@ public static OffsetRange parseFilename(String filename) throws NumberFormatExce
     }
 
     /** Full constructor. */
-    @JsonCreator
-    public OffsetRange(
-            @JsonProperty("topic") String topic,
-            @JsonProperty("partition") int partition,
-            @JsonProperty("offsetFrom") long offsetFrom,
-            @JsonProperty("offsetTo") long offsetTo) {
-        this.topic = topic;
-        this.partition = partition;
+    public OffsetRange(String topic, int partition, long offsetFrom, long offsetTo) {
+        this(new TopicPartition(topic, partition), offsetFrom, offsetTo);
+    }
+
+    public OffsetRange(TopicPartition topicPartition, long offsetFrom, long offsetTo) {
+        this.topicPartition = topicPartition;
         this.offsetFrom = offsetFrom;
         this.offsetTo = offsetTo;
     }
 
     public String getTopic() {
-        return topic;
+        return topicPartition.topic;
     }
 
     public int getPartition() {
-        return partition;
+        return topicPartition.partition;
     }
 
     public long getOffsetFrom() {
         return offsetFrom;
     }
 
-    public void setOffsetFrom(long offsetFrom) {
-        this.offsetFrom = offsetFrom;
-    }
-
     public long getOffsetTo() {
         return offsetTo;
     }
 
-    public void setOffsetTo(long offsetTo) {
-        this.offsetTo = offsetTo;
-    }
-
     public OffsetRange createSingleOffset(int index) {
         if (index < 0 || index > offsetTo - offsetFrom) {
             throw new IndexOutOfBoundsException("Index " + index + " does not reference offsets "
                     + offsetFrom + " to " + offsetTo);
         }
 
         long singleOffset = offsetFrom + index;
-        return new OffsetRange(topic, partition, singleOffset, singleOffset);
+        return new OffsetRange(topicPartition, singleOffset, singleOffset);
     }
 
     @Override
     public String toString() {
-        return topic + '+' + partition + '+' + offsetFrom + '+' + offsetTo;
+        return topicPartition.topic + '+' + topicPartition.partition + '+' + offsetFrom + '+' + offsetTo;
     }
 
     @Override
     public int hashCode() {
-        int result = topic.hashCode();
-        result = 31 * result + partition;
+        int result = topicPartition.hashCode();
         result = 31 * result + (int) (offsetFrom ^ (offsetFrom >>> 32));
         result = 31 * result + (int) (offsetTo ^ (offsetTo >>> 32));
         return result;
@@ -105,26 +88,13 @@ public boolean equals(Object o) {
             return false;
         }
         OffsetRange other = (OffsetRange) o;
-        return topic.equals(other.topic)
-                && partition == other.partition
+        return topicPartition.equals(other.topicPartition)
                 && offsetFrom == other.offsetFrom
                 && offsetTo == other.offsetTo;
     }
 
-    @Override
-    public int compareTo(@Nonnull OffsetRange o) {
-        int ret = Long.compare(offsetFrom, o.offsetFrom);
-        if (ret != 0) {
-            return ret;
-        }
-        ret = Long.compare(offsetTo, o.offsetTo);
-        if (ret != 0) {
-            return ret;
-        }
-        ret = topic.compareTo(o.topic);
-        if (ret != 0) {
-            return ret;
-        }
-        return Integer.compare(partition, o.partition);
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
     }
+
 }

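Note on the OffsetRange changes: the class becomes an immutable value keyed by a TopicPartition; the Jackson annotations, the setters, and the Comparable implementation go away because ranges are no longer kept in a per-topic TreeSet. Keying by TopicPartition and tracking plain long offsets allows consecutive offsets to be collapsed into compact per-partition ranges. The class below is a hypothetical illustration of such a range set, built only on java.util; it is not the project's OffsetRangeSet (which presumably relies on the com.almworks.integers primitive collections added in build.gradle).

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of a range-based offset set: consecutive offsets per
 *  (topic, partition) are stored as [from, to] pairs instead of one object per offset. */
public class OffsetRangeSetSketch {
    // key: "topic+partition", value: map of range start -> range end (inclusive)
    private final Map<String, TreeMap<Long, Long>> ranges = new HashMap<>();

    public void add(String topicPartition, long offset) {
        TreeMap<Long, Long> set = ranges.computeIfAbsent(topicPartition, k -> new TreeMap<>());
        Map.Entry<Long, Long> floor = set.floorEntry(offset);
        if (floor != null && floor.getValue() >= offset) {
            return;  // already contained in an existing range
        }
        long from = offset;
        long to = offset;
        if (floor != null && floor.getValue() == offset - 1) {
            from = floor.getKey();  // extend the preceding range
        }
        Map.Entry<Long, Long> next = set.ceilingEntry(offset + 1);
        if (next != null && next.getKey() == offset + 1) {
            to = next.getValue();      // merge with the following range
            set.remove(next.getKey());
        }
        set.put(from, to);
    }

    public boolean contains(String topicPartition, long offset) {
        TreeMap<Long, Long> set = ranges.get(topicPartition);
        if (set == null) {
            return false;
        }
        Map.Entry<Long, Long> floor = set.floorEntry(offset);
        return floor != null && floor.getValue() >= offset;
    }

    @Override
    public String toString() {
        return ranges.toString();
    }

    public static void main(String[] args) {
        OffsetRangeSetSketch set = new OffsetRangeSetSketch();
        for (long o = 100; o < 103; o++) {
            set.add("topic+0", o);
        }
        set.add("topic+0", 104);
        System.out.println(set.contains("topic+0", 101));  // true
        System.out.println(set.contains("topic+0", 103));  // false
        System.out.println(set);  // {topic+0={100=102, 104=104}}
    }
}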