Skip to content

Commit 56910f2

Browse files
Create new file if different columns for same topic
1 parent fe1971e commit 56910f2

File tree

7 files changed

+75
-34
lines changed

7 files changed

+75
-34
lines changed

src/main/java/org/radarcns/RestructureAvroRecords.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ private void processFile(Path filePath, String topicName, FileCacheStore cache,
240240
record = dataFileReader.next(record);
241241

242242
// Get the fields
243-
this.writeRecord(record, topicName, cache);
243+
this.writeRecord(record, topicName, cache, 0);
244244
}
245245

246246
// Write which file has been processed and update bins
@@ -253,7 +253,7 @@ record = dataFileReader.next(record);
253253
}
254254
}
255255

256-
private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache)
256+
private void writeRecord(GenericRecord record, String topicName, FileCacheStore cache, int suffix)
257257
throws IOException {
258258
GenericRecord keyField = (GenericRecord) record.get("key");
259259
GenericRecord valueField = (GenericRecord) record.get("value");
@@ -264,7 +264,7 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
264264
}
265265

266266
Date time = getDate(keyField, valueField);
267-
java.nio.file.Path outputFileName = createFilename(time);
267+
java.nio.file.Path outputFileName = createFilename(time, suffix);
268268

269269
String projectId;
270270

@@ -284,28 +284,43 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
284284
java.nio.file.Path outputPath = userTopicDir.resolve(outputFileName);
285285

286286
// Write data
287-
cache.writeRecord(outputPath, record);
287+
int response = cache.writeRecord(outputPath, record);
288288

289-
java.nio.file.Path schemaPath = userTopicDir.resolve(SCHEMA_OUTPUT_FILE_NAME);
290-
if (!Files.exists(schemaPath)) {
291-
try (Writer writer = Files.newBufferedWriter(schemaPath)) {
292-
writer.write(record.getSchema().toString(true));
289+
if (response == FileCacheStore.CACHE_AND_NO_WRITE) {
290+
// Write was unsuccessful due to a different number of columns,
291+
// try again with new file name
292+
writeRecord(record, topicName, cache, ++suffix);
293+
} else {
294+
// Write was successful, finalize the write
295+
java.nio.file.Path schemaPath = userTopicDir.resolve(SCHEMA_OUTPUT_FILE_NAME);
296+
if (!Files.exists(schemaPath)) {
297+
try (Writer writer = Files.newBufferedWriter(schemaPath)) {
298+
writer.write(record.getSchema().toString(true));
299+
}
293300
}
294-
}
295301

296-
// Count data (binned and total)
297-
bins.add(topicName, keyField.get("sourceId").toString(), time);
298-
processedRecordsCount++;
302+
// Count data (binned and total)
303+
bins.add(topicName, keyField.get("sourceId").toString(), time);
304+
processedRecordsCount++;
305+
}
299306
}
300307

