Skip to content

Commit 300ce32

Browse files
authored
Dynamic index name (#19853)
* Dynamic index name Signed-off-by: Arpit Bandejiya <[email protected]> * Remap paths Signed-off-by: Arpit Bandejiya <[email protected]> --------- Signed-off-by: Arpit Bandejiya <[email protected]>
1 parent 546257c commit 300ce32

File tree

11 files changed

+83
-26
lines changed

11 files changed

+83
-26
lines changed

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,42 +5,47 @@
55
* this file be licensed under the Apache-2.0 license or a
66
* compatible open source license.
77
*/
8-
use std::ptr::addr_of_mut;
8+
use arrow_array::ffi::FFI_ArrowArray;
9+
use arrow_array::{Array, StructArray};
10+
use arrow_schema::ffi::FFI_ArrowSchema;
911
use jni::objects::{JByteArray, JClass, JObject};
1012
use jni::sys::{jbyteArray, jlong, jstring};
1113
use jni::JNIEnv;
14+
use std::ptr::addr_of_mut;
1215
use std::sync::Arc;
13-
use arrow_array::{Array, StructArray};
14-
use arrow_array::ffi::FFI_ArrowArray;
15-
use arrow_schema::DataType;
16-
use arrow_schema::ffi::FFI_ArrowSchema;
1716

1817
mod util;
1918
mod row_id_optimizer;
2019
mod listing_table;
20+
mod memory;
2121

2222
use datafusion::execution::context::SessionContext;
2323

24+
use crate::listing_table::{ListingOptions, ListingTable, ListingTableConfig};
2425
use crate::util::{create_object_meta_from_filenames, parse_string_arr, set_object_result_error, set_object_result_ok};
2526
use datafusion::datasource::file_format::csv::CsvFormat;
26-
use datafusion::datasource::listing::{ListingTableUrl};
27+
use datafusion::datasource::file_format::parquet::ParquetFormat;
28+
use datafusion::datasource::listing::ListingTableUrl;
2729
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
2830
use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
2931
use datafusion::execution::cache::CacheAccessor;
3032
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
33+
use datafusion::physical_plan::SendableRecordBatchStream;
3134
use datafusion::prelude::SessionConfig;
3235
use datafusion::DATAFUSION_VERSION;
33-
use datafusion::datasource::file_format::parquet::ParquetFormat;
34-
use datafusion::physical_plan::SendableRecordBatchStream;
3536
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
3637
use datafusion_substrait::substrait::proto::Plan;
3738
use futures::TryStreamExt;
3839
use jni::objects::{JObjectArray, JString};
3940
use object_store::ObjectMeta;
4041
use prost::Message;
4142
use tokio::runtime::Runtime;
42-
use crate::listing_table::{ListingOptions, ListingTable, ListingTableConfig};
43-
use crate::row_id_optimizer::FilterRowIdOptimizer;
43+
use std::thread;
44+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
45+
use std::fs::OpenOptions;
46+
use std::io::Write;
47+
use std::sync::atomic::{AtomicBool, Ordering};
48+
4449

4550
/// Create a new DataFusion session context
4651
#[no_mangle]
@@ -199,12 +204,14 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
199204
mut env: JNIEnv,
200205
_class: JClass,
201206
shard_view_ptr: jlong,
207+
table_name: JString,
202208
substrait_bytes: jbyteArray,
203209
tokio_runtime_env_ptr: jlong,
204210
// callback: JObject,
205211
) -> jlong {
206212
let shard_view = unsafe { &*(shard_view_ptr as *const ShardView) };
207213
let runtime_ptr = unsafe { &*(tokio_runtime_env_ptr as *const Runtime)};
214+
let table_name: String = env.get_string(&table_name).expect("Couldn't get java string!").into();
208215

209216
let table_path = shard_view.table_path();
210217
let files_meta = shard_view.files_meta();
@@ -254,7 +261,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
254261
// Create a new TableProvider
255262
let provider = Arc::new(ListingTable::try_new(config).unwrap());
256263
let shard_id = table_path.prefix().filename().expect("error in fetching Path");
257-
ctx.register_table("index-7", provider)
264+
ctx.register_table(table_name, provider)
258265
.expect("Failed to attach the Table");
259266

260267
});
@@ -273,7 +280,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
273280

