2020
2121import java .nio .file .Path ;
2222import java .time .Instant ;
23+ import java .time .LocalDate ;
24+ import java .time .LocalDateTime ;
2325import java .time .format .DateTimeFormatter ;
26+ import java .time .format .DateTimeParseException ;
2427import java .util .regex .Pattern ;
28+ import javax .annotation .Nonnull ;
29+ import javax .annotation .Nullable ;
2530import org .apache .avro .Schema ;
31+ import org .apache .avro .Schema .Field ;
32+ import org .apache .avro .Schema .Type ;
2633import org .apache .avro .generic .GenericRecord ;
2734import org .slf4j .Logger ;
2835import org .slf4j .LoggerFactory ;
@@ -37,7 +44,17 @@ public abstract class RecordPathFactory implements Plugin {
3744 private Path root ;
3845 private String extension ;
3946
40- public RecordOrganization getRecordOrganization (String topic , GenericRecord record , int attempt ) {
47+ /**
48+ * Get the organization of given record in given topic.
49+ * @param topic Kafka topic name
50+ * @param record record with possible key and value fields containing records
51+ * @param attempt number of previous attempts to write given record. This increases if previous
52+ * paths already existed and are incompatible.
53+ * @return organization of given record
54+ */
55+ @ Nonnull
56+ public RecordOrganization getRecordOrganization (@ Nonnull String topic ,
57+ @ Nonnull GenericRecord record , int attempt ) {
4158 GenericRecord keyField = (GenericRecord ) record .get ("key" );
4259 GenericRecord valueField = (GenericRecord ) record .get ("value" );
4360
@@ -54,8 +71,27 @@ public RecordOrganization getRecordOrganization(String topic, GenericRecord reco
5471 return new RecordOrganization (outputPath , category , time );
5572 }
5673
57- public abstract Path getRelativePath (String topic , GenericRecord key , GenericRecord value , Instant time , int attempt );
74+ /**
75+ * Get the relative path corresponding to given record on given topic.
76+ * @param topic Kafka topic name
77+ * @param key record key
78+ * @param value record value
79+ * @param time time contained in the record
80+ * @param attempt number of previous attempts to write given record. This increases if previous
81+ * paths already existed and are incompatible.
82+ * @return relative path corresponding to given parameters.
83+ */
84+ @ Nonnull
85+ public abstract Path getRelativePath (@ Nonnull String topic , @ Nullable GenericRecord key ,
86+ @ Nullable GenericRecord value , @ Nullable Instant time , int attempt );
5887
88+ /**
89+ * Get the category of a record, representing a partitioning for a given topic and user.
90+ * @param key record key
91+ * @param value record value
92+ * @return category name.
93+ */
94+ @ Nonnull
5995 public abstract String getCategory (GenericRecord key , GenericRecord value );
6096
6197 public Path getRoot () {
@@ -82,48 +118,148 @@ public DateTimeFormatter getTimeBinFormat() {
82118 return HOURLY_TIME_BIN_FORMAT ;
83119 }
84120
85- public String getTimeBin (Instant time ) {
121+ @ Nonnull
122+ public String getTimeBin (@ Nullable Instant time ) {
86123 return time == null ? "unknown_date" : getTimeBinFormat ().format (time );
87124 }
88125
126+ /**
127+ * Organization of a record.
128+ */
89129 public static class RecordOrganization {
90130 private final Path path ;
91131 private final Instant time ;
92132 private final String category ;
93133
94- public RecordOrganization (Path path , String category , Instant time ) {
134+ /**
135+ * Organization of a record.
136+ *
137+ * @param path path that the record should be stored in.
138+ * @param category category or partition that the record belongs to
139+ * @param time time contained in the record, if any
140+ */
141+ public RecordOrganization (@ Nonnull Path path , @ Nonnull String category ,
142+ @ Nullable Instant time ) {
95143 this .path = path ;
96144 this .time = time ;
97145 this .category = category ;
98146 }
99147
148+ @ Nonnull
100149 public Path getPath () {
101150 return path ;
102151 }
103152
153+ @ Nullable
104154 public Instant getTime () {
105155 return time ;
106156 }
107157
158+ @ Nonnull
108159 public String getCategory () {
109160 return category ;
110161 }
111162 }
112163
113- public static Instant getDate (GenericRecord keyField , GenericRecord valueField ) {
114- Schema .Field timeField = valueField .getSchema ().getField ("time" );
115- if (timeField != null ) {
116- double time = (Double ) valueField .get (timeField .pos ());
117- // Convert from millis to date and apply dateFormat
118- return Instant .ofEpochMilli ((long ) (time * 1000d ));
164+ /**
165+ * Get the date contained in given records
166+ * @param keyField key field of the record
167+ * @param valueField value field of the record
168+ * @return date contained in the values of either record, or {@code null} if not found or
169+ * it cannot be parsed.
170+ */
171+ @ Nullable
172+ public static Instant getDate (@ Nullable GenericRecord keyField ,
173+ @ Nullable GenericRecord valueField ) {
174+ Schema .Field timeField ;
175+
176+ if (valueField != null ) {
177+ timeField = valueField .getSchema ().getField ("time" );
178+ if (timeField != null && timeField .schema ().getType () == Type .DOUBLE ) {
179+ double time = (Double ) valueField .get (timeField .pos ());
180+ // Convert from millis to date and apply dateFormat
181+ return Instant .ofEpochMilli ((long ) (time * 1000d ));
182+ }
183+ }
184+
185+ if (keyField != null ) {
186+ timeField = keyField .getSchema ().getField ("timeStart" );
187+
188+ if (timeField != null && timeField .schema ().getType () == Type .DOUBLE ) {
189+ double time = (Double ) keyField .get (timeField .pos ());
190+ // Convert from millis to date and apply dateFormat
191+ return Instant .ofEpochMilli ((long ) (time * 1000d ));
192+ }
193+
194+ // WindowedKey
195+ timeField = keyField .getSchema ().getField ("start" );
196+ if (timeField != null && timeField .schema ().getType () == Type .LONG ) {
197+ return Instant .ofEpochMilli ((Long ) keyField .get ("start" ));
198+ }
199+ }
200+
201+ if (valueField != null ) {
202+ Instant result = parseDateTime (valueField );
203+ if (result != null ) {
204+ return result ;
205+ }
206+ result = parseDate (valueField );
207+ if (result != null ) {
208+ return result ;
209+ }
119210 }
120211
121- // WindowedKey
122- timeField = keyField .getSchema ().getField ("start" );
123- if (timeField == null ) {
124- return null ;
212+ return null ;
213+ }
214+
215+ /**
216+ * Parse the dateTime field of a record, if present.
217+ *
218+ * @param record record that may contain a dateTime field
219+ * @return {@code Instant} representing the dateTime or {@code null} if the field cannot be
220+ * found or parsed.
221+ */
222+ @ Nullable
223+ public static Instant parseDateTime (@ Nonnull GenericRecord record ) {
224+ // dateTime
225+ Field timeField = record .getSchema ().getField ("dateTime" );
226+ if (timeField != null && timeField .schema ().getType () == Type .STRING ) {
227+ String dateTime = record .get (timeField .pos ()).toString ();
228+ try {
229+ if (dateTime .charAt (dateTime .length () - 1 ) == 'Z' ) {
230+ return Instant .parse (dateTime );
231+ } else {
232+ return LocalDateTime .parse (dateTime ).toInstant (UTC );
233+ }
234+ } catch (DateTimeParseException ex ) {
235+ // try next data type
236+ }
125237 }
126- return Instant .ofEpochMilli ((Long ) keyField .get ("start" ));
238+
239+ return null ;
240+ }
241+
242+ /**
243+ * Parse the date field of a record, if present.
244+ *
245+ * @param record record that may contain a date field
246+ * @return {@code Instant} representing the start of given date or {@code null} if the field
247+ * cannot be found or parsed.
248+ */
249+ @ Nullable
250+ public static Instant parseDate (@ Nonnull GenericRecord record ) {
251+ // dateTime
252+ Field timeField = record .getSchema ().getField ("date" );
253+ if (timeField != null && timeField .schema ().getType () == Type .STRING ) {
254+ String date = record .get (timeField .pos ()).toString ();
255+ try {
256+ return LocalDate .parse (date ).atStartOfDay (UTC ).toInstant ();
257+ } catch (DateTimeParseException ex ) {
258+ // no other options
259+ }
260+ }
261+
262+ return null ;
127263 }
128264
129265 public static String sanitizeId (Object id , String defaultValue ) {
0 commit comments