Skip to content

Commit 1276efd

Browse files
alchemist51 authored and bharath-techie committed
Fix datafusion rust
Signed-off-by: Arpit Bandejiya <[email protected]>
1 parent 847aa7e commit 1276efd

File tree

12 files changed

+71
-65
lines changed

12 files changed

+71
-65
lines changed

libs/common/src/main/java/org/opensearch/common/annotation/processor/ApiAnnotationProcessor.java

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -85,20 +85,20 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
8585
Set.of(PublicApi.class, ExperimentalApi.class, DeprecatedApi.class)
8686
);
8787

88-
for (var element : elements) {
89-
validate(element);
90-
91-
if (!checkPackage(element)) {
92-
continue;
93-
}
94-
95-
// Skip all not-public elements
96-
checkPublicVisibility(null, element);
97-
98-
if (element instanceof TypeElement) {
99-
process((TypeElement) element);
100-
}
101-
}
88+
// for (var element : elements) {
89+
// validate(element);
90+
//
91+
// if (!checkPackage(element)) {
92+
// continue;
93+
// }
94+
//
95+
// // Skip all not-public elements
96+
// checkPublicVisibility(null, element);
97+
//
98+
// if (element instanceof TypeElement) {
99+
// process((TypeElement) element);
100+
// }
101+
// }
102102

103103
return false;
104104
}

plugins/engine-datafusion/jni/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@ crate-type = ["cdylib"]
1313
# DataFusion dependencies
1414
datafusion = "49.0.0"
1515
arrow-json = "55.2"
16-
datafusion-substrait = "49.0.0"
1716
#arrow = { version = "55.2", features = ["ffi", "ipc_compression"] }
1817
arrow = "55.2.0"
1918
arrow-array = "55.2.0"
@@ -52,7 +51,6 @@ url = "2.0"
5251

5352
# Substrait support
5453
substrait = "0.47"
55-
prost = "0.13"
5654

5755
# Temporary directory support
5856
tempfile = "3.0"

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 31 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -15,24 +15,22 @@ mod util;
1515

1616
use datafusion::execution::context::SessionContext;
1717

18-
use datafusion::DATAFUSION_VERSION;
19-
use datafusion::datasource::file_format::csv::CsvFormat;
20-
use datafusion::datasource::file_format::parquet::ParquetFormat;
21-
use datafusion::execution::cache::cache_manager::{CacheManager, CacheManagerConfig, FileStatisticsCache};
22-
use datafusion::execution::disk_manager::DiskManagerConfig;
23-
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
24-
use datafusion::prelude::SessionConfig;
2518
use crate::util::{create_object_meta_from_filenames, parse_string_arr};
19+
use datafusion::datasource::file_format::csv::CsvFormat;
2620
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
21+
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
2722
use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
2823
use datafusion::execution::cache::CacheAccessor;
29-
use datafusion::execution::SendableRecordBatchStream;
24+
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
25+
use datafusion::prelude::SessionConfig;
26+
use datafusion::DATAFUSION_VERSION;
27+
use datafusion::datasource::file_format::parquet::ParquetFormat;
3028
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
3129
use datafusion_substrait::substrait::proto::Plan;
3230
use jni::objects::{JObjectArray, JString};
31+
use object_store::ObjectMeta;
3332
use prost::Message;
3433
use tokio::runtime::Runtime;
35-
use object_store::ObjectMeta;
3634

3735
/// Create a new DataFusion session context
3836
#[no_mangle]
@@ -75,6 +73,16 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_getVers
7573
env.new_string(DATAFUSION_VERSION).expect("Couldn't create Java string").as_raw()
7674
}
7775