274281
let substrait_plan = match Plan::decode(plan_bytes_vec.as_slice()) {
275282
Ok(plan) => {
276-
println!("SUBSTRAIT rust: Decoding is successful, Plan has {} relations", plan.relations.len());
283+
// println!("SUBSTRAIT rust: Decoding is successful, Plan has {} relations", plan.relations.len());
277284
plan
278285
},
279286
Err(e) => {
@@ -286,7 +293,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
286293

287294
let logical_plan = match from_substrait_plan(&ctx.state(), &substrait_plan).await {
288295
Ok(plan) => {
289-
println!("SUBSTRAIT Rust: LogicalPlan: {:?}", plan);
296+
// println!("SUBSTRAIT Rust: LogicalPlan: {:?}", plan);
290297
plan
291298
},
292299
Err(e) => {
@@ -298,7 +305,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
298305
let dataframe = ctx.execute_logical_plan(logical_plan).await.unwrap();
299306
let stream = dataframe.execute_stream().await.unwrap();
300307
let stream_ptr = Box::into_raw(Box::new(stream)) as jlong;
301-
308+
// println!("The memory used currently right now: {:?}", jemalloc_stats::refresh_allocated());
302309
stream_ptr
303310

304311
})

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private static synchronized void loadNativeLibrary() {
113113
* @param substraitPlan the serialized Substrait query plan
114114
* @return stream pointer for result iteration
115115
*/
116-
public static native long executeSubstraitQuery(long cachePtr, byte[] substraitPlan, long runtimePtr);
116+
public static native long executeSubstraitQuery(long cachePtr, String tableName, byte[] substraitPlan, long runtimePtr);
117117

118118
public static native long createDatafusionReader(String path, String[] files);
119119

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public QueryPhaseExecutor<DatafusionContext> getQueryPhaseExecutor() {
8080
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task, BigArrays bigArrays, SearchContext originalContext) throws IOException {
8181
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this, bigArrays, originalContext);
8282
// Parse source
83-
datafusionContext.datafusionQuery(new DatafusionQuery(request.source().queryPlanIR(), new ArrayList<>()));
83+
datafusionContext.datafusionQuery(new DatafusionQuery(request.shardId().getIndexName(), request.source().queryPlanIR(), new ArrayList<>()));
8484
return datafusionContext;
8585
}
8686

@@ -191,10 +191,10 @@ public void collect(RecordBatchStream value) {
191191
collector.collect(stream);
192192
}
193193

194-
logger.info("Final Results:");
195-
for (Map.Entry<String, Object[]> entry : finalRes.entrySet()) {
196-
logger.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue()));
197-
}
194+
// logger.info("Final Results:");
195+
// for (Map.Entry<String, Object[]> entry : finalRes.entrySet()) {
196+
// logger.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue()));
197+
// }
198198

199199
} catch (Exception exception) {
200200
logger.error("Failed to execute Substrait query plan", exception);

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionQuery.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
import java.util.List;
1313

1414
public class DatafusionQuery {
15+
private String indexName;
1516
private final byte[] substraitBytes;
1617

1718
// List of Search executors which returns a result iterator which contains row id which can be joined in datafusion
1819
private final List<SearchExecutor> searchExecutors;
1920

20-
public DatafusionQuery(byte[] substraitBytes, List<SearchExecutor> searchExecutors) {
21+
public DatafusionQuery(String indexName, byte[] substraitBytes, List<SearchExecutor> searchExecutors) {
22+
this.indexName = indexName;
2123
this.substraitBytes = substraitBytes;
2224
this.searchExecutors = searchExecutors;
2325
}
@@ -29,4 +31,8 @@ public byte[] getSubstraitBytes() {
2931
public List<SearchExecutor> getSearchExecutors() {
3032
return searchExecutors;
3133
}
34+
35+
public String getIndexName() {
36+
return indexName;
37+
}
3238
}

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public String source() {
4040
public void search(DatafusionQuery datafusionQuery, List<SearchResultsCollector<RecordBatchStream>> collectors) throws IOException {
4141
// TODO : call search here to native
4242
// TODO : change RunTimePtr
43-
long nativeStreamPtr = DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), 0);
43+
long nativeStreamPtr = DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.toString(), datafusionQuery.getSubstraitBytes(), 0);
4444
RecordBatchStream stream = new DefaultRecordBatchStream(nativeStreamPtr);
4545
while(stream.hasNext()) {
4646
for(SearchResultsCollector<RecordBatchStream> collector : collectors) {
@@ -51,7 +51,7 @@ public void search(DatafusionQuery datafusionQuery, List<SearchResultsCollector<
5151

5252
@Override
5353
public long search(DatafusionQuery datafusionQuery, Long contextPtr) {
54-
return DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), contextPtr);
54+
return DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getIndexName(), datafusionQuery.getSubstraitBytes(), contextPtr);
5555
}
5656

5757
public DatafusionReader getReader() {

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void testQueryPhaseExecutor() throws IOException {
113113
throw new RuntimeException(e);
114114
}
115115

116-
long streamPointer = datafusionSearcher.search(new DatafusionQuery(protoContent, new ArrayList<>()), service.getTokioRuntimePointer());
116+
long streamPointer = datafusionSearcher.search(new DatafusionQuery("test-index",protoContent, new ArrayList<>()), service.getTokioRuntimePointer());
117117
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
118118
RecordBatchStream stream = new RecordBatchStream(streamPointer, service.getTokioRuntimePointer() , allocator);
119119

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
324324
);
325325
}
326326
executeRequest(task, searchRequest, this::searchAsyncAction, listener);
327-
logger.info("Search request received is {}", searchRequest.source());
327+
// logger.info("Search request received is {}", searchRequest.source());
328328
}
329329

