11package com .datastax .cdm ;
22
3+ import com .datastax .driver .core .*;
34import com .fasterxml .jackson .core .type .TypeReference ;
45import com .fasterxml .jackson .databind .ObjectMapper ;
56import com .fasterxml .jackson .dataformat .yaml .YAMLFactory ;
7+ import com .google .common .util .concurrent .ListenableFuture ;
68import org .apache .commons .cli .*;
9+ import org .apache .commons .csv .CSVFormat ;
10+ import org .apache .commons .csv .CSVParser ;
11+ import org .apache .commons .csv .CSVRecord ;
712import org .apache .commons .io .FileUtils ;
813import org .eclipse .jgit .api .Git ;
914import org .eclipse .jgit .api .errors .GitAPIException ;
1015import org .eclipse .jgit .api .errors .InvalidRemoteException ;
1116import org .eclipse .jgit .api .errors .TransportException ;
17+ import org .eclipse .jgit .events .ListenerHandle ;
18+
19+
1220import java .lang .StringBuilder ;
1321
1422//import com.datastax.loader.CqlDelimLoadTask;
1523
1624import java .io .*;
17- import java .net .MalformedURLException ;
1825import java .net .URL ;
19- import java .util .Map ;
26+ import java .nio .charset .Charset ;
27+ import java .util .*;
2028
2129/**
2230 * Created by jhaddad on 6/29/16.
2331 */
2432
33+
2534public class CassandraDatasetManager {
2635
36+ public class InvalidArgsException extends Exception {
37+
38+ }
2739 private static final String YAML_URI = "https://raw.githubusercontent.com/riptano/cdm-java/master/datasets.yaml" ;
2840 private Map <String , Dataset > datasets ;
41+ private Session session ;
42+ private String cassandraContactPoint ;
43+
44+ CassandraDatasetManager () {
45+
46+ }
2947
3048 CassandraDatasetManager (Map <String , Dataset > datasets ) {
3149 this .datasets = datasets ;
@@ -38,11 +56,9 @@ public static void main(String[] args) throws IOException, ParseException, Inter
3856
3957 // check for the .cdm directory
4058 String home_dir = System .getProperty ("user.home" );
41- // System.out.println(home_dir);
4259 String cdm_path = home_dir + "/.cdm" ;
4360
4461 File f = new File (cdm_path );
45- // System.out.println(f);
4662
4763 f .mkdir ();
4864
@@ -53,8 +69,6 @@ public static void main(String[] args) throws IOException, ParseException, Inter
5369 FileUtils .copyURLToFile (y , yaml );
5470 }
5571 // read in the YAML dataset list
56- // System.out.println("Loading Configuration YAML");
57-
5872 ObjectMapper mapper = new ObjectMapper (new YAMLFactory ());
5973
6074 // why extra work? Java Type Erasure will prevent type detection otherwise
@@ -63,7 +77,6 @@ public static void main(String[] args) throws IOException, ParseException, Inter
6377 // debug: show all datasets no matter what
6478 CassandraDatasetManager cdm = new CassandraDatasetManager (data );
6579
66-
6780 // parse the CLI options
6881 Options options = new Options ();
6982
@@ -80,22 +93,27 @@ public static void main(String[] args) throws IOException, ParseException, Inter
8093 return ;
8194 }
8295
83- if (args [0 ].equals ("install" )) {
84- cdm .install (args [1 ]);
85- } else if (args [0 ].equals ("list" )) {
86- cdm .list ();
87- } else if (args [0 ].equals ("new" )) {
88- cdm .new_dataset (args [1 ]);
89- } else if (args [0 ].equals ("dump" )) {
90- cdm .dump ();
91- } else if (args [0 ].equals ("update" )) {
92- cdm .update ();
93- } else {
94- System .out .println ("Not sure what to do." );
95- }
96-
97- // load data using cqlsh for now
96+ // connect to the cluster via the driver
97+ switch (args [0 ]) {
98+ case "install" :
99+ cdm .install (args [1 ]);
100+ break ;
101+ case "list" :
102+ cdm .list ();
103+ break ;
104+ case "new" :
105+ cdm .new_dataset (args [1 ]);
106+ break ;
107+ case "dump" :
108+ cdm .dump ();
109+ break ;
110+ case "update" :
111+ cdm .update ();
112+ break ;
113+ default :
114+ System .out .println ("Not sure what to do." );
98115
116+ }
99117 System .out .println ("Finished." );
100118 }
101119
@@ -106,13 +124,13 @@ private void dump() throws IOException, InterruptedException {
106124 for (String table : config .tables ) {
107125 StringBuilder command = new StringBuilder ();
108126 command .append ("cqlsh -k " )
109- .append (config .keyspace )
110- .append (" -e \" " )
111- .append ("COPY " )
112- .append (table )
113- .append (" TO 'data/" )
114- .append (table )
115- .append (".csv'\" " );
127+ .append (config .keyspace )
128+ .append (" -e \" " )
129+ .append ("COPY " )
130+ .append (table )
131+ .append (" TO 'data/" )
132+ .append (table )
133+ .append (".csv'\" " );
116134 System .out .println (command );
117135 Runtime .getRuntime ().exec (new String []{"bash" , "-c" , command .toString ()}).waitFor ();
118136 }
@@ -205,33 +223,142 @@ void install(String name) throws IOException, InterruptedException, GitAPIExcept
205223 ObjectMapper mapper = new ObjectMapper (new YAMLFactory ());
206224
207225 Config config = mapper .readValue (configFile , Config .class );
226+ String address = "127.0.0.1" ;
227+ {
228+ Cluster cluster = Cluster .builder ().addContactPoint (address ).build ();
229+ Session session = cluster .connect ();
208230
209- String createKeyspace = "cqlsh -e \" DROP KEYSPACE IF EXISTS " + config .keyspace +
210- "; CREATE KEYSPACE " + config .keyspace +
211- " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}\" " ;
231+ StringBuilder createKeyspace = new StringBuilder ();
232+ createKeyspace .append (" CREATE KEYSPACE " )
233+ .append (config .keyspace )
234+ .append (" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}" );
212235
213- System .out .println (createKeyspace );
214- Runtime .getRuntime ().exec (new String []{"bash" , "-c" , createKeyspace }).waitFor ();
236+ System .out .println (createKeyspace );
237+ session .execute ("DROP KEYSPACE IF EXISTS " + config .keyspace );
238+ session .execute (createKeyspace .toString ());
239+ cluster .close ();
215240
241+ }
216242
217243 System .out .println ("Schema: " + schema );
218244 String loadSchema = "cqlsh -k " + config .keyspace + " -f " + schema ;
219245 Runtime .getRuntime ().exec (new String []{"bash" , "-c" , loadSchema }).waitFor ();
220246
221247 System .out .println ("Loading data" );
222248
249+ Cluster cluster2 = Cluster .builder ()
250+ .addContactPoint (address )
251+ .build ();
252+
253+ Session session = cluster2 .connect (config .keyspace );
254+
255+ this .session = session ;
256+
223257 for (String table : config .tables ) {
224258 String dataFile = dataPath + table + ".csv" ;
225- String command = "COPY " + table + " FROM " + "'" + dataFile + "'" ;
226- String loadData = "cqlsh -k " + config .keyspace + " -e \" " + command + "\" " ;
227- System .out .println (loadData );
228- Runtime .getRuntime ().exec (new String []{"bash" , "-c" , loadData }).waitFor ();
229- }
259+ Iterable <CSVRecord > records = openCSV (dataFile );
260+
261+ System .out .println ("Importing " + table );
262+ KeyspaceMetadata keyspaceMetadata = cluster2 .getMetadata ()
263+ .getKeyspace (config .keyspace );
264+ TableMetadata tableMetadata = keyspaceMetadata .getTable (table );
265+
266+ List <ColumnMetadata > columns = tableMetadata .getColumns ();
267+
268+ StringJoiner fields = new StringJoiner (", " );
269+ StringJoiner values = new StringJoiner (", " );
270+
271+ HashMap types = new HashMap ();
272+
273+ ArrayList <Field > fieldlist = new ArrayList <>();
230274
275+ for (ColumnMetadata c : columns ) {
276+ fields .add (c .getName ());
277+ String ftype = c .getType ().getName ().toString ();
278+ types .put (c .getName (), ftype );
279+ fieldlist .add (new Field (c .getName (), ftype ));
280+ }
231281
282+ int totalComplete = 0 ;
283+ List <ResultSetFuture > futures = new ArrayList <>();
284+ for (CSVRecord record : records ) {
285+ // generate a CQL statement
286+ String cql = null ;
287+ try {
288+ cql = generateCQL (table , record , fieldlist );
289+
290+ ResultSetFuture future = session .executeAsync (cql );
291+ futures .add (future );
292+ totalComplete ++;
293+ if (totalComplete % 100 == 0 ) {
294+ futures .forEach (ResultSetFuture ::getUninterruptibly );
295+ futures .clear ();
296+ }
297+ System .out .print ("Complete: " + totalComplete + "\r " );
298+
299+ } catch (InvalidArgsException e ) {
300+ e .printStackTrace ();
301+ System .out .println (record );
302+ }
303+
304+ }
305+ futures .forEach (ResultSetFuture ::getUninterruptibly );
306+ futures .clear ();
307+ System .out .println ("Done importing " + table );
308+ }
309+
310+ cluster2 .close ();
232311 System .out .println ("Loading data" );
233312 }
234313
314+ CSVParser openCSV (String path ) throws IOException {
315+ File f = new File (path );
316+ return CSVParser .parse (f , Charset .forName ("UTF-8" ), CSVFormat .RFC4180 .withEscape ('\\' ));
317+ }
318+
319+ String generateCQL (String table ,
320+ CSVRecord record ,
321+ ArrayList <Field > fields ) throws InvalidArgsException {
322+
323+ HashSet needs_quotes = new HashSet ();
324+
325+ needs_quotes .add ("text" );
326+ needs_quotes .add ("datetime" );
327+ needs_quotes .add ("timestamp" );
328+
329+
330+ StringBuilder query = new StringBuilder ("INSERT INTO " );
331+ query .append (table );
332+ query .append ("(" );
333+
334+ StringJoiner sjfields = new StringJoiner (", " );
335+ StringJoiner values = new StringJoiner (", " );
336+
337+ fields .forEach (f -> sjfields .add (f .name ));
338+ query .append (sjfields .toString ());
339+
340+ query .append (") VALUES (" );
341+ if (record .size () != fields .size ())
342+ throw new InvalidArgsException ();
343+
344+ for (int i = 0 ; i < record .size (); i ++) {
345+ String v = record .get (i );
346+ Field f = fields .get (i );
347+ if (needs_quotes .contains (f .type )) {
348+ v = "'" + v .replace ("'" , "''" ) + "'" ;
349+ }
350+ if (v .trim ().equals ("" )) {
351+ v = "null" ;
352+ }
353+ values .add (v );
354+ }
355+
356+ query .append (values .toString ());
357+
358+ query .append (")" );
359+
360+ return query .toString ();
361+ }
235362
236363 void update () throws IOException {
237364 System .out .println ("Updating datasets..." );
@@ -249,7 +376,6 @@ void list() {
249376 for (Map .Entry <String , Dataset > dataset : datasets .entrySet ()) {
250377 System .out .println (dataset .getKey ());
251378 }
252-
253379 }
254380 void printHelp () {
255381 System .out .println ("Put help here." );
0 commit comments