|
16 | 16 |
|
17 | 17 | package org.radarcns;
|
18 | 18 |
|
| 19 | +import com.fasterxml.jackson.databind.JsonMappingException; |
19 | 20 | import org.apache.avro.Schema.Field;
|
20 | 21 | import org.apache.avro.file.DataFileReader;
|
21 | 22 | import org.apache.avro.generic.GenericDatumReader;
|
@@ -70,7 +71,9 @@ public class RestructureAvroRecords {
|
70 | 71 | private long processedFileCount;
|
71 | 72 | private long processedRecordsCount;
|
72 | 73 | private static final boolean USE_GZIP = "gzip".equalsIgnoreCase(System.getProperty("org.radarcns.compression"));
|
73 |
| - private static final boolean DO_DEDUPLICATE = "true".equalsIgnoreCase(System.getProperty("org.radarcns.deduplicate", "true")); |
| 74 | + |
| 75 | + // Defaults to false because deduplication causes loss of records from Biovotion data. See https://github.com/RADAR-base/Restructure-HDFS-topic/issues/16 |
| 76 | + private static final boolean DO_DEDUPLICATE = "true".equalsIgnoreCase(System.getProperty("org.radarcns.deduplicate", "false")); |
74 | 77 |
|
75 | 78 | public static void main(String [] args) throws Exception {
|
76 | 79 | if (args.length != 3) {
|
@@ -178,7 +181,12 @@ public void start(String directoryName) throws IOException {
|
178 | 181 | for (Map.Entry<String, List<Path>> entry : topicPaths.entrySet()) {
|
179 | 182 | try (FileCacheStore cache = new FileCacheStore(converterFactory, 100, USE_GZIP, DO_DEDUPLICATE)) {
|
180 | 183 | for (Path filePath : entry.getValue()) {
|
181 |
| - this.processFile(filePath, entry.getKey(), cache, offsets); |
| 184 | + // If JsonMappingException occurs, log the error and continue with other files |
| 185 | + try { |
| 186 | + this.processFile(filePath, entry.getKey(), cache, offsets); |
| 187 | + } catch (JsonMappingException exc) { |
| 188 | + logger.error("Cannot map values", exc); |
| 189 | + } |
182 | 190 | progressBar.update(++processedFileCount);
|
183 | 191 | }
|
184 | 192 | }
|
@@ -258,9 +266,20 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
|
258 | 266 | Date time = getDate(keyField, valueField);
|
259 | 267 | java.nio.file.Path outputFileName = createFilename(time);
|
260 | 268 |
|
| 269 | + String projectId; |
| 270 | + |
| 271 | + if(keyField.get("projectId") == null) { |
| 272 | + projectId = "unknown-project"; |
| 273 | + } else { |
| 274 | + // Sanitize the project id for use in the final pathname: strip every character other than letters, digits, '_' and '-' |
| 275 | + projectId = keyField.get("projectId").toString().replaceAll("[^a-zA-Z0-9_-]+", ""); |
| 276 | + } |
| 277 | + |
261 | 278 | // Clean user id and create final output pathname
|
262 | 279 | String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", "");
|
263 |
| - java.nio.file.Path userDir = this.outputPath.resolve(userId); |
| 280 | + |
| 281 | + java.nio.file.Path projectDir = this.outputPath.resolve(projectId); |
| 282 | + java.nio.file.Path userDir = projectDir.resolve(userId); |
264 | 283 | java.nio.file.Path userTopicDir = userDir.resolve(topicName);
|
265 | 284 | java.nio.file.Path outputPath = userTopicDir.resolve(outputFileName);
|
266 | 285 |
|
|
0 commit comments