Skip to content

Commit 266b0a5

Browse files
committed
push the json loader to the repo
1 parent 46a7129 commit 266b0a5

30 files changed

+1197
-81
lines changed

astra-db-java-tools/src/main/java/com/datastax/astra/tool/loader/csv/CsvRowMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ public interface CsvRowMapper {
1313
* @param doc
1414
* document to process
1515
*/
16-
Document map(Document doc) ;
16+
Document map(Document doc);
1717

1818
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package com.datastax.astra.tool.loader.json;
2+
3+
import com.datastax.astra.client.collections.Collection;
4+
import com.datastax.astra.client.collections.definition.documents.Document;
5+
import com.fasterxml.jackson.core.JsonFactory;
6+
import com.fasterxml.jackson.core.JsonParser;
7+
import com.fasterxml.jackson.core.JsonToken;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
import java.io.File;
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.Executors;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
@Slf4j
21+
public class JsonDocumentLoader {
22+
23+
/**
24+
* Distributed import of CSV file into Astra.
25+
*
26+
* @throws Exception
27+
* exception in processing CSV
28+
*/
29+
public static void load(String fileName, Collection<Document> collection, JsonRecordMapper processor) throws Exception {
30+
load(fileName, JsonLoaderSettings.builder().build(), collection, processor);
31+
}
32+
33+
/**
34+
* Distributed import of CSV file into Astra.
35+
*
36+
* @throws Exception
37+
* exception in processing CSV
38+
*/
39+
public static void load(String fileName, JsonLoaderSettings settings, Collection<Document> collection) throws Exception {
40+
load(fileName, settings, collection, doc -> doc);
41+
}
42+
43+
/**
44+
* Distributed import of a JSON file into Astra.
45+
*
46+
* @param settings
47+
* settings
48+
* @param collection
49+
* collection
50+
* @param processor
51+
* processor
52+
* @throws Exception
53+
* exception in processing CSV
54+
*/
55+
public static void load(String fileName,
56+
JsonLoaderSettings settings,
57+
Collection<Document> collection,
58+
JsonRecordMapper processor)
59+
throws Exception {
60+
AtomicInteger counter = new AtomicInteger();
61+
long top = System.currentTimeMillis();
62+
// Multithreaded executor to process the CSV file
63+
long startTime = System.currentTimeMillis();
64+
ObjectMapper objectMapper = new ObjectMapper();
65+
List<Document> batch = new ArrayList<>(settings.batchSize);
66+
ExecutorService executor = Executors.newFixedThreadPool(settings.threadPoolSize);
67+
68+
try (JsonParser parser = new JsonFactory().createParser(new File(fileName))) {
69+
if (parser.nextToken() != JsonToken.START_ARRAY) {
70+
throw new RuntimeException("JSON file must contain an array at the root");
71+
}
72+
while (parser.nextToken() == JsonToken.START_OBJECT) {
73+
Document doc = objectMapper.readValue(parser, Document.class);
74+
//Document doc = new Document();
75+
//doc.putAll();
76+
counter.incrementAndGet();
77+
batch.add(processor.map(doc));
78+
if (batch.size() == settings.batchSize) {
79+
final List<Document> batchToInsert = new ArrayList<>(batch);
80+
log.info("Enqueuing " + batch.size() + " rows into collection...");
81+
executor.submit(() -> collection.insertMany(batchToInsert));
82+
batch.clear();
83+
}
84+
}
85+
86+
// Process remaining batch
87+
if (!batch.isEmpty()) {
88+
executor.submit(() -> collection.insertMany(batch));
89+
}
90+
} finally {
91+
executor.shutdown();
92+
log.info(counter.get() + " rows enqueued from " + collection + " in " + (System.currentTimeMillis() - startTime) + " ms");
93+
try {
94+
if (!executor.awaitTermination(settings.timeoutSeconds, TimeUnit.SECONDS)) {
95+
executor.shutdownNow();
96+
log.info(counter.get() + " rows processed in " + (System.currentTimeMillis() - startTime) + " ms");
97+
}
98+
} catch (InterruptedException e) {
99+
Thread.currentThread().interrupt();
100+
executor.shutdownNow();
101+
}
102+
}
103+
}
104+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.datastax.astra.tool.loader.json;
2+
3+
import lombok.Builder;
4+
5+
@Builder
6+
public class JsonLoaderSettings {
7+
8+
private static final int BATCH_SIZE = 20;
9+
10+
private static final int THREAD_POOL_SIZE = 5;
11+
12+
private static final int TIMEOUT = 1800;
13+
14+
@Builder.Default
15+
int batchSize = BATCH_SIZE;;
16+
17+
@Builder.Default
18+
int threadPoolSize = THREAD_POOL_SIZE;;
19+
20+
@Builder.Default
21+
int timeoutSeconds = TIMEOUT;
22+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.datastax.astra.tool.loader.json;
2+
3+
import com.datastax.astra.client.collections.definition.documents.Document;
4+
5+
/**
6+
* To import a Json containing an arrray of records.
7+
*/
8+
public interface JsonRecordMapper {
9+
10+
Document map(Document jsonRecord);
11+
}

astra-db-java-tools/src/test/java/com/datastax/astra/samples/CsvLoaderAnoop.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public static void main(String[] args) throws Exception {
2929

3030
System.out.println(collection.find(and(
3131
eq("origin", "55426"),
32-
eq("destination", "61701"))).all());
32+
eq("destination", "61701"))).toList());
3333

3434
}
3535

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.datastax.astra.samples;
2+
3+
import com.datastax.astra.client.DataAPIClient;
4+
import com.datastax.astra.client.collections.Collection;
5+
import com.datastax.astra.client.collections.definition.documents.Document;
6+
import com.datastax.astra.client.databases.Database;
7+
import com.datastax.astra.tool.loader.json.JsonDocumentLoader;
8+
import com.datastax.astra.tool.loader.json.JsonLoaderSettings;
9+
import com.datastax.astra.tool.loader.json.JsonRecordMapper;
10+
import com.dtsx.astra.sdk.utils.JsonUtils;
11+
12+
public class JsonLoaderMtgSets {
13+
14+
public static final String ASTRA_TOKEN = System.getenv("ASTRA_DB_APPLICATION_TOKEN");
15+
public static final String ASTRA_DB_ENDPOINT = "https://7d7388a6-5ba2-431a-942a-250012f785c0-us-east1.apps.astra.datastax.com";
16+
public static final String SOURCE_JSON = "/Users/cedricklunven/dev/datastax/JAVA/astra-db-java/astra-db-java-tools/src/test/resources/demo-set-list.json";
17+
18+
public static void main(String[] args) throws Exception {
19+
DataAPIClient client = new DataAPIClient(ASTRA_TOKEN);
20+
Database db = client.getDatabase(ASTRA_DB_ENDPOINT);
21+
Collection<Document> collection = db.createCollection("mtg_sets");
22+
collection.deleteAll();
23+
24+
JsonDocumentLoader.load(SOURCE_JSON,
25+
JsonLoaderSettings.builder()
26+
.timeoutSeconds(300)
27+
.batchSize(100)
28+
.build(),
29+
collection,
30+
new MtgSetMapper());
31+
}
32+
33+
public static class MtgSetMapper implements JsonRecordMapper {
34+
35+
@Override
36+
public Document map(Document jsonRecord) {
37+
// manipulate each record as you like
38+
jsonRecord.put("_id", jsonRecord.get("code"));
39+
return jsonRecord;
40+
}
41+
}
42+
}

