1818
1919import com .google .auth .oauth2 .GoogleCredentials ;
2020import com .google .cloud .storage .Blob ;
21+ import com .google .cloud .storage .BlobInfo ;
2122import com .google .cloud .storage .Bucket ;
2223import com .google .cloud .storage .BucketInfo ;
2324import com .google .cloud .storage .Storage ;
2425import com .google .cloud .storage .StorageOptions ;
2526import com .google .common .base .Preconditions ;
27+ import com .google .common .base .Strings ;
2628import com .google .common .collect .ImmutableMap ;
2729import com .google .common .io .Files ;
2830import com .google .gson .Gson ;
7779import java .util .concurrent .TimeUnit ;
7880import java .util .stream .Collectors ;
7981import java .util .stream .StreamSupport ;
82+ import javax .annotation .Nullable ;
8083
8184/**
8285 * Tests reading from GCS (Google Cloud Storage) and writing to GCS from within a Dataproc cluster.
@@ -90,6 +93,7 @@ public class GCSTest extends DataprocETLTestBase {
9093 private static final String GCS_MOVE_PLUGIN_NAME = "GCSMove" ;
9194 private static final String GCS_COPY_PLUGIN_NAME = "GCSCopy" ;
9295 private static final String SINK_PLUGIN_NAME = "GCS" ;
96+ private static final String MULTI_SINK_PLUGIN_NAME = "GCSMultiFiles" ;
9397 private static final String SOURCE_PLUGIN_NAME = "GCSFile" ;
9498 private static final Schema ALL_DT_SCHEMA = Schema .recordOf (
9599 "record" ,
@@ -118,6 +122,8 @@ public class GCSTest extends DataprocETLTestBase {
118122
119123 private static Storage storage ;
120124 private List <String > markedForDeleteBuckets ;
125+ private static final String CSV_CONTENT_TYPE = "text/csv" ;
126+ private static final String MULTISINK_RUNTIME_ARG = "multisink.%s" ;
121127
122128 @ BeforeClass
123129 public static void testClassSetup () throws IOException {
@@ -761,7 +767,8 @@ public void testGcsSourceFormats() throws Exception {
761767 id,first,last,email,address,city,state,zip
762768 1,Marilyn,Hawkins,mhawkins0@ted.com,238 Melvin Way,Palo Alto,CA,94302
763769 */
764- ETLStage sink = new ETLStage ("sink" , createSinkPlugin ("csv" , createPath (bucket , "output" ), schema ));
770+ ETLStage sink = new ETLStage ("sink" , createSinkPlugin ("csv" , createPath (bucket , "output" ),
771+ schema , CSV_CONTENT_TYPE ));
765772 pipelineConfig = ETLBatchConfig .builder ().addStage (sink );
766773 for (String format : formats ) {
767774 String path = String .format ("%s/%s" , createPath (bucket , OUTPUT_BLOB_NAME ), format );
@@ -776,6 +783,7 @@ public void testGcsSourceFormats() throws Exception {
776783
777784 Map <String , Integer > lineCounts = new HashMap <>();
778785 List <String > results = getResultBlobsContent (bucket , "output" );
786+ List <String > resultBlobsContentType = getResultBlobsContentType (bucket , "output" );
779787 for (String result : results ) {
780788 for (String line : result .split ("\n " )) {
781789 lineCounts .putIfAbsent (line , 0 );
@@ -787,6 +795,72 @@ public void testGcsSourceFormats() throws Exception {
787795 expected .put (line2 , formats .size ());
788796 expected .put (line3 , formats .size ());
789797 Assert .assertEquals (expected , lineCounts );
798+ Assert .assertEquals (CSV_CONTENT_TYPE , resultBlobsContentType .get (0 ));
799+ }
800+
801+ @ Test
802+ public void testMultiSinkContentType () throws Exception {
803+ String bucketName = "cask-gcs-multisink-" + UUID .randomUUID ().toString ();
804+ Bucket bucket = createBucket (bucketName );
805+
806+ Schema schema = Schema .recordOf ("customer" ,
807+ Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
808+ Schema .Field .of ("name" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
809+ Schema .Field .of ("email" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
810+ Schema .Field .of ("departament" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))));
811+
812+ Schema outputSchema = Schema .recordOf ("output.schema" ,
813+ Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
814+ Schema .Field .of ("name" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
815+ Schema .Field .of ("email" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))));
816+
817+ String line1 = "1,Marilyn Hawkins,mhawkins0@example.com,DepartmentA" ;
818+ String line2 = "2,Terry Perez,tperez1@example.com,DepartmentB" ;
819+ String line3 = "3,Jack Ferguson,jferguson2@example.com,DepartmentA" ;
820+ String inputPath = "input" ;
821+
822+ bucket .create (inputPath , String .join ("\n " , Arrays .asList (line1 , line2 , line3 )).getBytes (StandardCharsets .UTF_8 ));
823+
824+ Map <String , String > inputSourceConfig = new HashMap <>();
825+ inputSourceConfig .put ("schema" , schema .toString ());
826+ inputSourceConfig .put ("format" , "${sourceFormat}" );
827+ inputSourceConfig .put ("referenceName" , "source_" + UUID .randomUUID ().toString ());
828+ inputSourceConfig .put ("project" , getProjectId ());
829+ inputSourceConfig .put ("path" , createPath (bucket , inputPath ));
830+ ETLStage source = new ETLStage ("source" ,
831+ new ETLPlugin (SOURCE_PLUGIN_NAME ,
832+ BatchSource .PLUGIN_TYPE ,
833+ inputSourceConfig ,
834+ GOOGLE_CLOUD_ARTIFACT ));
835+
836+ ETLBatchConfig .Builder pipelineConfig = ETLBatchConfig .builder ().addStage (source );
837+
838+ String path = createPath (bucket , OUTPUT_BLOB_NAME );
839+ ETLStage sink = new ETLStage ("multsink" , createMultiSinkPlugin ("csv" ));
840+ pipelineConfig .addStage (sink ).addConnection (source .getName (), sink .getName ());
841+
842+ AppRequest <ETLBatchConfig > appRequest = getBatchAppRequestV2 (pipelineConfig .build ());
843+ ApplicationId appId = TEST_NAMESPACE .app ("GCSMultiSinkContentType" );
844+ ApplicationManager appManager = deployApplication (appId , appRequest );
845+
846+ String multisink1 = String .format (MULTISINK_RUNTIME_ARG , "DepartmentA" );
847+ String multisink2 = String .format (MULTISINK_RUNTIME_ARG , "DepartmentB" );
848+ Map <String , String > args = new HashMap <>();
849+ args .put (multisink1 , outputSchema .toString ());
850+ args .put (multisink2 , outputSchema .toString ());
851+ args .put ("sourceFormat" , "csv" );
852+ args .put ("multiSinkPath" , path );
853+ args .put ("multiSinkProjectId" , getProjectId ());
854+ args .put ("multiSinkSchema" , schema .toString ());
855+ args .put ("multiSinkSplitField" , "departament" );
856+ args .put ("contentType" , CSV_CONTENT_TYPE );
857+ startWorkFlow (appManager , ProgramRunStatus .COMPLETED , args );
858+
859+ List <String > multisinkContentType1 = getResultBlobsContentType (bucket , OUTPUT_BLOB_NAME + "/DepartmentA" );
860+ List <String > multisinkContentType2 = getResultBlobsContentType (bucket , OUTPUT_BLOB_NAME + "/DepartmentB" );
861+ Assert .assertEquals (CSV_CONTENT_TYPE , multisinkContentType1 .get (0 ));
862+ Assert .assertEquals (CSV_CONTENT_TYPE , multisinkContentType2 .get (0 ));
863+
790864 }
791865
792866 private ETLStage createSourceStage (String format , String path , String regex , Schema schema ) {
@@ -803,14 +877,34 @@ private ETLStage createSourceStage(String format, String path, String regex, Sch
803877 }
804878
805879 private ETLPlugin createSinkPlugin (String format , String path , Schema schema ) {
806- return new ETLPlugin (SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE ,
807- ImmutableMap .of (
808- "path" , path ,
809- "format" , format ,
810- "project" , getProjectId (),
811- "referenceName" , format ,
812- "schema" , schema .toString ()),
813- GOOGLE_CLOUD_ARTIFACT );
880+ return createSinkPlugin (format , path , schema , null );
881+ }
882+
883+ private ETLPlugin createSinkPlugin (String format , String path , Schema schema ,@ Nullable String contentType ) {
884+ ImmutableMap .Builder <String , String > propertyBuilder = new ImmutableMap .Builder <String , String >()
885+ .put ("path" , path )
886+ .put ("format" , format )
887+ .put ("project" , getProjectId ())
888+ .put ("referenceName" , format )
889+ .put ("schema" , schema .toString ());
890+ if (!Strings .isNullOrEmpty (contentType )) {
891+ propertyBuilder .put ("contentType" , contentType );
892+ }
893+ return new ETLPlugin (SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE , propertyBuilder .build (), GOOGLE_CLOUD_ARTIFACT );
894+ }
895+
896+ private ETLPlugin createMultiSinkPlugin (String sinkFormat ) {
897+ Map <String , String > map = new HashMap <>();
898+ map .put ("path" , "${multiSinkPath}" );
899+ map .put ("format" , sinkFormat );
900+ //todo make macro when https://cdap.atlassian.net/browse/PLUGIN-553 is fixed
901+ //map.put("format", "${sinkFormat}");
902+ map .put ("project" , "${multiSinkProjectId}" );
903+ map .put ("schema" , "${multiSinkSchema}" );
904+ map .put ("referenceName" , "gcs-multi-input" );
905+ map .put ("splitField" , "${multiSinkSplitField}" );
906+ map .put ("contentType" , "${contentType}" );
907+ return new ETLPlugin (MULTI_SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE , map , GOOGLE_CLOUD_ARTIFACT );
814908 }
815909
816910 static class DataTypesRecord {
@@ -881,4 +975,19 @@ private static String blobContentToString(Blob blob) {
881975 return null ;
882976 }
883977
978+ /**
979+ * Reads content type of files in path
980+ */
981+ private List <String > getResultBlobsContentType (Bucket bucket , String path ) {
982+ String successFile = path + "/_SUCCESS" ;
983+ assertExists (bucket , successFile );
984+
985+ return StreamSupport .stream (bucket .list ().iterateAll ().spliterator (), false )
986+ .filter (blob -> blob .getName ().startsWith (path + "/" )
987+ && !successFile .equals (blob .getName ()) && !blob .getName ().endsWith ("/" ))
988+ .map (BlobInfo ::getContentType )
989+ .filter (Objects ::nonNull )
990+ .collect (Collectors .toList ());
991+ }
992+
884993}
0 commit comments