330330
/**

server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,20 @@ public WriterFileSet(StreamInput in) throws IOException {
4141
}
4242
}
4343

44+
public WriterFileSet withDirectory(String newDirectory) {
45+
WriterFileSet newFileSet = new WriterFileSet(Path.of(newDirectory), this.writerGeneration);
46+
47+
// Extract just the filename and reconstruct with new directory
48+
for (String oldFilePath : this.files) {
49+
Path oldPath = Path.of(oldFilePath);
50+
String fileName = oldPath.getFileName().toString();
51+
String newFilePath = Path.of(newDirectory, fileName).toString();
52+
newFileSet.files.add(newFilePath);
53+
}
54+
55+
return newFileSet;
56+
}
57+
4458
/**
4559
* Serialize this WriterFileSet to StreamOutput
4660
*/

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.index.engine.exec.FileMetadata;
1919
import org.opensearch.index.engine.exec.RefreshResult;
2020
import org.opensearch.index.engine.exec.WriterFileSet;
21+
import java.nio.file.Path;
2122

2223
import java.io.IOException;
2324
import java.io.Serializable;
@@ -43,6 +44,31 @@ public CatalogSnapshot(RefreshResult refreshResult, long id) {
4344
refreshResult.getRefreshedFiles().forEach((dataFormat, writerFiles) -> dfGroupedSearchableFiles.put(dataFormat.name(), writerFiles));
4445
}
4546

47+
private CatalogSnapshot(long id, Map<String, Collection<WriterFileSet>> dfGroupedSearchableFiles) {
48+
super("catalog_snapshot");
49+
this.id = id;
50+
this.dfGroupedSearchableFiles = dfGroupedSearchableFiles;
51+
}
52+
53+
public CatalogSnapshot remapPaths(Path newShardDataPath) {
54+
Map<String, Collection<WriterFileSet>> remappedFiles = new HashMap<>();
55+
56+
for (Map.Entry<String, Collection<WriterFileSet>> entry : dfGroupedSearchableFiles.entrySet()) {
57+
String dataFormat = entry.getKey();
58+
List<WriterFileSet> remappedFileSets = new ArrayList<>();
59+
60+
for (WriterFileSet fileSet : entry.getValue()) {
61+
// Create new WriterFileSet with updated directory and file paths
62+
WriterFileSet remappedFileSet = fileSet.withDirectory(newShardDataPath.toString());
63+
remappedFileSets.add(remappedFileSet);
64+
}
65+
66+
remappedFiles.put(dataFormat, remappedFileSets);
67+
}
68+
69+
return new CatalogSnapshot(this.id, remappedFiles);
70+
}
71+
4672
public CatalogSnapshot(StreamInput in) throws IOException {
4773
super("catalog_snapshot");
4874
this.id = in.readLong();

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ public CompositeEngine(MapperService mapperService, PluginsService pluginsServic
7171
this.engine = new CompositeIndexingExecutionEngine(mapperService, pluginsService, shardPath, 0);
7272
this.compositeEngineCommitter = new LuceneCommitEngine(shardPath.getDataPath());
7373
this.catalogSnapshot = ((LuceneCommitEngine)this.compositeEngineCommitter).readCatalogSnapshot();
74+
75+
// remap the paths
76+
this.catalogSnapshot = this.catalogSnapshot.remapPaths(shardPath.getDataPath());
77+
7478
this.mergeHandler = new ParquetMergeHandler(this, this.engine, this.engine.getDataFormat());
7579
mergeScheduler = new MergeScheduler(this.mergeHandler, this);
7680

0 commit comments

Comments (0)