astra-db-java-tools/src/test/resources/demo-set-list.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

astra-db-java/src/main/java/com/datastax/astra/client/admin/DataAPIDatabaseAdmin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ public DataAPIDatabaseAdmin(Database db, AdminOptions options) {
7171
this.db = db;
7272
this.options.commandType(CommandType.KEYSPACE_ADMIN);
7373
String apiVersion = options.getDataAPIClientOptions().getApiVersion();
74+
switch(options.getDataAPIClientOptions().getDestination()) {
75+
case ASTRA:
76+
case ASTRA_TEST:
77+
case ASTRA_DEV:
78+
if (db.getRootEndpoint().endsWith(".com")) {
79+
this.apiEndpoint += "/api/json";
80+
}
81+
break;
82+
default:
83+
break;
84+
}
7485
if (!db.getRootEndpoint().endsWith(apiVersion)) {
7586
this.apiEndpoint += "/" + apiVersion;
7687
}

astra-db-java/src/main/java/com/datastax/astra/client/collections/Collection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,8 @@ public CollectionCursor<T, T> find(Filter filter, CollectionFindOptions options)
10041004
* the query filter
10051005
* @param options
10061006
* options of find one
1007+
* @param newDocType
1008+
* new class for return objects if projected
10071009
* @return
10081010
* the find iterable interface
10091011
*/

0 commit comments

Comments
 (0)