@@ -121,7 +121,7 @@ protected boolean shouldGetSchema() {
121121 * Config for the plugin.
122122 */
123123 @ SuppressWarnings ("ConstantConditions" )
124- public static class GCSSourceConfig extends PluginConfig implements FileSourceProperties {
124+ public static class GCSSourceConfig extends AbstractFileSourceConfig implements FileSourceProperties {
125125 public static final String NAME_PATH = "path" ;
126126 public static final String NAME_FORMAT = "format" ;
127127 private static final String NAME_FILE_SYSTEM_PROPERTIES = "fileSystemProperties" ;
@@ -133,10 +133,6 @@ public static class GCSSourceConfig extends PluginConfig implements FileSourcePr
133133 private static final Gson GSON = new Gson ();
134134 private static final Type MAP_STRING_STRING_TYPE = new TypeToken <Map <String , String >>() { }.getType ();
135135
136- @ Name (Constants .Reference .REFERENCE_NAME )
137- @ Description ("This will be used to uniquely identify this source for lineage, annotating metadata, etc." )
138- public String referenceName ;
139-
140136 @ Macro
141137 @ Description ("The path to read from. For example, gs://<bucket>/path/to/directory/" )
142138 private String path ;
@@ -146,76 +142,11 @@ public static class GCSSourceConfig extends PluginConfig implements FileSourcePr
146142 @ Description ("Map of properties to set on the InputFormat." )
147143 private String fileSystemProperties ;
148144
149- @ Macro
150- @ Nullable
151- @ Description ("Maximum size of each partition used to read data. "
152- + "Smaller partitions will increase the level of parallelism, but will require more resources and overhead." )
153- private Long maxSplitSize ;
154-
155145 @ Macro
156146 @ Nullable
157147 @ Description ("Minimum size of each partition used to read data. " )
158148 private Long minSplitSize ;
159149
160- @ Macro
161- @ Nullable
162- @ Description ("Output field to place the path of the file that the record was read from. "
163- + "If not specified, the file path will not be included in output records. "
164- + "If specified, the field must exist in the output schema as a string." )
165- private String pathField ;
166-
167- @ Macro
168- @ Description ("Format of the data to read. Supported formats are 'avro', 'blob', 'csv', 'delimited', 'json', "
169- + "'parquet', 'text', and 'tsv'." )
170- private String format ;
171-
172- @ Macro
173- @ Nullable
174- @ Description ("Output schema. If a Path Field is set, it must be present in the schema as a string." )
175- private String schema ;
176-
177- @ Macro
178- @ Nullable
179- @ Description ("Whether to only use the filename instead of the URI of the file path when a path field is given. "
180- + "The default value is false." )
181- private Boolean filenameOnly ;
182-
183- @ Macro
184- @ Nullable
185- @ Description ("Regular expression that file paths must match in order to be included in the input. "
186- + "The full file path is compared, not just the file name."
187- + "If no value is given, no file filtering will be done. "
188- + "See https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html for more information about "
189- + "the regular expression syntax." )
190- private String fileRegex ;
191-
192- @ Macro
193- @ Nullable
194- @ Description ("Whether to recursively read directories within the input directory. The default is false." )
195- private Boolean recursive ;
196-
197- @ Macro
198- @ Nullable
199- @ Description ("The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format "
200- + "is anything other than 'delimited'." )
201- private String delimiter ;
202-
203- @ Macro
204- @ Nullable
205- @ Description ("Whether to skip the first line of each file. Supported formats are 'text', 'csv', 'tsv', " +
206- "'delimited'. Default value is false." )
207- private Boolean skipHeader ;
208-
209- @ Macro
210- @ Nullable
211- @ Description ("File encoding for the source files. The default encoding is 'UTF-8'" )
212- private String fileEncoding ;
213-
214- // this is a hidden property that only exists for wrangler's parse-as-csv that uses the header as the schema
215- // when this is true and the format is text, the header will be the first record returned by every record reader
216- @ Nullable
217- private Boolean copyHeader ;
218-
219150 @ Macro
220151 @ Nullable
221152 @ Description ("Whether the data file is encrypted. If it is set to 'true', a associated metadata file needs to be "
@@ -250,15 +181,8 @@ public static class GCSSourceConfig extends PluginConfig implements FileSourcePr
250181 @ Description ("The existing connection to use." )
251182 private GCPConnectorConfig connection ;
252183
253- public GCSSourceConfig () {
254- this .maxSplitSize = 128L * 1024 * 1024 ;
255- this .recursive = false ;
256- this .filenameOnly = false ;
257- this .copyHeader = false ;
258- }
259-
260184 public void validate (FailureCollector collector ) {
261- IdUtils . validateReferenceName ( referenceName , collector );
185+ super . validate ( collector );
262186 ConfigUtil .validateConnection (this , useConnection , connection , collector );
263187 // validate that path is valid
264188 if (!containsMacro (NAME_PATH )) {
@@ -285,39 +209,13 @@ public void validate(FailureCollector collector) {
285209 .withStacktrace (e .getStackTrace ());
286210 }
287211 }
288-
289- if (fileEncoding != null && !fileEncoding .equals (AbstractFileSourceConfig .DEFAULT_FILE_ENCODING )
290- && !FixedLengthCharset .isValidEncoding (fileEncoding )) {
291- collector .addFailure ("Specified file encoding is not valid." ,
292- "Use one of the supported file encodings." );
293- }
294- }
295-
296- @ Override
297- public String getFormatName () {
298- return Formats .getFormatPluginName (format );
299- }
300-
301- @ Override
302- public String getReferenceName () {
303- return referenceName ;
304212 }
305213
306214 @ Override
307215 public String getPath () {
308216 return path ;
309217 }
310218
311- @ Nullable
312- @ Override
313- public Pattern getFilePattern () {
314- try {
315- return fileRegex == null ? null : Pattern .compile (fileRegex );
316- } catch (RuntimeException e ) {
317- throw new IllegalArgumentException ("Invalid file regular expression." + e .getMessage (), e );
318- }
319- }
320-
321219 @ Nullable
322220 public Pattern getExclusionPattern () {
323221 if (!isEncrypted ()) {
@@ -327,11 +225,6 @@ public Pattern getExclusionPattern() {
327225 return Pattern .compile (".*" + Pattern .quote (getEncryptedMetadataSuffix ()) + "$" );
328226 }
329227
330- @ Override
331- public long getMaxSplitSize () {
332- return maxSplitSize ;
333- }
334-
335228 @ Nullable
336229 public Long getMinSplitSize () {
337230 return minSplitSize ;
@@ -342,44 +235,8 @@ public boolean shouldAllowEmptyInput() {
342235 return false ;
343236 }
344237
345- @ Override
346- public boolean shouldReadRecursively () {
347- return recursive ;
348- }
349-
350- @ Nullable
351- @ Override
352- public String getPathField () {
353- return pathField ;
354- }
355-
356- @ Override
357- public boolean useFilenameAsPath () {
358- return filenameOnly ;
359- }
360-
361- @ Nullable
362- @ Override
363- public Schema getSchema () {
364- try {
365- return Strings .isNullOrEmpty (schema ) ? null : Schema .parseJson (schema );
366- } catch (Exception e ) {
367- throw new IllegalArgumentException ("Unable to parse schema with error: " + e .getMessage (), e );
368- }
369- }
370-
371238 public boolean isCopyHeader () {
372- return copyHeader != null && copyHeader ;
373- }
374-
375- @ Override
376- public boolean skipHeader () {
377- return skipHeader == null ? false : skipHeader ;
378- }
379-
380- @ Nullable
381- public String getFileEncoding () {
382- return fileEncoding ;
239+ return shouldCopyHeader ();
383240 }
384241
385242 public boolean isEncrypted () {
0 commit comments