301-
private java.nio.file.Path createFilename(Date date) {
308+
private java.nio.file.Path createFilename(Date date, int suffix) {
302309
if (date == null) {
303310
logger.warn("Time field of record valueField is not set");
304311
return Paths.get("unknown_date." + outputFileExtension);
305312
}
313+
314+
String finalSuffix;
315+
if(suffix == 0) {
316+
finalSuffix = "";
317+
} else {
318+
finalSuffix = "_" + suffix;
319+
}
320+
306321
// Make a timestamped filename YYYYMMDD_HH00.json
307322
String hourlyTimestamp = createHourTimestamp(date);
308-
return Paths.get(hourlyTimestamp + "00." + outputFileExtension);
323+
return Paths.get(hourlyTimestamp + "00" + finalSuffix +"." + outputFileExtension);
309324
}
310325

311326
public static String createHourTimestamp(Date date) {

src/main/java/org/radarcns/util/CsvAvroConverter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public boolean hasHeader() {
5959
private final ObjectWriter csvWriter;
6060
private final Map<String, Object> map;
6161
private final CsvGenerator generator;
62+
private int numOfColumns;
6263

6364
public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record, boolean writeHeader)
6465
throws IOException {
@@ -72,15 +73,22 @@ public CsvAvroConverter(CsvFactory factory, Writer writer, GenericRecord record,
7273
if (writeHeader) {
7374
schema = schema.withHeader();
7475
}
76+
numOfColumns = schema.size();
7577
generator = factory.createGenerator(writer);
7678
csvWriter = new CsvMapper(factory).writer(schema);
7779
}
7880

7981
@Override
80-
public void writeRecord(GenericRecord record) throws IOException {
82+
public boolean writeRecord(GenericRecord record) throws IOException {
8183
Map<String, Object> localMap = convertRecord(record);
84+
85+
if(localMap.size() > numOfColumns) {
86+
// Cannot write to the same file, so return false
87+
return false;
88+
}
8289
csvWriter.writeValue(generator, localMap);
8390
localMap.clear();
91+
return true;
8492
}
8593

8694
public Map<String, Object> convertRecord(GenericRecord record) {

src/main/java/org/radarcns/util/FileCache.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,10 @@ public FileCache(RecordConverterFactory converterFactory, Path path,
7777
}
7878

7979
/** Write a record to the cache. */
80-
public void writeRecord(GenericRecord record) throws IOException {
81-
this.recordConverter.writeRecord(record);
80+
public boolean writeRecord(GenericRecord record) throws IOException {
81+
boolean result = this.recordConverter.writeRecord(record);
8282
lastUse = System.nanoTime();
83+
return result;
8384
}
8485

8586
@Override

src/main/java/org/radarcns/util/FileCacheStore.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ public class FileCacheStore implements Flushable, Closeable {
4040
private final int maxFiles;
4141
private final Map<Path, FileCache> caches;
4242

43+
// Response codes for each write record case
44+
public static final int CACHE_AND_WRITE = 1; //used cache and write successful
45+
public static final int NO_CACHE_AND_WRITE= 2;
46+
public static final int CACHE_AND_NO_WRITE =3;
47+
public static final int NO_CACHE_AND_NO_WRITE =4;
48+
49+
4350
public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boolean gzip, boolean deduplicate) {
4451
this.converterFactory = converterFactory;
4552
this.maxFiles = maxFiles;
@@ -54,14 +61,19 @@ public FileCacheStore(RecordConverterFactory converterFactory, int maxFiles, boo
5461
*
5562
* @param path file to append data to
5663
* @param record data
57-
* @return true if the cache was used, false if a new file was opened.
64+
* @return integer according to one of the response codes.
5865
* @throws IOException when failing to open a file or writing to it.
5966
*/
60-
public boolean writeRecord(Path path, GenericRecord record) throws IOException {
67+
public int writeRecord(Path path, GenericRecord record) throws IOException {
6168
FileCache cache = caches.get(path);
6269
if (cache != null) {
63-
cache.writeRecord(record);
64-
return true;
70+
if(cache.writeRecord(record)){
71+
return CACHE_AND_WRITE;
72+
} else {
73+
// This is the case when cache is used but write is unsuccessful
74+
// because of a different number of columns in the same topic
75+
return CACHE_AND_NO_WRITE;
76+
}
6577
} else {
6678
ensureCapacity();
6779

@@ -70,8 +82,12 @@ public boolean writeRecord(Path path, GenericRecord record) throws IOException {
7082

7183
cache = new FileCache(converterFactory, path, record, gzip);
7284
caches.put(path, cache);
73-
cache.writeRecord(record);
74-
return false;
85+
if(cache.writeRecord(record)) {
86+
return NO_CACHE_AND_WRITE;
87+
} else {
88+
return NO_CACHE_AND_NO_WRITE;
89+
}
90+
7591
}
7692
}
7793

src/main/java/org/radarcns/util/JsonAvroConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ public JsonAvroConverter(JsonFactory factory, Writer writer) throws IOException
5454
}
5555

5656
@Override
57-
public void writeRecord(GenericRecord record) throws IOException {
57+
public boolean writeRecord(GenericRecord record) throws IOException {
5858
jsonWriter.writeValue(generator, convertRecord(record));
59+
return true;
5960
}
6061

6162
public Map<String, Object> convertRecord(GenericRecord record) {

src/main/java/org/radarcns/util/RecordConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@
2727

2828
/** Converts a GenericRecord to Java primitives or writes it to file. */
2929
public interface RecordConverter extends Flushable, Closeable {
30-
void writeRecord(GenericRecord record) throws IOException;
30+
boolean writeRecord(GenericRecord record) throws IOException;
3131
Map<String, Object> convertRecord(GenericRecord record);
3232
}

src/test/java/org/radarcns/util/FileCacheStoreTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,23 @@ public void appendLine() throws IOException {
5757

5858
try (FileCacheStore cache = new FileCacheStore(csvFactory, 2, false, false)) {
5959
record = new GenericRecordBuilder(simpleSchema).set("a", "something").build();
60-
assertFalse(cache.writeRecord(f1, record));
60+
assertEquals(cache.writeRecord(f1, record), FileCacheStore.NO_CACHE_AND_WRITE);
6161
record = new GenericRecordBuilder(simpleSchema).set("a", "somethingElse").build();
62-
assertTrue(cache.writeRecord(f1, record));
62+
assertEquals(cache.writeRecord(f1, record), FileCacheStore.CACHE_AND_WRITE);
6363
record = new GenericRecordBuilder(simpleSchema).set("a", "something").build();
64-
assertFalse(cache.writeRecord(f2, record));
64+
assertEquals(cache.writeRecord(f2, record), FileCacheStore.NO_CACHE_AND_WRITE);
6565
record = new GenericRecordBuilder(simpleSchema).set("a", "third").build();
66-
assertTrue(cache.writeRecord(f1, record));
66+
assertEquals(cache.writeRecord(f1, record), FileCacheStore.CACHE_AND_WRITE);
6767
record = new GenericRecordBuilder(simpleSchema).set("a", "f3").build();
68-
assertFalse(cache.writeRecord(f3, record));
68+
assertEquals(cache.writeRecord(f3, record), FileCacheStore.NO_CACHE_AND_WRITE);
6969
record = new GenericRecordBuilder(simpleSchema).set("a", "f2").build();
70-
assertFalse(cache.writeRecord(f2, record));
70+
assertEquals(cache.writeRecord(f2, record), FileCacheStore.NO_CACHE_AND_WRITE);
7171
record = new GenericRecordBuilder(simpleSchema).set("a", "f3").build();
72-
assertTrue(cache.writeRecord(f3, record));
72+
assertEquals(cache.writeRecord(f3, record), FileCacheStore.CACHE_AND_WRITE);
7373
record = new GenericRecordBuilder(simpleSchema).set("a", "f4").build();
74-
assertFalse(cache.writeRecord(f4, record));
74+
assertEquals(cache.writeRecord(f4, record), FileCacheStore.NO_CACHE_AND_WRITE);
7575
record = new GenericRecordBuilder(simpleSchema).set("a", "f3").build();
76-
assertTrue(cache.writeRecord(f3, record));
76+
assertEquals(cache.writeRecord(f3, record), FileCacheStore.CACHE_AND_WRITE);
7777
}
7878

7979
assertEquals("a\nsomething\nsomethingElse\nthird\n", new String(Files.readAllBytes(f1)));

0 commit comments

Comments
 (0)