4848import java .util .List ;
4949import java .util .Map ;
5050import java .util .TimeZone ;
51+ import java .util .regex .Pattern ;
5152
5253public class RestructureAvroRecords {
5354 private static final Logger logger = LoggerFactory .getLogger (RestructureAvroRecords .class );
@@ -57,6 +58,7 @@ public class RestructureAvroRecords {
5758 private static final java .nio .file .Path BINS_FILE_NAME = Paths .get ("bins.csv" );
5859 private static final java .nio .file .Path SCHEMA_OUTPUT_FILE_NAME = Paths .get ("schema.json" );
5960 private static final SimpleDateFormat FILE_DATE_FORMAT = new SimpleDateFormat ("yyyyMMdd_HH" );
61+ private static final Pattern ILLEGAL_CHARACTER_PATTERN = Pattern .compile ("[^a-zA-Z0-9_-]+" );
6062
6163 static {
6264 FILE_DATE_FORMAT .setTimeZone (TimeZone .getTimeZone ("UTC" ));
@@ -287,20 +289,8 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
287289 Date time = getDate (keyField , valueField );
288290 java .nio .file .Path outputFileName = createFilename (time , suffix );
289291
290- String projectId ;
291-
292- if (keyField .get ("projectId" ) == null ) {
293- projectId = "unknown-project" ;
294- } else {
295- // Clean Project id for use in final pathname
296- projectId = keyField .get ("projectId" ).toString ().replaceAll ("[^a-zA-Z0-9_-]+" , "" );
297- if (projectId .isEmpty ()) {
298- projectId = "empty-project-id" ;
299- }
300- }
301-
302- // Clean user id and create final output pathname
303- String userId = keyField .get ("userId" ).toString ().replaceAll ("[^a-zA-Z0-9_-]+" , "" );
292+ String projectId = sanitizeId (keyField .get ("projectId" ), "unknown-project" );
293+ String userId = sanitizeId (keyField .get ("userId" ), "unknown-user" );
304294
305295 java .nio .file .Path projectDir = this .outputPath .resolve (projectId );
306296 java .nio .file .Path userDir = projectDir .resolve (userId );
@@ -323,8 +313,9 @@ private void writeRecord(GenericRecord record, String topicName, FileCacheStore
323313 }
324314 }
325315
316+ String sourceId = sanitizeId (keyField .get ("sourceId" ), "unknown-source" );
326317 // Count data (binned and total)
327- bins .add (topicName , keyField . get ( " sourceId" ). toString () , time );
318+ bins .add (topicName , sourceId , time );
328319 processedRecordsCount ++;
329320 }
330321 }
@@ -372,6 +363,18 @@ public static Date getDate(GenericRecord keyField, GenericRecord valueField) {
372363 return new Date (time );
373364 }
374365
366+ private static String sanitizeId (Object id , String defaultValue ) {
367+ if (id == null ) {
368+ return defaultValue ;
369+ }
370+ String idString = ILLEGAL_CHARACTER_PATTERN .matcher (id .toString ()).replaceAll ("" );
371+ if (idString .isEmpty ()) {
372+ return defaultValue ;
373+ } else {
374+ return idString ;
375+ }
376+ }
377+
375378 public static class Builder {
376379 private boolean useGzip ;
377380 private boolean doDeduplicate ;
0 commit comments