 
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Bucket;
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
@@ -77,9 +79,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
 
 /**
  * Tests reading from GCS (Google Cloud Storage) and writing to GCS from within a Dataproc cluster.
+ *
+ * TODO: (PLUGIN-553) the GCSMultiFiles sink plugin property 'format' does not work with a macro.
  */
 public class GCSTest extends DataprocETLTestBase {
 
@@ -90,6 +95,7 @@ public class GCSTest extends DataprocETLTestBase {
   private static final String GCS_MOVE_PLUGIN_NAME = "GCSMove";
   private static final String GCS_COPY_PLUGIN_NAME = "GCSCopy";
   private static final String SINK_PLUGIN_NAME = "GCS";
+  private static final String MULTI_SINK_PLUGIN_NAME = "GCSMultiFiles";
   private static final String SOURCE_PLUGIN_NAME = "GCSFile";
   private static final Schema ALL_DT_SCHEMA = Schema.recordOf(
     "record",
@@ -118,6 +124,8 @@ public class GCSTest extends DataprocETLTestBase {
 
   private static Storage storage;
   private List<String> markedForDeleteBuckets;
+  private static final String CSV_CONTENT_TYPE = "text/csv";
+  private static final String MULTISINK_RUNTIME_ARG = "multisink.%s";
 
   @BeforeClass
   public static void testClassSetup() throws IOException {
@@ -187,6 +195,10 @@ private String createPath(Bucket bucket, String blobName) {
     return String.format("gs://%s/%s", bucket.getName(), blobName);
   }
 
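+  /**
+   * Uploads the given lines as a single newline-delimited UTF-8 blob at {@code inputPath}.
+   */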
+  private void insertData(Bucket bucket, String inputPath, String... data) {
+    bucket.create(inputPath, String.join("\n", Arrays.asList(data)).getBytes(StandardCharsets.UTF_8));
+  }
+
   @Test
   public void testGCSCopy() throws Exception {
     String prefix = "cdap-gcs-cp-test";
@@ -707,7 +719,7 @@ public void testGcsSourceFormats() throws Exception {
     String line2 = "2,Terry Perez,tperez1@example.com";
     String line3 = "3,Jack Ferguson,jferguson2@example.com";
     String inputPath = "input";
-    bucket.create(inputPath, String.join("\n", Arrays.asList(line1, line2, line3)).getBytes(StandardCharsets.UTF_8));
+    insertData(bucket, inputPath, line1, line2, line3);
 
     String suffix = UUID.randomUUID().toString();
     /*
@@ -761,7 +773,8 @@ public void testGcsSourceFormats() throws Exception {
       id,first,last,email,address,city,state,zip
       1,Marilyn,Hawkins,mhawkins0@ted.com,238 Melvin Way,Palo Alto,CA,94302
     */
-    ETLStage sink = new ETLStage("sink", createSinkPlugin("csv", createPath(bucket, "output"), schema));
+    ETLStage sink = new ETLStage("sink", createSinkPlugin("csv", createPath(bucket, "output"),
+                                                          schema, CSV_CONTENT_TYPE));
     pipelineConfig = ETLBatchConfig.builder().addStage(sink);
     for (String format : formats) {
       String path = String.format("%s/%s", createPath(bucket, OUTPUT_BLOB_NAME), format);
@@ -776,6 +789,7 @@ public void testGcsSourceFormats() throws Exception {
 
     Map<String, Integer> lineCounts = new HashMap<>();
     List<String> results = getResultBlobsContent(bucket, "output");
+    List<String> resultBlobsContentType = getResultBlobsContentType(bucket, "output");
     for (String result : results) {
       for (String line : result.split("\n")) {
         lineCounts.putIfAbsent(line, 0);
@@ -787,6 +801,71 @@ public void testGcsSourceFormats() throws Exception {
     expected.put(line2, formats.size());
     expected.put(line3, formats.size());
     Assert.assertEquals(expected, lineCounts);
+    Assert.assertEquals(CSV_CONTENT_TYPE, resultBlobsContentType.get(0));
+  }
+
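+  /**
+   * Verifies that the GCSMultiFiles sink honors the 'contentType' property: input records are
+   * split by the 'departament' field into separate output directories, and every blob written
+   * for each split should carry the text/csv content type.
+   */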
+  @Test
+  public void testMultiSinkContentType() throws Exception {
+    String bucketName = "cask-gcs-multisink-" + UUID.randomUUID().toString();
+    Bucket bucket = createBucket(bucketName);
+
+    Schema schema = Schema.recordOf("customer",
+                                    Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+                                    Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+                                    Schema.Field.of("email", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+                                    Schema.Field.of("departament", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+
+    Schema outputSchema = Schema.recordOf("output.schema",
+                                          Schema.Field.of("id", Schema.of(Schema.Type.INT)),
+                                          Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+                                          Schema.Field.of("email", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+
+    String line1 = "1,Marilyn Hawkins,mhawkins0@example.com,DepartmentA";
+    String line2 = "2,Terry Perez,tperez1@example.com,DepartmentB";
+    String line3 = "3,Jack Ferguson,jferguson2@example.com,DepartmentA";
+    String inputPath = "input";
+    insertData(bucket, inputPath, line1, line2, line3);
+
+    Map<String, String> inputSourceConfig = new HashMap<>();
+    inputSourceConfig.put("schema", schema.toString());
+    inputSourceConfig.put("format", "${sourceFormat}");
+    inputSourceConfig.put("referenceName", "source_" + UUID.randomUUID().toString());
+    inputSourceConfig.put("project", getProjectId());
+    inputSourceConfig.put("path", createPath(bucket, inputPath));
+    ETLStage source = new ETLStage("source",
+                                   new ETLPlugin(SOURCE_PLUGIN_NAME,
+                                                 BatchSource.PLUGIN_TYPE,
+                                                 inputSourceConfig,
+                                                 GOOGLE_CLOUD_ARTIFACT));
+
+    ETLBatchConfig.Builder pipelineConfig = ETLBatchConfig.builder().addStage(source);
+
+    String path = createPath(bucket, OUTPUT_BLOB_NAME);
+    ETLStage sink = new ETLStage("multisink", createMultiSinkPlugin("csv"));
+    pipelineConfig.addStage(sink).addConnection(source.getName(), sink.getName());
+
+    AppRequest<ETLBatchConfig> appRequest = getBatchAppRequestV2(pipelineConfig.build());
+    ApplicationId appId = TEST_NAMESPACE.app("GCSMultiSinkContentType");
+    ApplicationManager appManager = deployApplication(appId, appRequest);
+
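+    // Every macro-declared sink property is resolved through runtime arguments, including one
+    // "multisink.<splitValue>" schema entry per expected value of the split field.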
+    String multisink1 = String.format(MULTISINK_RUNTIME_ARG, "DepartmentA");
+    String multisink2 = String.format(MULTISINK_RUNTIME_ARG, "DepartmentB");
+    Map<String, String> args = new HashMap<>();
+    args.put(multisink1, outputSchema.toString());
+    args.put(multisink2, outputSchema.toString());
+    args.put("sourceFormat", "csv");
+    args.put("multiSinkPath", path);
+    args.put("multiSinkProjectId", getProjectId());
+    args.put("multiSinkSchema", schema.toString());
+    args.put("multiSinkSplitField", "departament");
+    args.put("contentType", CSV_CONTENT_TYPE);
+    startWorkFlow(appManager, ProgramRunStatus.COMPLETED, args);
+
+    List<String> multisinkContentType1 = getResultBlobsContentType(bucket, OUTPUT_BLOB_NAME + "/DepartmentA");
+    List<String> multisinkContentType2 = getResultBlobsContentType(bucket, OUTPUT_BLOB_NAME + "/DepartmentB");
+    Assert.assertEquals(CSV_CONTENT_TYPE, multisinkContentType1.get(0));
+    Assert.assertEquals(CSV_CONTENT_TYPE, multisinkContentType2.get(0));
+
   }
 
   private ETLStage createSourceStage(String format, String path, String regex, Schema schema) {
@@ -803,14 +882,32 @@ private ETLStage createSourceStage(String format, String path, String regex, Schema schema) {
   }
 
   private ETLPlugin createSinkPlugin(String format, String path, Schema schema) {
-    return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE,
-                         ImmutableMap.of(
-                           "path", path,
-                           "format", format,
-                           "project", getProjectId(),
-                           "referenceName", format,
-                           "schema", schema.toString()),
-                         GOOGLE_CLOUD_ARTIFACT);
+    return createSinkPlugin(format, path, schema, null);
+  }
+
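+  /**
+   * Builds the GCS sink plugin; the 'contentType' property is only added when a non-empty
+   * value is supplied, so passing null leaves the property unset.
+   */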
+  private ETLPlugin createSinkPlugin(String format, String path, Schema schema, @Nullable String contentType) {
+    ImmutableMap.Builder<String, String> propertyBuilder = new ImmutableMap.Builder<String, String>()
+      .put("path", path)
+      .put("format", format)
+      .put("project", getProjectId())
+      .put("referenceName", format)
+      .put("schema", schema.toString());
+    if (!Strings.isNullOrEmpty(contentType)) {
+      propertyBuilder.put("contentType", contentType);
+    }
+    return new ETLPlugin(SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, propertyBuilder.build(), GOOGLE_CLOUD_ARTIFACT);
+  }
+
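+  /**
+   * Builds the GCSMultiFiles sink with every property except 'format' and 'referenceName'
+   * declared as a macro, to be supplied via runtime arguments ('format' stays literal,
+   * see PLUGIN-553).
+   */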
+  private ETLPlugin createMultiSinkPlugin(String sinkFormat) {
+    Map<String, String> map = new HashMap<>();
+    map.put("path", "${multiSinkPath}");
+    map.put("format", sinkFormat);
+    map.put("project", "${multiSinkProjectId}");
+    map.put("schema", "${multiSinkSchema}");
+    map.put("referenceName", "gcs-multi-input");
+    map.put("splitField", "${multiSinkSplitField}");
+    map.put("contentType", "${contentType}");
+    return new ETLPlugin(MULTI_SINK_PLUGIN_NAME, BatchSink.PLUGIN_TYPE, map, GOOGLE_CLOUD_ARTIFACT);
   }
 
   static class DataTypesRecord {
@@ -881,4 +978,19 @@ private static String blobContentToString(Blob blob) {
     return null;
   }
 
+  /**
+   * Reads the content type of each output part file under the given path, skipping the
+   * _SUCCESS marker and directory placeholders.
+   */
+  private List<String> getResultBlobsContentType(Bucket bucket, String path) {
+    String successFile = path + "/_SUCCESS";
+    assertExists(bucket, successFile);
+
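+    // List all blobs in the bucket and keep only files under the output path, dropping the
+    // _SUCCESS marker, directory placeholders, and blobs without a recorded content type.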
+    return StreamSupport.stream(bucket.list().iterateAll().spliterator(), false)
+      .filter(blob -> blob.getName().startsWith(path + "/")
+                && !successFile.equals(blob.getName()) && !blob.getName().endsWith("/"))
+      .map(BlobInfo::getContentType)
+      .filter(Objects::nonNull)
+      .collect(Collectors.toList());
+  }
+
 }