|
17 | 17 | package org.radarcns;
|
18 | 18 |
|
19 | 19 | import java.io.File;
|
| 20 | +import java.io.FileWriter; |
20 | 21 | import java.io.IOException;
|
21 | 22 | import java.nio.file.Files;
|
22 | 23 | import java.text.SimpleDateFormat;
|
@@ -52,6 +53,7 @@ public class RestructureAvroRecords {
|
52 | 53 | private final String outputFileExtension;
|
53 | 54 | private static final String OFFSETS_FILE_NAME = "offsets.csv";
|
54 | 55 | private static final String BINS_FILE_NAME = "bins.csv";
|
| 56 | + private static final String SCHEMA_OUTPUT_FILE_NAME = "schema.json"; |
55 | 57 | private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd_HH");
|
56 | 58 |
|
57 | 59 | static {
|
@@ -257,14 +259,21 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
|
257 | 259 | String outputFileName = createFilename(valueField, timeField);
|
258 | 260 |
|
259 | 261 | // Clean user id and create final output pathname
|
260 |
| - String userId = keyField.get("userId").toString().replaceAll("\\W+", ""); |
| 262 | + String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", ""); |
261 | 263 | File userDir = new File(this.outputPath, userId);
|
262 | 264 | File userTopicDir = new File(userDir, topicName);
|
263 | 265 | File outputFile = new File(userTopicDir, outputFileName);
|
264 | 266 |
|
265 | 267 | // Write data
|
266 | 268 | cache.writeRecord(outputFile, record);
|
267 | 269 |
|
| 270 | + File schemaFile = new File(userTopicDir, SCHEMA_OUTPUT_FILE_NAME); |
| 271 | + if (!schemaFile.exists()) { |
| 272 | + try (FileWriter writer = new FileWriter(schemaFile, false)) { |
| 273 | + writer.write(record.getSchema().toString(true)); |
| 274 | + } |
| 275 | + } |
| 276 | + |
268 | 277 | // Count data (binned and total)
|
269 | 278 | bins.add(topicName, keyField.get("sourceId").toString(), valueField, timeField);
|
270 | 279 | processedRecordsCount++;
|
|
0 commit comments