feat(datafusion): Implement `insert_into` for `IcebergTableProvider` #1600
        } else {
            self.table.clone()
        };

This makes me wonder if storing the table directly in the `IcebergTableProvider` is correct... We could get a stale table if the provider doesn't have a catalog. Iceberg-java has a `refresh()` interface which uses `TableOperation` to refresh metadata. In iceberg-rs we don't have `TableOperation` and need to rely on the catalog to refresh.
I have a PR to add that functionality (including the refresh): #1297
Hi @phillipleblanc, thanks for pointing me to your change! Your change makes sense to me, but I was thinking of adding something like this to `impl Table` directly: `pub async fn refresh(&mut self, catalog: &dyn Catalog) -> Result<Self>`
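To make the proposed shape concrete, here is a minimal sketch of such a `refresh` method. It is not the iceberg-rs API: `Table`, `Catalog`, and `InMemoryCatalog` below are simplified stand-ins invented for illustration, the method is synchronous rather than `async`, and errors are plain `String`s. The only idea carried over from the signature above is that the table asks the catalog for its latest state, updates itself, and returns the fresh copy.

```rust
use std::collections::HashMap;

/// Simplified stand-in for an Iceberg table (hypothetical, not the iceberg-rs type).
#[derive(Debug, Clone, PartialEq)]
pub struct Table {
    pub identifier: String,
    pub metadata_version: u64,
}

/// Hypothetical, synchronous stand-in for a catalog; the real trait is async.
pub trait Catalog {
    fn load_table(&self, identifier: &str) -> Result<Table, String>;
}

/// Toy in-memory catalog used only for this illustration.
pub struct InMemoryCatalog {
    pub tables: HashMap<String, Table>,
}

impl Catalog for InMemoryCatalog {
    fn load_table(&self, identifier: &str) -> Result<Table, String> {
        self.tables
            .get(identifier)
            .cloned()
            .ok_or_else(|| format!("table not found: {}", identifier))
    }
}

impl Table {
    /// Reload this table from the catalog, overwrite `self` with the latest
    /// state, and return a copy of the refreshed table.
    pub fn refresh(&mut self, catalog: &dyn Catalog) -> Result<Self, String> {
        let latest = catalog.load_table(&self.identifier)?;
        *self = latest.clone();
        Ok(latest)
    }
}
```

With this shape, a provider that holds a catalog could call `refresh` before planning a scan or an insert, while a provider without one would keep using its stored table and accept that it may be stale.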
I agree that this may be a problem, but I don't think we should do it here. Creating another issue to track this may be a better approach.
        _insert_op: InsertOp,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if !self
            .table
Ideally we should refresh the table here and at every other use of `self.table` in `IcebergTableProvider`, but I think we should fix that in a separate PR if needed.
Sounds reasonable to me.
Thanks @CTTY for this pr! Generally LGTM, just one minor point.
@@ -432,3 +432,370 @@ async fn test_metadata_table() -> Result<()> {

    Ok(())
}

#[tokio::test]
async fn test_insert_into() -> Result<()> {
Thanks for these tests. It would be much easier if we had SQL logic tests.
I will jump back on this after #1621 is completed
Thanks @CTTY for this pr, LGTM!
## Which issue does this PR close?

- Closes #.

## What changes are included in this PR?

* The previous PR #1600 added a few integration tests, but they need to include the new changes introduced by the memory catalog loader change #1623.
* This caused build failures.

## Are these changes tested?

Yes, by existing tests.
## Which issue does this PR close?

- `insert_into` for `IcebergTableProvider` #1540

## What changes are included in this PR?

- Add `catalog` to `IcebergTableProvider` as optional
- `IcebergTableProvider::scan`
- Implement `insert_into` for `IcebergTableProvider` using write node and commit node for non-partitioned tables

## Are these changes tested?

Added tests
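
The write node / commit node split mentioned above can be pictured as a two-step pipeline: the write step produces data files that are not yet visible to readers, and the commit step appends all of them to the table in a single snapshot. The sketch below illustrates only that split, under loud assumptions: `DataFile`, `TableState`, `write_step`, and `commit_step` are hypothetical names invented here, batches are plain `Vec<i64>`, and no real files or DataFusion execution plans are involved.

```rust
/// Hypothetical description of a data file produced by the write step.
#[derive(Debug, Clone, PartialEq)]
pub struct DataFile {
    pub path: String,
    pub record_count: u64,
}

/// Stand-in for table state: committed data files plus a snapshot counter.
#[derive(Debug, Default)]
pub struct TableState {
    pub files: Vec<DataFile>,
    pub snapshot_id: u64,
}

/// Write step: describe one data file per non-empty input batch.
/// Nothing is added to the table yet, so readers cannot see these files.
pub fn write_step(batches: &[Vec<i64>]) -> Vec<DataFile> {
    batches
        .iter()
        .enumerate()
        .filter(|(_, batch)| !batch.is_empty())
        .map(|(i, batch)| DataFile {
            path: format!("data/part-{}.parquet", i),
            record_count: batch.len() as u64,
        })
        .collect()
}

/// Commit step: append every written file in one new snapshot and
/// return the number of rows inserted.
pub fn commit_step(table: &mut TableState, files: Vec<DataFile>) -> u64 {
    let rows: u64 = files.iter().map(|f| f.record_count).sum();
    if !files.is_empty() {
        table.files.extend(files);
        table.snapshot_id += 1;
    }
    rows
}
```

Keeping the commit separate means that many writers can produce files in parallel while the table only changes once, when the collected files are committed together.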