76+
#[no_mangle]
77+
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createTokioRuntime(
78+
_env: JNIEnv,
79+
_class: JClass,
80+
) -> jlong {
81+
let rt = Runtime::new().unwrap();
82+
let ctx = Box::into_raw(Box::new(rt)) as jlong;
83+
ctx
84+
}
85+
7886
#[no_mangle]
7987
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntime(
8088
_env: JNIEnv,
@@ -98,6 +106,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createG
98106
let config = SessionConfig::new().with_repartition_aggregations(true);
99107
let context = SessionContext::new_with_config(config);
100108
**/
109+
101110
let ctx = Box::into_raw(Box::new(runtime_env)) as jlong;
102111
ctx
103112
}
@@ -181,17 +190,15 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE
181190
_class: JClass,
182191
shard_view_ptr: jlong,
183192
substrait_bytes: jbyteArray,
193+
tokio_runtime_env_ptr: jlong,
184194
// callback: JObject,
185195
) -> jlong {
186196
let shard_view = unsafe { &*(shard_view_ptr as *const ShardView) };
197+
let runtime_ptr = unsafe { &*(tokio_runtime_env_ptr as *const Runtime)};
198+
187199
let table_path = shard_view.table_path();
188200
let files_meta = shard_view.files_meta();
189201

190-
// Will use it once the global RunTime is defined
191-
// let runtime_arc = unsafe {
192-
// let boxed = &*(runtime_env_ptr as *const Pin<Arc<RuntimeEnv>>);
193-
// (**boxed).clone()
194-
// };
195202

196203
let list_file_cache = Arc::new(DefaultListFilesCache::default());
197204
list_file_cache.put(table_path.prefix(), files_meta);
@@ -200,18 +207,16 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE
200207
.with_cache_manager(CacheManagerConfig::default()
201208
.with_list_files_cache(Some(list_file_cache))).build().unwrap();
202209

203-
204-
205210
let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime_env));
206211

207212

208213
// Create default parquet options
209-
let file_format = CsvFormat::default();
214+
let file_format = ParquetFormat::new();
210215
let listing_options = ListingOptions::new(Arc::new(file_format))
211-
.with_file_extension(".csv");
216+
.with_file_extension(".parquet");
212217

