25
25
import java .time .format .DateTimeFormatter ;
26
26
import java .time .format .DateTimeParseException ;
27
27
import java .util .regex .Pattern ;
28
+ import javax .annotation .Nonnull ;
29
+ import javax .annotation .Nullable ;
28
30
import org .apache .avro .Schema ;
31
+ import org .apache .avro .Schema .Field ;
29
32
import org .apache .avro .Schema .Type ;
30
33
import org .apache .avro .generic .GenericRecord ;
31
34
import org .slf4j .Logger ;
@@ -41,7 +44,17 @@ public abstract class RecordPathFactory implements Plugin {
41
44
private Path root ;
42
45
private String extension ;
43
46
44
- public RecordOrganization getRecordOrganization (String topic , GenericRecord record , int attempt ) {
47
+ /**
48
+ * Get the organization of given record in given topic.
49
+ * @param topic Kafka topic name
50
+ * @param record record with possible key and value fields containing records
51
+ * @param attempt number of previous attempts to write given record. This increases if previous
52
+ * paths already existed and are incompatible.
53
+ * @return organization of given record
54
+ */
55
+ @ Nonnull
56
+ public RecordOrganization getRecordOrganization (@ Nonnull String topic ,
57
+ @ Nonnull GenericRecord record , int attempt ) {
45
58
GenericRecord keyField = (GenericRecord ) record .get ("key" );
46
59
GenericRecord valueField = (GenericRecord ) record .get ("value" );
47
60
@@ -58,8 +71,27 @@ public RecordOrganization getRecordOrganization(String topic, GenericRecord reco
58
71
return new RecordOrganization (outputPath , category , time );
59
72
}
60
73
61
- public abstract Path getRelativePath (String topic , GenericRecord key , GenericRecord value , Instant time , int attempt );
74
+ /**
75
+ * Get the relative path corresponding to given record on given topic.
76
+ * @param topic Kafka topic name
77
+ * @param key record key
78
+ * @param value record value
79
+ * @param time time contained in the record
80
+ * @param attempt number of previous attempts to write given record. This increases if previous
81
+ * paths already existed and are incompatible.
82
+ * @return relative path corresponding to given parameters.
83
+ */
84
+ @ Nonnull
85
+ public abstract Path getRelativePath (@ Nonnull String topic , @ Nullable GenericRecord key ,
86
+ @ Nullable GenericRecord value , @ Nullable Instant time , int attempt );
62
87
88
+ /**
89
+ * Get the category of a record, representing a partitioning for a given topic and user.
90
+ * @param key record key
91
+ * @param value record value
92
+ * @return category name.
93
+ */
94
+ @ Nonnull
63
95
public abstract String getCategory (GenericRecord key , GenericRecord value );
64
96
65
97
public Path getRoot () {
@@ -86,80 +118,147 @@ public DateTimeFormatter getTimeBinFormat() {
86
118
return HOURLY_TIME_BIN_FORMAT ;
87
119
}
88
120
89
- public String getTimeBin (Instant time ) {
121
+ @ Nonnull
122
+ public String getTimeBin (@ Nullable Instant time ) {
90
123
return time == null ? "unknown_date" : getTimeBinFormat ().format (time );
91
124
}
92
125
126
+ /**
127
+ * Organization of a record.
128
+ */
93
129
public static class RecordOrganization {
94
130
private final Path path ;
95
131
private final Instant time ;
96
132
private final String category ;
97
133
98
- public RecordOrganization (Path path , String category , Instant time ) {
134
+ /**
135
+ * Organization of a record.
136
+ *
137
+ * @param path path that the record should be stored in.
138
+ * @param category category or partition that the record belongs to
139
+ * @param time time contained in the record, if any
140
+ */
141
+ public RecordOrganization (@ Nonnull Path path , @ Nonnull String category ,
142
+ @ Nullable Instant time ) {
99
143
this .path = path ;
100
144
this .time = time ;
101
145
this .category = category ;
102
146
}
103
147
148
+ @ Nonnull
104
149
public Path getPath () {
105
150
return path ;
106
151
}
107
152
153
+ @ Nullable
108
154
public Instant getTime () {
109
155
return time ;
110
156
}
111
157
158
+ @ Nonnull
112
159
public String getCategory () {
113
160
return category ;
114
161
}
115
162
}
116
163
117
- public static Instant getDate (GenericRecord keyField , GenericRecord valueField ) {
118
- Schema .Field timeField = valueField .getSchema ().getField ("time" );
119
- if (timeField != null && timeField .schema ().getType () == Type .DOUBLE ) {
120
- double time = (Double ) valueField .get (timeField .pos ());
121
- // Convert from millis to date and apply dateFormat
122
- return Instant .ofEpochMilli ((long ) (time * 1000d ));
164
+ /**
165
+ * Get the date contained in given records
166
+ * @param keyField key field of the record
167
+ * @param valueField value field of the record
168
+ * @return date contained in the values of either record, or {@code null} if not found or
169
+ * it cannot be parsed.
170
+ */
171
+ @ Nullable
172
+ public static Instant getDate (@ Nullable GenericRecord keyField ,
173
+ @ Nullable GenericRecord valueField ) {
174
+ Schema .Field timeField ;
175
+
176
+ if (valueField != null ) {
177
+ timeField = valueField .getSchema ().getField ("time" );
178
+ if (timeField != null && timeField .schema ().getType () == Type .DOUBLE ) {
179
+ double time = (Double ) valueField .get (timeField .pos ());
180
+ // Convert from millis to date and apply dateFormat
181
+ return Instant .ofEpochMilli ((long ) (time * 1000d ));
182
+ }
123
183
}
124
- timeField = keyField .getSchema ().getField ("timeStart" );
125
184
126
- if (timeField != null && timeField .schema ().getType () == Type .DOUBLE ) {
127
- double time = (Double ) keyField .get (timeField .pos ());
128
- // Convert from millis to date and apply dateFormat
129
- return Instant .ofEpochMilli ((long ) (time * 1000d ));
185
+ if (keyField != null ) {
186
+ timeField = keyField .getSchema ().getField ("timeStart" );
187
+
188
+ if (timeField != null && timeField .schema ().getType () == Type .DOUBLE ) {
189
+ double time = (Double ) keyField .get (timeField .pos ());
190
+ // Convert from millis to date and apply dateFormat
191
+ return Instant .ofEpochMilli ((long ) (time * 1000d ));
192
+ }
193
+
194
+ // WindowedKey
195
+ timeField = keyField .getSchema ().getField ("start" );
196
+ if (timeField != null && timeField .schema ().getType () == Type .LONG ) {
197
+ return Instant .ofEpochMilli ((Long ) keyField .get ("start" ));
198
+ }
130
199
}
131
200
132
- // WindowedKey
133
- timeField = keyField .getSchema ().getField ("start" );
134
- if (timeField != null && timeField .schema ().getType () == Type .LONG ) {
135
- return Instant .ofEpochMilli ((Long ) keyField .get ("start" ));
201
+ if (valueField != null ) {
202
+ Instant result = parseDateTime (valueField );
203
+ if (result != null ) {
204
+ return result ;
205
+ }
206
+ result = parseDate (valueField );
207
+ if (result != null ) {
208
+ return result ;
209
+ }
136
210
}
137
211
212
+ return null ;
213
+ }
214
+
215
+ /**
216
+ * Parse the dateTime field of a record, if present.
217
+ *
218
+ * @param record record that may contain a dateTime field
219
+ * @return {@code Instant} representing the dateTime or {@code null} if the field cannot be
220
+ * found or parsed.
221
+ */
222
+ @ Nullable
223
+ public static Instant parseDateTime (@ Nonnull GenericRecord record ) {
138
224
// dateTime
139
- timeField = valueField .getSchema ().getField ("dateTime" );
225
+ Field timeField = record .getSchema ().getField ("dateTime" );
140
226
if (timeField != null && timeField .schema ().getType () == Type .STRING ) {
141
- String dateTime = valueField .get (timeField .pos ()).toString ();
227
+ String dateTime = record .get (timeField .pos ()).toString ();
142
228
try {
143
- return Instant .parse (dateTime );
229
+ if (dateTime .charAt (dateTime .length () - 1 ) == 'Z' ) {
230
+ return Instant .parse (dateTime );
231
+ } else {
232
+ return LocalDateTime .parse (dateTime ).toInstant (UTC );
233
+ }
144
234
} catch (DateTimeParseException ex ) {
145
- // try local date
146
- }
147
- try {
148
- return LocalDateTime .parse (dateTime ).toInstant (UTC );
149
- } catch (DateTimeParseException ex ) {
150
- // no other options
235
+ // try next data type
151
236
}
152
237
}
153
238
154
- timeField = valueField .getSchema ().getField ("date" );
239
+ return null ;
240
+ }
241
+
242
+ /**
243
+ * Parse the date field of a record, if present.
244
+ *
245
+ * @param record record that may contain a date field
246
+ * @return {@code Instant} representing the start of given date or {@code null} if the field
247
+ * cannot be found or parsed.
248
+ */
249
+ @ Nullable
250
+ public static Instant parseDate (@ Nonnull GenericRecord record ) {
251
+ // dateTime
252
+ Field timeField = record .getSchema ().getField ("date" );
155
253
if (timeField != null && timeField .schema ().getType () == Type .STRING ) {
156
- String date = valueField .get (timeField .pos ()).toString ();
254
+ String date = record .get (timeField .pos ()).toString ();
157
255
try {
158
256
return LocalDate .parse (date ).atStartOfDay (UTC ).toInstant ();
159
257
} catch (DateTimeParseException ex ) {
160
258
// no other options
161
259
}
162
260
}
261
+
163
262
return null ;
164
263
}
165
264
0 commit comments