diff --git a/Cargo.lock b/Cargo.lock index da5d19a93..0935ea9e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -178,7 +178,7 @@ dependencies = [ "chrono", "chrono-tz", "half", - "hashbrown", + "hashbrown 0.14.5", "num", ] @@ -351,9 +351,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fec134f64e2bc57411226dfc4e52dec859ddfc7e711fc5e07b612584f000e4aa" +checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429" dependencies = [ "bzip2", "flate2", @@ -380,9 +380,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.82" +version = "0.1.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", @@ -406,9 +406,9 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "backtrace" @@ -515,9 +515,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "bzip2" @@ -542,9 +542,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.20" +version = "1.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"45bcde016d64c21da4be18b655631e5ab6d3107607e71a73a9f53eb48aae23fb" +checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938" dependencies = [ "jobserver", "libc", @@ -738,7 +738,7 @@ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", - "hashbrown", + "hashbrown 0.14.5", "lock_api", "once_cell", "parking_lot_core", @@ -781,7 +781,7 @@ dependencies = [ "futures", "glob", "half", - "hashbrown", + "hashbrown 0.14.5", "indexmap", "itertools", "log", @@ -832,7 +832,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "hashbrown", + "hashbrown 0.14.5", "instant", "libc", "num_cpus", @@ -866,7 +866,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "futures", - "hashbrown", + "hashbrown 0.14.5", "log", "object_store", "parking_lot", @@ -923,7 +923,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", - "hashbrown", + "hashbrown 0.14.5", "hex", "itertools", "log", @@ -1017,7 +1017,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown", + "hashbrown 0.14.5", "indexmap", "itertools", "log", @@ -1047,7 +1047,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", - "hashbrown", + "hashbrown 0.14.5", "hex", "indexmap", "itertools", @@ -1067,7 +1067,7 @@ dependencies = [ "arrow", "datafusion-common", "datafusion-expr-common", - "hashbrown", + "hashbrown 0.14.5", "rand", ] @@ -1109,7 +1109,7 @@ dependencies = [ "datafusion-physical-expr-common", "futures", "half", - "hashbrown", + "hashbrown 0.14.5", "indexmap", "itertools", "log", @@ -1120,6 +1120,35 @@ dependencies = [ "tokio", ] +[[package]] +name = "datafusion-proto" +version = "42.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585357d621fa03ea85a7fefca79ebc5ef0ee13a7f82be0762a414879a4d190a7" +dependencies = [ + "arrow", + "chrono", + "datafusion", + 
"datafusion-common", + "datafusion-expr", + "datafusion-proto-common", + "object_store", + "prost", +] + +[[package]] +name = "datafusion-proto-common" +version = "42.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4db6534382f92f528bdb5d925b4214c31ffd84fa7fe1eff3ed0d2f1286851ab8" +dependencies = [ + "arrow", + "chrono", + "datafusion-common", + "object_store", + "prost", +] + [[package]] name = "datafusion-python" version = "41.0.0" @@ -1127,6 +1156,7 @@ dependencies = [ "arrow", "async-trait", "datafusion", + "datafusion-proto", "datafusion-substrait", "futures", "mimalloc", @@ -1242,9 +1272,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.33" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" +checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" dependencies = [ "crc32fast", "miniz_oxide", @@ -1427,6 +1457,12 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" + [[package]] name = "heck" version = "0.4.1" @@ -1487,9 +1523,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" +checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" [[package]] name = "humantime" @@ -1528,7 +1564,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", - "rustls-native-certs 0.8.0", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -1537,9 +1573,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.8" +version = "0.1.9" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "da62f120a8a37763efb0cf8fdf264b884c7b8b9ac8660b900c8661030c00e6ba" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" dependencies = [ "bytes", "futures-channel", @@ -1550,7 +1586,6 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", "tower-service", "tracing", ] @@ -1590,12 +1625,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", ] [[package]] @@ -1733,9 +1768,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.158" +version = "0.2.159" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" [[package]] name = "libflate" @@ -1757,7 +1792,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" dependencies = [ "core2", - "hashbrown", + "hashbrown 0.14.5", "rle-decode-fast", ] @@ -2012,9 +2047,12 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" +dependencies = [ + "portable-atomic", +] [[package]] name = "openssl-probe" @@ -2075,7 +2113,7 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown", + "hashbrown 0.14.5", "lz4_flex", "num", "num-bigint", @@ -2196,26 +2234,6 @@ dependencies = [ 
"siphasher", ] -[[package]] -name = "pin-project" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.14" @@ -2230,15 +2248,15 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" [[package]] name = "portable-atomic" -version = "1.7.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" [[package]] name = "ppv-lite86" @@ -2280,9 +2298,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.2" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8650aabb6c35b860610e9cff5dc1af886c9e25073b7b1712a68972af4281302" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck 0.5.0", @@ -2401,9 +2419,9 @@ checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" [[package]] name = "quick-xml" -version = "0.36.1" +version = "0.36.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc" +checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" dependencies = [ "memchr", "serde", @@ -2498,18 +2516,18 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.4" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ "bitflags 2.6.0", ] [[package]] name = "regex" -version = "1.10.6" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" +checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" dependencies = [ "aho-corasick", "memchr", @@ -2519,9 +2537,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", @@ -2536,9 +2554,9 @@ checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" [[package]] name = "regex-syntax" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "regress" @@ -2546,15 +2564,15 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eae2a1ebfecc58aff952ef8ccd364329abe627762f5bf09ff42eb9d98522479" dependencies = [ - "hashbrown", + "hashbrown 0.14.5", "memchr", ] [[package]] name = "reqwest" -version = "0.12.7" 
+version = "0.12.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" +checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" dependencies = [ "base64 0.22.1", "bytes", @@ -2576,7 +2594,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls", - "rustls-native-certs 0.7.3", + "rustls-native-certs", "rustls-pemfile", "rustls-pki-types", "serde", @@ -2664,19 +2682,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" -dependencies = [ - "openssl-probe", - "rustls-pemfile", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-native-certs" version = "0.8.0" @@ -2692,19 +2697,18 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.3" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" dependencies = [ - "base64 0.22.1", "rustls-pki-types", ] [[package]] name = "rustls-pki-types" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" +checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" [[package]] name = "rustls-webpki" @@ -2792,9 +2796,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.1" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" +checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" dependencies = 
[ "core-foundation-sys", "libc", @@ -2935,18 +2939,18 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "snafu" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d" +checksum = "223891c85e2a29c3fe8fb900c1fae5e69c2e42415e3177752e8718475efa5019" dependencies = [ "snafu-derive", ] [[package]] name = "snafu-derive" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5" +checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -3103,9 +3107,9 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "tempfile" -version = "3.12.0" +version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" +checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" dependencies = [ "cfg-if", "fastrand", @@ -3116,18 +3120,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" +checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" +checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", @@ -3220,27 +3224,6 @@ dependencies = [ "tokio", ] 
-[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "pin-project", - "pin-project-lite", - "tokio", - "tower-layer", - "tower-service", -] - -[[package]] -name = "tower-layer" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" - [[package]] name = "tower-service" version = "0.3.3" @@ -3369,9 +3352,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.15" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" +checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893" [[package]] name = "unicode-ident" @@ -3396,9 +3379,9 @@ checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" [[package]] name = "unicode-width" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" [[package]] name = "unindent" @@ -3539,9 +3522,9 @@ checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" [[package]] name = "wasm-streams" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +checksum = "4e072d4e72f700fb3443d8fe94a39315df013eef1104903cdb0a2abd322bbecd" dependencies = [ "futures-util", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index 54e53f4b0..a0723984f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ pyo3 = { version = "0.22", features = 
["extension-module", "abi3", "abi3-py38"] arrow = { version = "53", features = ["pyarrow"] } datafusion = { version = "42.0.0", features = ["pyarrow", "avro", "unicode_expressions"] } datafusion-substrait = { version = "42.0.0", optional = true } +datafusion-proto = { version = "42.0.0" } prost = "0.13" # keep in line with `datafusion-substrait` prost-types = "0.13" # keep in line with `datafusion-substrait` uuid = { version = "1.9", features = ["v4"] } diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index 4f40b2088..63c19b3e1 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -36,7 +36,7 @@ from .catalog import Catalog, Database, Table # The following imports are okay to remain as opaque to the user. -from ._internal import Config, LogicalPlan, ExecutionPlan, runtime +from ._internal import Config, runtime from .record_batch import RecordBatchStream, RecordBatch @@ -53,6 +53,8 @@ WindowFrame, ) +from .plan import LogicalPlan, ExecutionPlan + from . import functions, object_store, substrait __version__ = importlib_metadata.version(__name__) diff --git a/python/datafusion/context.py b/python/datafusion/context.py index 5b52d397b..b08e62d77 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -23,7 +23,6 @@ from ._internal import RuntimeConfig as RuntimeConfigInternal from ._internal import SQLOptions as SQLOptionsInternal from ._internal import SessionContext as SessionContextInternal -from ._internal import LogicalPlan, ExecutionPlan from datafusion.catalog import Catalog, Table from datafusion.dataframe import DataFrame @@ -39,6 +38,7 @@ import pandas import polars import pathlib + from datafusion.plan import LogicalPlan, ExecutionPlan class SessionConfig: @@ -268,8 +268,10 @@ def with_disk_manager_specified(self, *paths: str | pathlib.Path) -> RuntimeConf Returns: A new :py:class:`RuntimeConfig` object with the updated setting. 
""" - paths = [str(p) for p in paths] - self.config_internal = self.config_internal.with_disk_manager_specified(paths) + paths_list = [str(p) for p in paths] + self.config_internal = self.config_internal.with_disk_manager_specified( + paths_list + ) return self def with_unbounded_memory_pool(self) -> RuntimeConfig: @@ -558,7 +560,7 @@ def create_dataframe_from_logical_plan(self, plan: LogicalPlan) -> DataFrame: Returns: DataFrame representation of the logical plan. """ - return DataFrame(self.ctx.create_dataframe_from_logical_plan(plan)) + return DataFrame(self.ctx.create_dataframe_from_logical_plan(plan._raw_plan)) def from_pylist( self, data: list[dict[str, Any]], name: str | None = None @@ -1034,4 +1036,4 @@ def read_table(self, table: Table) -> DataFrame: def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream: """Execute the ``plan`` and return the results.""" - return RecordBatchStream(self.ctx.execute(plan, partitions)) + return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions)) diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py index 2328ef8fa..c5ac0bb89 100644 --- a/python/datafusion/dataframe.py +++ b/python/datafusion/dataframe.py @@ -24,6 +24,7 @@ from typing import Any, List, TYPE_CHECKING from datafusion.record_batch import RecordBatchStream from typing_extensions import deprecated +from datafusion.plan import LogicalPlan, ExecutionPlan if TYPE_CHECKING: import pyarrow as pa @@ -34,10 +35,6 @@ from datafusion._internal import DataFrame as DataFrameInternal from datafusion.expr import Expr, SortExpr, sort_or_default -from datafusion._internal import ( - LogicalPlan, - ExecutionPlan, -) class DataFrame: @@ -316,7 +313,7 @@ def logical_plan(self) -> LogicalPlan: Returns: Unoptimized logical plan. """ - return self.df.logical_plan() + return LogicalPlan(self.df.logical_plan()) def optimized_logical_plan(self) -> LogicalPlan: """Return the optimized ``LogicalPlan``. 
@@ -324,7 +321,7 @@ def optimized_logical_plan(self) -> LogicalPlan: Returns: Optimized logical plan. """ - return self.df.optimized_logical_plan() + return LogicalPlan(self.df.optimized_logical_plan()) def execution_plan(self) -> ExecutionPlan: """Return the execution/physical plan. @@ -332,7 +329,7 @@ def execution_plan(self) -> ExecutionPlan: Returns: Execution plan. """ - return self.df.execution_plan() + return ExecutionPlan(self.df.execution_plan()) def repartition(self, num: int) -> DataFrame: """Repartition a DataFrame into ``num`` partitions. diff --git a/python/datafusion/expr.py b/python/datafusion/expr.py index 152aa38d3..8600627ae 100644 --- a/python/datafusion/expr.py +++ b/python/datafusion/expr.py @@ -22,16 +22,18 @@ from __future__ import annotations -from typing import Any, Optional, Type +from typing import Any, Optional, Type, TYPE_CHECKING import pyarrow as pa from datafusion.common import DataTypeMap, NullTreatment, RexType from typing_extensions import deprecated -from ._internal import LogicalPlan from ._internal import expr as expr_internal from ._internal import functions as functions_internal +if TYPE_CHECKING: + from datafusion.plan import LogicalPlan + # The following are imported from the internal representation. We may choose to # give these all proper wrappers, or to simply leave as is. These were added # in order to support passing the `test_imports` unit test. @@ -485,7 +487,7 @@ def rex_call_operator(self) -> str: def column_name(self, plan: LogicalPlan) -> str: """Compute the output column name based on the provided logical plan.""" - return self.expr.column_name(plan) + return self.expr.column_name(plan._raw_plan) def order_by(self, *exprs: Expr | SortExpr) -> ExprFuncBuilder: """Set the ordering for a window or aggregate function. 
diff --git a/python/datafusion/plan.py b/python/datafusion/plan.py new file mode 100644 index 000000000..3836edec6 --- /dev/null +++ b/python/datafusion/plan.py @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""This module supports physical and logical plans in DataFusion.""" + +from __future__ import annotations + +import datafusion._internal as df_internal + +from typing import List, Any, TYPE_CHECKING + +if TYPE_CHECKING: + from datafusion.context import SessionContext + +__all__ = [ + "LogicalPlan", + "ExecutionPlan", +] + + +class LogicalPlan: + """Logical Plan. + + A `LogicalPlan` is a node in a tree of relational operators (such as + Projection or Filter). + + Represents transforming an input relation (table) to an output relation + (table) with a potentially different schema. Plans form a dataflow tree + where data flows from leaves up to the root to produce the query result. + + `LogicalPlan`s can be created by the SQL query planner, the DataFrame API, + or programmatically (for example custom query languages). 
+ """ + + def __init__(self, plan: df_internal.LogicalPlan) -> None: + """This constructor should not be called by the end user.""" + self._raw_plan = plan + + def to_variant(self) -> Any: + """Convert the logical plan into its specific variant.""" + return self._raw_plan.to_variant() + + def inputs(self) -> List[LogicalPlan]: + """Returns the list of inputs to the logical plan.""" + return [LogicalPlan(p) for p in self._raw_plan.inputs()] + + def __repr__(self) -> str: + """Generate a printable representation of the plan.""" + return self._raw_plan.__repr__() + + def display(self) -> str: + """Print the logical plan.""" + return self._raw_plan.display() + + def display_indent(self) -> str: + """Print an indented form of the logical plan.""" + return self._raw_plan.display_indent() + + def display_indent_schema(self) -> str: + """Print an indented form of the schema for the logical plan.""" + return self._raw_plan.display_indent_schema() + + def display_graphviz(self) -> str: + """Print the graph visualization of the logical plan. + + Returns a `format`able structure that produces lines meant for graphical display + using the `DOT` language. This format can be visualized using software from + [`graphviz`](https://graphviz.org/) + """ + return self._raw_plan.display_graphviz() + + @staticmethod + def from_proto(ctx: SessionContext, data: bytes) -> LogicalPlan: + """Create a LogicalPlan from protobuf bytes. + + Tables created in memory from record batches are currently not supported. + """ + return LogicalPlan(df_internal.LogicalPlan.from_proto(ctx.ctx, data)) + + def to_proto(self) -> bytes: + """Convert a LogicalPlan to protobuf bytes. + + Tables created in memory from record batches are currently not supported. 
+ """ + return self._raw_plan.to_proto() + + +class ExecutionPlan: + """Represent nodes in the DataFusion Physical Plan.""" + + def __init__(self, plan: df_internal.ExecutionPlan) -> None: + """This constructor should not be called by the end user.""" + self._raw_plan = plan + + def children(self) -> List[ExecutionPlan]: + """Get a list of children `ExecutionPlan`s that act as inputs to this plan. + + The returned list will be empty for leaf nodes such as scans, will contain a + single value for unary nodes, or two values for binary nodes (such as joins). + """ + return [ExecutionPlan(e) for e in self._raw_plan.children()] + + def display(self) -> str: + """Print the physical plan.""" + return self._raw_plan.display() + + def display_indent(self) -> str: + """Print an indented form of the physical plan.""" + return self._raw_plan.display_indent() + + def __repr__(self) -> str: + """Print a string representation of the physical plan.""" + return self._raw_plan.__repr__() + + @property + def partition_count(self) -> int: + """Returns the number of partitions in the physical plan.""" + return self._raw_plan.partition_count + + @staticmethod + def from_proto(ctx: SessionContext, data: bytes) -> ExecutionPlan: + """Create an ExecutionPlan from protobuf bytes. + + Tables created in memory from record batches are currently not supported. + """ + return ExecutionPlan(df_internal.ExecutionPlan.from_proto(ctx.ctx, data)) + + def to_proto(self) -> bytes: + """Convert an ExecutionPlan into protobuf bytes. + + Tables created in memory from record batches are currently not supported. 
+ """ + return self._raw_plan.to_proto() diff --git a/python/datafusion/substrait.py b/python/datafusion/substrait.py index 0cdd19a51..dea47acca 100644 --- a/python/datafusion/substrait.py +++ b/python/datafusion/substrait.py @@ -28,10 +28,10 @@ from typing import TYPE_CHECKING from typing_extensions import deprecated import pathlib +from datafusion.plan import LogicalPlan if TYPE_CHECKING: from datafusion.context import SessionContext - from datafusion._internal import LogicalPlan __all__ = [ "Plan", @@ -156,7 +156,9 @@ def to_substrait_plan(logical_plan: LogicalPlan, ctx: SessionContext) -> Plan: Substrait plan. """ return Plan( - substrait_internal.Producer.to_substrait_plan(logical_plan, ctx.ctx) + substrait_internal.Producer.to_substrait_plan( + logical_plan._raw_plan, ctx.ctx + ) ) @@ -181,8 +183,8 @@ def from_substrait_plan(ctx: SessionContext, plan: Plan) -> LogicalPlan: Returns: LogicalPlan. """ - return substrait_internal.Consumer.from_substrait_plan( - ctx.ctx, plan.plan_internal + return LogicalPlan( + substrait_internal.Consumer.from_substrait_plan(ctx.ctx, plan.plan_internal) ) diff --git a/python/datafusion/tests/test_plans.py b/python/datafusion/tests/test_plans.py new file mode 100644 index 000000000..0283a4e6a --- /dev/null +++ b/python/datafusion/tests/test_plans.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datafusion import SessionContext, LogicalPlan, ExecutionPlan +import pytest + + +# Note: We must use CSV because memory tables are currently not supported for +# conversion to/from protobuf. +@pytest.fixture +def df(): + ctx = SessionContext() + return ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv").select("c1") + + +def test_logical_plan_to_proto(ctx, df) -> None: + logical_plan_bytes = df.logical_plan().to_proto() + logical_plan = LogicalPlan.from_proto(ctx, logical_plan_bytes) + + df_round_trip = ctx.create_dataframe_from_logical_plan(logical_plan) + + assert df.collect() == df_round_trip.collect() + + original_execution_plan = df.execution_plan() + execution_plan_bytes = original_execution_plan.to_proto() + execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes) + + assert str(original_execution_plan) == str(execution_plan) diff --git a/src/physical_plan.rs b/src/physical_plan.rs index c97c1a96e..9ef2f0ebb 100644 --- a/src/physical_plan.rs +++ b/src/physical_plan.rs @@ -16,9 +16,13 @@ // under the License. 
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties}; +use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec}; +use prost::Message; use std::sync::Arc; -use pyo3::prelude::*; +use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes}; + +use crate::{context::PySessionContext, errors::DataFusionError}; #[pyclass(name = "ExecutionPlan", module = "datafusion", subclass)] #[derive(Debug, Clone)] @@ -54,6 +58,35 @@ impl PyExecutionPlan { format!("{}", d.indent(false)) } + pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> { + let codec = DefaultPhysicalExtensionCodec {}; + let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan( + self.plan.clone(), + &codec, + )?; + + let bytes = proto.encode_to_vec(); + Ok(PyBytes::new_bound(py, &bytes)) + } + + #[staticmethod] + pub fn from_proto(ctx: PySessionContext, proto_msg: Bound<'_, PyBytes>) -> PyResult<Self> { + let bytes: &[u8] = proto_msg.extract()?; + let proto_plan = + datafusion_proto::protobuf::PhysicalPlanNode::decode(bytes).map_err(|e| { + PyRuntimeError::new_err(format!( + "Unable to decode physical plan node from serialized bytes: {}", + e + )) + })?; + + let codec = DefaultPhysicalExtensionCodec {}; + let plan = proto_plan + .try_into_physical_plan(&ctx.ctx, &ctx.ctx.runtime_env(), &codec) + .map_err(DataFusionError::from)?; + Ok(Self::new(plan)) + } + fn __repr__(&self) -> String { self.display_indent() } diff --git a/src/sql/logical.rs b/src/sql/logical.rs index d00f0af3f..fc398ff89 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use crate::errors::py_unsupported_variant_err; use crate::expr::aggregate::PyAggregate; use crate::expr::analyze::PyAnalyze; use crate::expr::cross_join::PyCrossJoin; @@ -35,8 +34,11 @@ use crate::expr::subquery_alias::PySubqueryAlias; use crate::expr::table_scan::PyTableScan; use crate::expr::unnest::PyUnnest; use
crate::expr::window::PyWindowExpr; -use datafusion::logical_expr::LogicalPlan; -use pyo3::prelude::*; +use crate::{context::PySessionContext, errors::py_unsupported_variant_err}; +use datafusion::{error::DataFusionError, logical_expr::LogicalPlan}; +use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec}; +use prost::Message; +use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes}; use crate::expr::logical_node::LogicalNode; @@ -125,6 +127,33 @@ impl PyLogicalPlan { fn display_graphviz(&self) -> String { format!("{}", self.plan.display_graphviz()) } + + pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> { + let codec = DefaultLogicalExtensionCodec {}; + let proto = + datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(&self.plan, &codec)?; + + let bytes = proto.encode_to_vec(); + Ok(PyBytes::new_bound(py, &bytes)) + } + + #[staticmethod] + pub fn from_proto(ctx: PySessionContext, proto_msg: Bound<'_, PyBytes>) -> PyResult<Self> { + let bytes: &[u8] = proto_msg.extract()?; + let proto_plan = + datafusion_proto::protobuf::LogicalPlanNode::decode(bytes).map_err(|e| { + PyRuntimeError::new_err(format!( + "Unable to decode logical node from serialized bytes: {}", + e + )) + })?; + + let codec = DefaultLogicalExtensionCodec {}; + let plan = proto_plan + .try_into_logical_plan(&ctx.ctx, &codec) + .map_err(DataFusionError::from)?; + Ok(Self::new(plan)) + } } impl From for LogicalPlan {