|
17 | 17 | package org.radarcns;
|
18 | 18 |
|
19 | 19 | import java.io.File;
|
| 20 | +import java.io.FileWriter; |
20 | 21 | import java.io.IOException;
|
21 | 22 | import java.text.SimpleDateFormat;
|
22 | 23 | import java.util.ArrayList;
|
@@ -49,6 +50,7 @@ public class RestructureAvroRecords {
|
49 | 50 | private final String outputFileExtension;
|
50 | 51 | private static final String OFFSETS_FILE_NAME = "offsets.csv";
|
51 | 52 | private static final String BINS_FILE_NAME = "bins.csv";
|
| 53 | + private static final String SCHEMA_OUTPUT_FILE_NAME = "schema.json"; |
52 | 54 | private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat("yyyyMMdd_HH");
|
53 | 55 |
|
54 | 56 | static {
|
@@ -252,14 +254,21 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
|
252 | 254 | String outputFileName = createFilename(valueField, timeField);
|
253 | 255 |
|
254 | 256 | // Clean user id and create final output pathname
|
255 |
| - String userId = keyField.get("userId").toString().replaceAll("\\W+", ""); |
| 257 | + String userId = keyField.get("userId").toString().replaceAll("[^a-zA-Z0-9_-]+", ""); |
256 | 258 | File userDir = new File(this.outputPath, userId);
|
257 | 259 | File userTopicDir = new File(userDir, topicName);
|
258 | 260 | File outputFile = new File(userTopicDir, outputFileName);
|
259 | 261 |
|
260 | 262 | // Write data
|
261 | 263 | cache.writeRecord(outputFile, record);
|
262 | 264 |
|
| 265 | + File schemaFile = new File(userTopicDir, SCHEMA_OUTPUT_FILE_NAME); |
| 266 | + if (!schemaFile.exists()) { |
| 267 | + try (FileWriter writer = new FileWriter(schemaFile, false)) { |
| 268 | + writer.write(record.getSchema().toString(true)); |
| 269 | + } |
| 270 | + } |
| 271 | + |
263 | 272 | // Count data (binned and total)
|
264 | 273 | bins.add(topicName, keyField.get("sourceId").toString(), valueField, timeField);
|
265 | 274 | processedRecordsCount++;
|
|
0 commit comments