1616
1717package io .cdap .cdap .app .etl .gcp ;
1818
19+ import com .google .cloud .bigquery .BigQuery ;
20+ import com .google .cloud .bigquery .DatasetInfo ;
1921import com .google .common .base .Joiner ;
20- import com .google .common .collect .ImmutableList ;
2122import com .google .common .collect .ImmutableMap ;
2223import com .google .common .collect .ImmutableSet ;
2324import com .google .common .collect .Lists ;
6667import org .apache .avro .generic .GenericRecordBuilder ;
6768import org .apache .avro .io .DatumReader ;
6869import org .junit .Assert ;
70+ import org .junit .BeforeClass ;
6971import org .junit .Test ;
7072import org .junit .experimental .categories .Category ;
73+ import org .slf4j .Logger ;
74+ import org .slf4j .LoggerFactory ;
7175
7276import java .io .ByteArrayInputStream ;
7377import java .io .IOException ;
7478import java .net .HttpURLConnection ;
7579import java .net .URL ;
80+ import java .time .LocalDateTime ;
81+ import java .time .format .DateTimeFormatter ;
7682import java .util .ArrayList ;
7783import java .util .HashMap ;
7884import java .util .HashSet ;
8692 */
8793public class GoogleBigQuerySQLEngineTest extends DataprocETLTestBase {
8894
95+ private static final Logger LOG = LoggerFactory .getLogger (GoogleBigQuerySQLEngineTest .class );
8996 private static final String BQ_SQLENGINE_PLUGIN_NAME = "BigQueryPushdownEngine" ;
90- private static final String BIG_QUERY_DATASET = "bq_dataset_joiner_test " ;
97+ private static final String BIG_QUERY_DATASET_PREFIX = "bq_pd_ds_ " ;
9198 private static final String CONNECTION_NAME = String .format ("test_bq_%s" , GoogleBigQueryUtils .getUUID ());
9299 public static final String PURCHASE_SOURCE = "purchaseSource" ;
93100 public static final String ITEM_SINK = "itemSink" ;
94101 public static final String USER_SINK = "userSink" ;
95102 public static final String DEDUPLICATE_SOURCE = "userSource" ;
96103 public static final String DEDUPLICATE_SINK = "userSink" ;
104+ public static final long MILLISECONDS_IN_A_DAY = 24 * 60 * 60 * 1000 ;
105+ public static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter .ofPattern ("yyyy_MM_dd_HH_mm_ss_SSS" );
97106
107+ private static BigQuery bq ;
108+ private String bigQueryDataset ;
109+
110+
98111 private static final Map <String , String > CONFIG_MAP = new ImmutableMap .Builder <String , String >()
99112 .put ("uniqueFields" , "profession" )
100113 .put ("filterOperation" , "age:Min" )
101114 .build ();
102115
103- private static final List <String > CONDITIONAL_AGGREGATES = ImmutableList .of (
104- "highestPrice:maxIf(price):condition(city.equals('LA'))" ,
105- "averageDonutPrice:avgIf(price):condition(item.equals('doughnut'))" ,
106- "totalPurchasesInTokyo:sumIf(price):condition(city.equals('Tokyo'))" ,
107- "anyPurchaseInBerlin:anyIf(item):condition(city.equals('Berlin'))" ,
108- "doughnutsSold:countIf(item):condition(item.equals('doughnut'))" ,
109- "lowestPrice:minIf(price):condition(!item.equals('bagel'))"
110- );
111-
112116 public static final Schema PURCHASE_SCHEMA = Schema .recordOf (
113117 "purchase" ,
114118 Schema .Field .of ("ts" , Schema .of (Schema .Type .LONG )),
@@ -128,6 +132,11 @@ public class GoogleBigQuerySQLEngineTest extends DataprocETLTestBase {
128132 Schema .Field .of ("totalPurchases" , Schema .of (Schema .Type .LONG )),
129133 Schema .Field .of ("totalSpent" , Schema .of (Schema .Type .LONG )));
130134
135+ @ BeforeClass
136+ public static void testClassSetup () throws IOException {
137+ bq = GoogleBigQueryUtils .getBigQuery (getProjectId (), getServiceAccountCredentials ());
138+ }
139+
131140 @ Override
132141 protected void innerSetup () throws Exception {
133142 Tasks .waitFor (true , () -> {
@@ -140,10 +149,13 @@ protected void innerSetup() throws Exception {
140149 }
141150 }, 5 , TimeUnit .MINUTES , 3 , TimeUnit .SECONDS );
142151 createConnection (CONNECTION_NAME , "BigQuery" );
152+ bigQueryDataset = BIG_QUERY_DATASET_PREFIX + LocalDateTime .now ().format (DATE_TIME_FORMAT );
153+ createDataset (bigQueryDataset );
143154 }
144155
145156 @ Override
146157 protected void innerTearDown () throws Exception {
158+ deleteDataset (bigQueryDataset );
147159 deleteConnection (CONNECTION_NAME );
148160 }
149161
@@ -181,7 +193,8 @@ private Map<String, String> getProps(boolean useConnection, String includedStage
181193 props .put (ConfigUtil .NAME_CONNECTION , connectionId );
182194 props .put (ConfigUtil .NAME_USE_CONNECTION , "true" );
183195 }
184- props .put ("dataset" , BIG_QUERY_DATASET );
196+ props .put ("dataset" , bigQueryDataset );
197+ props .put ("retainTables" , "true" );
185198 if (includedStages != null ) {
186199 props .put ("includedStages" , includedStages );
187200 }
@@ -540,35 +553,6 @@ private void testSQLEngineGroupBy(Engine engine, boolean useConnection) throws E
540553
541554 verifyOutput (groupedUsers , groupedItems );
542555 }
543-
544- private void ingestConditionData (String conditionDatasetName ) throws Exception {
545- DataSetManager <Table > manager = getTableDataset (conditionDatasetName );
546- Table table = manager .get ();
547- putConditionValues (table , 1 , "Ben" , 23 , true , "Berlin" , "doughnut" , 1.5 );
548- putConditionValues (table , 2 , "Ben" , 23 , true , "LA" , "pretzel" , 2.05 );
549- putConditionValues (table , 3 , "Ben" , 23 , true , "Berlin" , "doughnut" , 0.75 );
550- putConditionValues (table , 4 , "Ben" , 23 , true , "Tokyo" , "pastry" , 3.25 );
551- putConditionValues (table , 5 , "Emma" , 18 , false , "Tokyo" , "doughnut" , 1.75 );
552- putConditionValues (table , 6 , "Emma" , 18 , false , "LA" , "bagel" , 2.95 );
553- putConditionValues (table , 7 , "Emma" , 18 , false , "Berlin" , "pretzel" , 2.05 );
554- putConditionValues (table , 8 , "Ron" , 22 , true , "LA" , "bagel" , 2.95 );
555- putConditionValues (table , 9 , "Ron" , 22 , true , "Tokyo" , "pretzel" , 0.5 );
556- putConditionValues (table , 10 , "Ron" , 22 , true , "Berlin" , "doughnut" , 1.75 );
557-
558- manager .flush ();
559- }
560-
561- private void putConditionValues (Table table , int id , String name , double age , boolean isMember , String city ,
562- String item , double price ) {
563- Put put = new Put (Bytes .toBytes (id ));
564- put .add ("name" , name );
565- put .add ("age" , age );
566- put .add ("isMember" , isMember );
567- put .add ("city" , city );
568- put .add ("item" , item );
569- put .add ("price" , price );
570- table .put (put );
571- }
572556
573557 private Map <String , List <Long >> readOutputGroupBy (ServiceManager serviceManager , String sink , Schema schema )
574558 throws IOException {
@@ -806,4 +790,23 @@ private void stopServiceForDataset(String datasetName) throws Exception {
806790 .getServiceManager (AbstractDatasetApp .DatasetService .class .getSimpleName ())
807791 .stop ();
808792 }
793+
794+ private static void createDataset (String bigQueryDataset ) {
795+ LOG .info ("Creating bigquery dataset {}" , bigQueryDataset );
796+ // Create dataset with a default table expiration of 24 hours.
797+ DatasetInfo datasetInfo = DatasetInfo .newBuilder (bigQueryDataset )
798+ .setDefaultTableLifetime (MILLISECONDS_IN_A_DAY )
799+ .setDefaultPartitionExpirationMs (MILLISECONDS_IN_A_DAY )
800+ .build ();
801+ bq .create (datasetInfo );
802+ LOG .info ("Created bigquery dataset {}" , bigQueryDataset );
803+ }
804+
805+ private static void deleteDataset (String bigQueryDataset ) {
806+ LOG .info ("Deleting bigquery dataset {}" , bigQueryDataset );
807+ boolean deleted = bq .delete (bigQueryDataset , BigQuery .DatasetDeleteOption .deleteContents ());
808+ if (deleted ) {
809+ LOG .info ("Deleted bigquery dataset {}" , bigQueryDataset );
810+ }
811+ }
809812}
0 commit comments