Skip to content

Conversation

@animodak7
Copy link
Contributor

Description

  1. Implements query then fetch -
    a) reads substrait plan derived from query "SELECT * FROM "index-7" WHERE ___row_id < 10 LIMIT 9"
    b) queries parquet files and returns row_ids
    c) based on row ids and projections creates access plan and executes to get plan recordBatchStream
    d) Using derived source mappers converts it to source json object bytesRef.
  2. Run e2e test using - ./gradlew :plugins:engine-datafusion:test --tests 'org.opensearch.datafusion.DataFusionServiceTests.testQueryThenFetchE2ETest' -Dtests.seed=EF79DE2C926C4ACC -Dtests.locale=ms-BN -Dtests.timezone=America/Belem

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Copy link
Contributor

❌ Gradle check result for 0047c1a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Contributor

❌ Gradle check result for 991699b: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

/// multiple equivalent orderings, the outer `Vec` will have a
/// single element.
pub file_sort_order: Vec<Vec<SortExpr>>,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not extend the listing table instead of modifying it? Thinking of an Object where default listingOptions can be the child and we override only the imp functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot extend it since there are internal functions which we need to change to make this work such as scan which again use functions restricted to ListingTable object which are not accessible to child, so anyway for these functions we will have to copy them to somewhere child can access. We can think of this but there might be some changes need in datafusion as well, we can plan to to this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I think we should open an issue in datafusion once we have an plan


fn add_path_preserving_metadata(&self, file_groups: Vec<FileGroup>) -> Vec<FileGroup> {
let re = Regex::new(r"generation-(\d+)").unwrap();
fn add_path_preserving_metadata(&self, file_groups: Vec<FileGroup>, files_metadata: Arc<Vec<FileMetadata>>) -> Vec<FileGroup> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's return Result<> format for error handling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


fn add_path_preserving_metadata(&self, file_groups: Vec<FileGroup>) -> Vec<FileGroup> {
let re = Regex::new(r"generation-(\d+)").unwrap();
fn add_path_preserving_metadata(&self, file_groups: Vec<FileGroup>, files_metadata: Arc<Vec<FileMetadata>>) -> Vec<FileGroup> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we make sure the vec and the Vec have the same order? Does it matter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

order of file_matadata will defer in creating row_base for now we are dependant on DatafusionReader for files ordering. In function file_group ordering can be different than file_metadata ordering we are mapping row_base based on location.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking for cases of high number of files. We are finding the meta file every time for every iteration. Will it make sense to sort them with the same key and just have validation in the loop for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense will think more on this and will try to optimize in next pr

let row_count = files_metadata.iter()
.find(|meta| { location.contains(meta.object_meta.location.as_ref()) })
.map(|meta| meta.row_group_row_counts().iter().sum::<i64>() as i32)
.unwrap_or_default();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the default here? Should we not always find it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should always find the row base. changed.

Copy link
Contributor

@alchemist51 alchemist51 Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw an error here instead then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to structure it differently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can structure it differently, will write another test case after completing fetch phase will update then.

@github-actions
Copy link
Contributor

❌ Gradle check result for 0bce75a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Contributor

❌ Gradle check result for 392e0c1: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants