1818
1919import com .google .auth .oauth2 .GoogleCredentials ;
2020import com .google .cloud .storage .Blob ;
21+ import com .google .cloud .storage .BlobInfo ;
2122import com .google .cloud .storage .Bucket ;
2223import com .google .cloud .storage .BucketInfo ;
2324import com .google .cloud .storage .Storage ;
2425import com .google .cloud .storage .StorageOptions ;
2526import com .google .common .base .Preconditions ;
27+ import com .google .common .base .Strings ;
2628import com .google .common .collect .ImmutableMap ;
2729import com .google .common .io .Files ;
2830import com .google .gson .Gson ;
@@ -90,6 +92,7 @@ public class GCSTest extends DataprocETLTestBase {
9092 private static final String GCS_MOVE_PLUGIN_NAME = "GCSMove" ;
9193 private static final String GCS_COPY_PLUGIN_NAME = "GCSCopy" ;
9294 private static final String SINK_PLUGIN_NAME = "GCS" ;
95+ private static final String MULTI_SINK_PLUGIN_NAME = "GCSMultiFiles" ;
9396 private static final String SOURCE_PLUGIN_NAME = "GCSFile" ;
9497 private static final Schema ALL_DT_SCHEMA = Schema .recordOf (
9598 "record" ,
@@ -118,6 +121,8 @@ public class GCSTest extends DataprocETLTestBase {
118121
119122 private static Storage storage ;
120123 private List <String > markedForDeleteBuckets ;
124+ private static final String CSV_CONTENT_TYPE = "text/csv" ;
125+ private static final String MULTISINK_RUNTIME_ARG = "multisink.%s" ;
121126
122127 @ BeforeClass
123128 public static void testClassSetup () throws IOException {
@@ -761,7 +766,8 @@ public void testGcsSourceFormats() throws Exception {
761766 id,first,last,email,address,city,state,zip
762767 1,Marilyn,Hawkins,[email protected] ,238 Melvin Way,Palo Alto,CA,94302 763768 */
764- ETLStage sink = new ETLStage ("sink" , createSinkPlugin ("csv" , createPath (bucket , "output" ), schema ));
769+ ETLStage sink = new ETLStage ("sink" , createSinkPlugin ("csv" , createPath (bucket , "output" ),
770+ schema , CSV_CONTENT_TYPE ));
765771 pipelineConfig = ETLBatchConfig .builder ().addStage (sink );
766772 for (String format : formats ) {
767773 String path = String .format ("%s/%s" , createPath (bucket , OUTPUT_BLOB_NAME ), format );
@@ -776,6 +782,7 @@ public void testGcsSourceFormats() throws Exception {
776782
777783 Map <String , Integer > lineCounts = new HashMap <>();
778784 List <String > results = getResultBlobsContent (bucket , "output" );
785+ List <String > resultBlobsContentType = getResultBlobsContentType (bucket , "output" );
779786 for (String result : results ) {
780787 for (String line : result .split ("\n " )) {
781788 lineCounts .putIfAbsent (line , 0 );
@@ -787,6 +794,72 @@ public void testGcsSourceFormats() throws Exception {
787794 expected .put (line2 , formats .size ());
788795 expected .put (line3 , formats .size ());
789796 Assert .assertEquals (expected , lineCounts );
797+ Assert .assertEquals (CSV_CONTENT_TYPE , resultBlobsContentType .get (0 ));
798+ }
799+
800+ @ Test
801+ public void testMultiSinkContentType () throws Exception {
802+ String bucketName = "cask-gcs-multisink-" + UUID .randomUUID ().toString ();
803+ Bucket bucket = createBucket (bucketName );
804+
805+ Schema schema = Schema .recordOf ("customer" ,
806+ Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
807+ Schema .Field .of ("name" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
808+ Schema .Field .of ("email" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
809+ Schema .Field .of ("departament" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))));
810+
811+ Schema outputSchema = Schema .recordOf ("output.schema" ,
812+ Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
813+ Schema .Field .of ("name" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
814+ Schema .Field .of ("email" , Schema .nullableOf (Schema .of (Schema .Type .STRING )))
815+ );
816+
817+
818+ String line1 =
"1,Marilyn Hawkins,[email protected] ,DepartmentA" ;
819+ String line2 =
"2,Terry Perez,[email protected] ,DepartmentB" ;
820+ String line3 =
"3,Jack Ferguson,[email protected] ,DepartmentA" ;
821+ String inputPath = "input" ;
822+
823+ bucket .create (inputPath , String .join ("\n " , Arrays .asList (line1 , line2 , line3 )).getBytes (StandardCharsets .UTF_8 ));
824+
825+
826+ Map <String , String > inputSourceConfig = new HashMap <>();
827+ inputSourceConfig .put ("schema" , schema .toString ());
828+ inputSourceConfig .put ("format" , "${sourceFormat}" );
829+ inputSourceConfig .put ("referenceName" , "source_" + UUID .randomUUID ().toString ());
830+ inputSourceConfig .put ("project" , getProjectId ());
831+ inputSourceConfig .put ("path" , createPath (bucket , inputPath ));
832+ ETLStage source = new ETLStage ("source" ,
833+ new ETLPlugin (SOURCE_PLUGIN_NAME ,
834+ BatchSource .PLUGIN_TYPE ,
835+ inputSourceConfig ,
836+ GOOGLE_CLOUD_ARTIFACT ));
837+
838+ ETLBatchConfig .Builder pipelineConfig = ETLBatchConfig .builder ().addStage (source );
839+
840+ String path = createPath (bucket , OUTPUT_BLOB_NAME );
841+ ETLStage sink = new ETLStage ("multsink" , createMultiSinkPlugin (path , schema , "departament" ));
842+ pipelineConfig .addStage (sink ).addConnection (source .getName (), sink .getName ());
843+
844+ AppRequest <ETLBatchConfig > appRequest = getBatchAppRequestV2 (pipelineConfig .build ());
845+ ApplicationId appId = TEST_NAMESPACE .app ("GCSMultiSinkContentType" );
846+ ApplicationManager appManager = deployApplication (appId , appRequest );
847+
848+ String multisink1 = String .format (MULTISINK_RUNTIME_ARG , "DepartmentA" );
849+ String multisink2 = String .format (MULTISINK_RUNTIME_ARG , "DepartmentB" );
850+ Map <String , String > args = new HashMap <>();
851+ args .put (multisink1 , outputSchema .toString ());
852+ args .put (multisink2 , outputSchema .toString ());
853+ args .put ("sourceFormat" , "csv" );
854+ args .put ("sinkFormat" , "csv" );
855+ args .put ("contentType" , CSV_CONTENT_TYPE );
856+ startWorkFlow (appManager , ProgramRunStatus .COMPLETED , args );
857+
858+ List <String > multisinkContentType1 = getResultBlobsContentType (bucket , OUTPUT_BLOB_NAME + "/DepartmentA" );
859+ List <String > multisinkContentType2 = getResultBlobsContentType (bucket , OUTPUT_BLOB_NAME + "/DepartmentB" );
860+ Assert .assertEquals (CSV_CONTENT_TYPE , multisinkContentType1 .get (0 ));
861+ Assert .assertEquals (CSV_CONTENT_TYPE , multisinkContentType2 .get (0 ));
862+
790863 }
791864
792865 private ETLStage createSourceStage (String format , String path , String regex , Schema schema ) {
@@ -803,14 +876,32 @@ private ETLStage createSourceStage(String format, String path, String regex, Sch
803876 }
804877
805878 private ETLPlugin createSinkPlugin (String format , String path , Schema schema ) {
806- return new ETLPlugin (SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE ,
807- ImmutableMap .of (
808- "path" , path ,
809- "format" , format ,
810- "project" , getProjectId (),
811- "referenceName" , format ,
812- "schema" , schema .toString ()),
813- GOOGLE_CLOUD_ARTIFACT );
879+ return createSinkPlugin (format , path , schema , null );
880+ }
881+
882+ private ETLPlugin createSinkPlugin (String format , String path , Schema schema , String contentType ) {
883+ ImmutableMap .Builder <String , String > propertyBuilder = new ImmutableMap .Builder <String , String >()
884+ .put ("path" , path )
885+ .put ("format" , format )
886+ .put ("project" , getProjectId ())
887+ .put ("referenceName" , format )
888+ .put ("schema" , schema .toString ());
889+ if (!Strings .isNullOrEmpty (contentType )) {
890+ propertyBuilder .put ("contentType" , contentType );
891+ }
892+ return new ETLPlugin (SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE , propertyBuilder .build (), GOOGLE_CLOUD_ARTIFACT );
893+ }
894+
895+ private ETLPlugin createMultiSinkPlugin (String path , Schema schema , String splitField ) {
896+ Map <String , String > map = new HashMap <>();
897+ map .put ("path" , path );
898+ map .put ("format" , "${sinkFormat}" );
899+ map .put ("project" , getProjectId ());
900+ map .put ("schema" , schema .toString ());
901+ map .put ("referenceName" , "gcs-multi-input" );
902+ map .put ("splitField" , splitField );
903+ map .put ("contentType" , "${contentType}" );
904+ return new ETLPlugin (MULTI_SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE , map , GOOGLE_CLOUD_ARTIFACT );
814905 }
815906
816907 static class DataTypesRecord {
@@ -881,4 +972,19 @@ private static String blobContentToString(Blob blob) {
881972 return null ;
882973 }
883974
975+ /**
976+ * Reads content type of files in path
977+ */
978+ private List <String > getResultBlobsContentType (Bucket bucket , String path ) {
979+ String successFile = path + "/_SUCCESS" ;
980+ assertExists (bucket , successFile );
981+
982+ return StreamSupport .stream (bucket .list ().iterateAll ().spliterator (), false )
983+ .filter (blob -> blob .getName ().startsWith (path + "/" )
984+ && !successFile .equals (blob .getName ()) && !blob .getName ().endsWith ("/" ))
985+ .map (BlobInfo ::getContentType )
986+ .filter (Objects ::nonNull )
987+ .collect (Collectors .toList ());
988+ }
989+
884990}
0 commit comments