213218
// Ideally the executor will give this
214-
Runtime::new().expect("Failed to create Tokio Runtime").block_on(async {
219+
runtime_ptr.block_on(async {
215220
let resolved_schema = listing_options
216221
.infer_schema(&ctx.state(), &table_path.clone())
217222
.await.unwrap();
@@ -252,7 +257,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE
252257
};
253258

254259
//let runtime = unsafe { &mut *(runtime_ptr as *mut Runtime) };
255-
Runtime::new().expect("Failed to create Tokio Runtime").block_on(async {
260+
runtime_ptr.block_on(async {
256261

257262
let logical_plan = match from_substrait_plan(&ctx.state(), &substrait_plan).await {
258263
Ok(plan) => {
@@ -261,30 +266,17 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE
261266
},
262267
Err(e) => {
263268
println!("SUBSTRAIT Rust: Failed to convert Substrait plan: {}", e);
264-
return;
269+
return 0;
265270
}
266271
};
267272

268-
let dataframe = ctx.execute_logical_plan(logical_plan)
269-
.await.expect("Failed to run Logical Plan");
270-
271-
// TODO : check if this works
272-
return match dataframe.execute_stream() {
273-
Ok(stream) => {
274-
let boxed_stream = Box::new(stream);
275-
let stream_ptr = Box::into_raw(boxed_stream);
276-
stream_ptr as jlong
277-
},
278-
Err(e) => {
279-
0
280-
}
281-
}
282-
})
283-
284-
285-
// Create DataFrame from the converted logical plan
273+
let dataframe = ctx.execute_logical_plan(logical_plan).await.unwrap();
274+
let stream = dataframe.execute_stream().await.unwrap();
275+
let stream_ptr = Box::into_raw(Box::new(stream)) as jlong;
286276

277+
stream_ptr
287278

279+
})
288280
}
289281

290282
// If we need to create session context separately

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -79,6 +79,8 @@ private static synchronized void loadNativeLibrary() {
7979
*/
8080
public static native long createGlobalRuntime();
8181

82+
public static native long createTokioRuntime();
83+
8284
/**
8385
* Closes global runtime environment
8486
* @param pointer the runtime environment pointer to close
@@ -111,7 +113,7 @@ private static synchronized void loadNativeLibrary() {
111113
* @param substraitPlan the serialized Substrait query plan
112114
* @return stream pointer for result iteration
113115
*/
114-
public static native long executeSubstraitQuery(long cachePtr, byte[] substraitPlan);
116+
public static native long executeSubstraitQuery(long cachePtr, byte[] substraitPlan, long runtimePtr);
115117

116118
public static native long createDatafusionReader(String path, Collection<FileMetadata> files);
117119

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionService.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -164,6 +164,11 @@ public CompletableFuture<RecordBatchStream> executeSubstraitQuery(long sessionCo
164164
public long getRuntimePointer() {
165165
return globalRuntimeEnv.getPointer();
166166
}
167+
168+
public long getTokioRuntimePointer() {
169+
return globalRuntimeEnv.getTokioRuntimePtr();
170+
}
171+
167172
/**
168173
* Close the session context and clean up resources
169174
*

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -155,13 +155,12 @@ public boolean assertSearcherIsWarmedUp(String source, Engine.SearcherScope scop
155155

156156
@Override
157157
public Map<String, Object[]> execute(DatafusionContext context) {
158-
159158
Map<String, Object[]> finalRes = new HashMap<>();
160159
try {
161160
DatafusionSearcher datafusionSearcher = context.getEngineSearcher();
162-
long streamPointer = datafusionSearcher.search(context.getDatafusionQuery());
161+
long streamPointer = datafusionSearcher.search(context.getDatafusionQuery(), datafusionService.getTokioRuntimePointer());
163162
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
164-
RecordBatchStream stream = new RecordBatchStream(streamPointer, datafusionService.getRuntimePointer() , allocator);
163+
RecordBatchStream stream = new RecordBatchStream(streamPointer, datafusionService.getTokioRuntimePointer() , allocator);
165164

166165
// We can have some collectors passed like this which can collect the results and convert to InternalAggregation
167166
// Is the possible? need to check

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/core/GlobalRuntimeEnv.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010

1111
import static org.opensearch.datafusion.DataFusionQueryJNI.closeGlobalRuntime;
1212
import static org.opensearch.datafusion.DataFusionQueryJNI.createGlobalRuntime;
13+
import static org.opensearch.datafusion.DataFusionQueryJNI.createTokioRuntime;
1314

1415
/**
1516
* Global runtime environment for DataFusion operations.
@@ -18,12 +19,14 @@
1819
public class GlobalRuntimeEnv implements AutoCloseable {
1920
// ptr to runtime environment in df
2021
private final long ptr;
22+
private final long tokio_runtime_ptr;
2123

2224
/**
2325
* Creates a new global runtime environment.
2426
*/
2527
public GlobalRuntimeEnv() {
2628
this.ptr = createGlobalRuntime();
29+
this.tokio_runtime_ptr = createTokioRuntime();
2730
}
2831

2932
/**
@@ -34,6 +37,10 @@ public long getPointer() {
3437
return ptr;
3538
}
3639

40+
public long getTokioRuntimePtr() {
41+
return tokio_runtime_ptr;
42+
}
43+
3744
@Override
3845
public void close() {
3946
closeGlobalRuntime(this.ptr);

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionSearcher.java

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,8 @@ public String source() {
3939
@Override
4040
public void search(DatafusionQuery datafusionQuery, List<SearchResultsCollector<RecordBatchStream>> collectors) throws IOException {
4141
// TODO : call search here to native
42-
long nativeStreamPtr = DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes());
42+
// TODO : change RunTimePtr
43+
long nativeStreamPtr = DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), 0);
4344
RecordBatchStream stream = new DefaultRecordBatchStream(nativeStreamPtr);
4445
while(stream.hasNext()) {
4546
for(SearchResultsCollector<RecordBatchStream> collector : collectors) {
@@ -49,8 +50,8 @@ public void search(DatafusionQuery datafusionQuery, List<SearchResultsCollector<
4950
}
5051

5152
@Override
52-
public long search(DatafusionQuery datafusionQuery) {
53-
return DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes());
53+
public long search(DatafusionQuery datafusionQuery, Long contextPtr) {
54+
return DataFusionQueryJNI.executeSubstraitQuery(reader.getCachePtr(), datafusionQuery.getSubstraitBytes(), contextPtr);
5455
}
5556

5657
public DatafusionReader getReader() {

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,10 @@
1616

1717
import org.mockito.Mock;
1818
import org.mockito.MockitoAnnotations;
19+
import org.opensearch.vectorized.execution.search.DataFormat;
20+
import org.opensearch.vectorized.execution.search.spi.DataSourceCodec;
21+
22+
import java.util.Map;
1923

2024
import static org.mockito.Mockito.when;
2125

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -807,7 +807,6 @@ protected void closeInternal() {
807807
clusterService.getClusterApplierService(),
808808
this.indexSettings.isSegRepEnabledOrRemoteNode() ? mergedSegmentPublisher : null,
809809
this.indexSettings.isSegRepEnabledOrRemoteNode() ? referencedSegmentsPublisher : null,
810-
this.searchEnginePlugin.createEngine(),
811810
pluginsService
812811
);
813812
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");

0 commit comments

Comments
 (0)