diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8a01025 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,34 @@ +# Ignore the target directory where Rust build artifacts are placed +/target + +# Ignore the local Cargo registry and git db (cached crates) +/cargo/.git +/cargo/registry + +# Ignore build scripts or binaries +*.exe +*.dll +*.so +*.dylib + +# Ignore backup and swap files +*.swp +*.swo +*.bak + +# Ignore any IDE-specific or editor-specific files (e.g., VSCode, IntelliJ, etc.) +**/.vscode +**/.idea +**/.gdb_history +**/.git +**/.DS_Store + +# Ignore test result files +/tests/results/ + +# Ignore unnecessary temporary or system files +**/node_modules +*.log +*.tmp +*.lock +!Cargo.lock diff --git a/Cargo.lock b/Cargo.lock index fcb331c..1304f8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -351,9 +351,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.13" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429" +checksum = "103db485efc3e41214fe4fda9f3dbeae2eb9082f48fd236e6095627a9422066e" dependencies = [ "bzip2", "flate2", @@ -387,12 +387,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "atomic-waker" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" - [[package]] name = "autocfg" version = "1.4.0" @@ -525,9 +519,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.28" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "jobserver", "libc", @@ -549,7 +543,6 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", - "serde", "windows-targets", ] @@ -611,16 +604,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1122,31 +1105,6 @@ dependencies = [ "prost 0.13.3", ] -[[package]] -name = "datafusion-python" -version = "41.0.0" -source = "git+https://github.com/apache/datafusion-python#ec8246da3b45e766fe6fb515ade01e0bae73af98" -dependencies = [ - "arrow", - "async-trait", - "datafusion", - "datafusion-proto", - "futures", - "mimalloc", - "object_store", - "parking_lot", - "prost 0.13.3", - "prost-types 0.13.3", - "pyo3", - "pyo3-build-config", - "rand", - "regex-syntax", - "syn 2.0.79", - "tokio", - "url", - "uuid", -] - [[package]] name = "datafusion-sql" version = "42.0.0" @@ -1171,7 +1129,6 @@ dependencies = [ "anyhow", "datafusion", "datafusion-proto", - "datafusion-python", "futures", "log", "pretty_assertions", @@ -1255,12 +1212,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1392,25 +1343,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "h2" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" -dependencies = [ - "atomic-waker", - "bytes", - "fnv", - "futures-core", - "futures-sink", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "half" version = "2.4.1" @@ -1471,109 +1403,12 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "http" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - -[[package]] -name = "http-body" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" -dependencies = [ - "bytes", - "http", -] - -[[package]] -name = "http-body-util" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" -dependencies = [ - "bytes", - "futures-util", - "http", - "http-body", - "pin-project-lite", -] - -[[package]] -name = "httparse" -version = "1.9.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" - [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" -[[package]] -name = "hyper" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "h2", - "http", - "http-body", - "httparse", - "itoa", - "pin-project-lite", - "smallvec", - "tokio", - "want", -] - -[[package]] -name = "hyper-rustls" -version = "0.27.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" -dependencies = [ - "futures-util", - "http", - "hyper", - "hyper-util", - "rustls", - "rustls-native-certs", - "rustls-pki-types", - "tokio", - "tokio-rustls", - "tower-service", -] - -[[package]] -name = "hyper-util" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "http", - "http-body", - "hyper", - "pin-project-lite", - "socket2", - "tokio", - "tower-service", - "tracing", -] - [[package]] name = "iana-time-zone" version = "0.1.61" @@ -1641,12 +1476,6 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" -[[package]] -name = "ipnet" -version = "2.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" - [[package]] name = "itertools" version = "0.10.5" @@ -1682,9 +1511,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] @@ -1761,9 +1590,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.159" +version = "0.2.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +checksum = "f0b21006cd1874ae9e650973c565615676dc4a274c965bb0a73796dac838ce4f" [[package]] name = "libflate" @@ -1795,16 +1624,6 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" -[[package]] -name = "libmimalloc-sys" -version = "0.1.39" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23aa6811d3bd4deb8a84dde645f943476d13b248d818edcf8ce0b2f37f036b44" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -1872,21 +1691,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "mimalloc" -version = "0.1.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68914350ae34959d83f732418d51e2427a794055d0b9529f48259ac07af65633" -dependencies = [ - "libmimalloc-sys", -] - -[[package]] -name = "mime" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" - [[package]] name = "miniz_oxide" version = "0.8.0" @@ -1896,18 +1700,6 @@ dependencies = [ "adler2", ] -[[package]] -name = "mio" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" -dependencies = [ - "hermit-abi", - "libc", - "wasi", - "windows-sys 0.52.0", -] - [[package]] name = "multimap" version = "0.8.3" @@ -2014,23 +1806,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25a0c4b3a0e31f8b66f71ad8064521efa773910196e2cde791436f13409f3b45" dependencies = [ "async-trait", - "base64", "bytes", "chrono", "futures", "humantime", - "hyper", "itertools 0.13.0", - "md-5", "parking_lot", "percent-encoding", - "quick-xml", - "rand", - "reqwest", - "ring", - "rustls-pemfile", - "serde", - "serde_json", "snafu", "tokio", "tracing", @@ -2044,12 +1826,6 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" -[[package]] -name = "openssl-probe" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" - [[package]] name = "ordered-float" version = "2.10.1" @@ -2242,9 +2018,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.86" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" dependencies = [ "unicode-ident", ] @@ -2337,9 +2113,9 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.22.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15ee168e30649f7f234c3d49ef5a7a6cbf5134289bc46c29ff3155fa3221c225" +checksum = "3d922163ba1f79c04bc49073ba7b32fd5a8d3b76a87c955921234b8e77333c51" dependencies = [ "cfg-if", "indoc", @@ -2355,9 +2131,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.22.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e61cef80755fe9e46bb8a0b8f20752ca7676dcc07a5277d8b7768c6172e529b3" +checksum = "bc38c5feeb496c8321091edf3d63e9a6829eab4b863b4a6a65f26f3e9cc6b179" dependencies = [ "once_cell", "target-lexicon", @@ -2365,9 +2141,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.22.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67ce096073ec5405f5ee2b8b31f03a68e02aa10d5d4f565eca04acc41931fa1c" +checksum = "94845622d88ae274d2729fcefc850e63d7a3ddff5e3ce11bd88486db9f1d357d" dependencies = [ "libc", "pyo3-build-config", @@ -2375,9 +2151,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.22.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2440c6d12bc8f3ae39f1e775266fa5122fd0c8891ce7520fa6048e683ad3de28" +checksum = "e655aad15e09b94ffdb3ce3d217acf652e26bbc37697ef012f5e5e348c716e5e" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -2387,9 +2163,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.22.3" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1be962f0e06da8f8465729ea2cb71a416d2257dff56cbe40a70d3e62a93ae5d1" +checksum = "ae1e3f09eecd94618f60a455a23def79f79eba4dc561a97324bf9ac8c6df30ce" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2404,64 +2180,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" -[[package]] -name = "quick-xml" -version = "0.36.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" -dependencies = [ - "memchr", - "serde", -] - -[[package]] -name = "quinn" -version = "0.11.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" -dependencies = [ - "bytes", - "pin-project-lite", - "quinn-proto", - "quinn-udp", - "rustc-hash", - "rustls", - "socket2", - "thiserror", - "tokio", - "tracing", -] - -[[package]] -name = "quinn-proto" -version = "0.11.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" -dependencies = [ - "bytes", - "rand", - "ring", - "rustc-hash", - "rustls", - "slab", - "thiserror", - "tinyvec", - "tracing", -] - -[[package]] -name = "quinn-udp" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" -dependencies = [ - "libc", - "once_cell", - "socket2", - "tracing", - "windows-sys 0.59.0", -] - [[package]] name = "quote" version = "1.0.37" @@ -2545,66 +2263,6 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" -[[package]] -name = "reqwest" -version = "0.12.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" -dependencies = [ - "base64", - "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-rustls", - "hyper-util", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite", - "quinn", - "rustls", - "rustls-native-certs", - "rustls-pemfile", - "rustls-pki-types", - "serde", - "serde_json", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tokio-rustls", - "tokio-util", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "wasm-streams", - "web-sys", - "windows-registry", -] - -[[package]] -name = "ring" -version = "0.17.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" -dependencies = [ - "cc", - "cfg-if", - "getrandom", - "libc", - "spin", - "untrusted", - "windows-sys 0.52.0", -] - [[package]] name = "rle-decode-fast" version = "1.0.3" @@ -2617,12 +2275,6 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" -[[package]] -name = "rustc-hash" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" - [[package]] name = "rustc_version" version = "0.4.1" @@ -2645,64 +2297,11 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rustls" -version = "0.23.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" -dependencies = [ - "once_cell", - "ring", - "rustls-pki-types", - "rustls-webpki", - "subtle", - "zeroize", -] - -[[package]] -name = "rustls-native-certs" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" -dependencies = [ - "openssl-probe", - "rustls-pemfile", - "rustls-pki-types", - "schannel", - "security-framework", -] - -[[package]] -name = "rustls-pemfile" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" -dependencies = [ - "rustls-pki-types", -] - -[[package]] -name = "rustls-pki-types" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" - -[[package]] -name = "rustls-webpki" -version = "0.102.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" -dependencies = [ - "ring", - "rustls-pki-types", - "untrusted", -] - [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "ryu" @@ -2719,44 +2318,12 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "schannel" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9aaafd5a2b6e3d657ff009d82fbd630b6bd54dd4eb06f21693925cdf80f9b8b" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "security-framework" -version = "2.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" -dependencies = [ - "bitflags 2.6.0", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "semver" version = "1.0.23" @@ -2801,18 +2368,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", -] - [[package]] name = "sha2" version = "0.10.8" @@ -2878,22 +2433,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" -[[package]] -name = "socket2" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "sqlparser" version = "0.50.0" @@ -2990,15 +2529,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "sync_wrapper" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" -dependencies = [ - "futures-core", -] - [[package]] name = "target-lexicon" version = "0.12.16" @@ -3081,12 +2611,8 @@ checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", - "libc", - "mio", "pin-project-lite", - "socket2", "tokio-macros", - "windows-sys 0.52.0", ] [[package]] @@ -3100,17 +2626,6 @@ dependencies = [ "syn 2.0.79", ] -[[package]] -name = "tokio-rustls" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" -dependencies = [ - "rustls", - "rustls-pki-types", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.12" @@ -3137,12 +2652,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "tower-service" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" - [[package]] name = "tracing" version = "0.1.40" @@ -3174,12 +2683,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "try-lock" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" - [[package]] name = "twox-hash" version = "1.6.3" @@ -3255,12 +2758,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" -[[package]] -name = "untrusted" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" - [[package]] name = "url" version = "2.5.2" @@ -3274,9 +2771,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "getrandom", "serde", @@ -3298,15 +2795,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "want" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" -dependencies = [ - "try-lock", -] - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -3315,9 +2803,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -3326,9 +2814,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -3339,23 +2827,11 @@ dependencies = [ "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3363,9 +2839,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.93" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -3376,28 +2852,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.93" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" - -[[package]] -name = "wasm-streams" -version = "0.4.1" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e072d4e72f700fb3443d8fe94a39315df013eef1104903cdb0a2abd322bbecd" -dependencies = [ - "futures-util", - "js-sys", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "web-sys" -version = "0.3.70" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" dependencies = [ "js-sys", "wasm-bindgen", @@ -3433,36 +2896,6 @@ dependencies = [ "windows-targets", ] -[[package]] -name = "windows-registry" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" -dependencies = [ - "windows-result", - "windows-strings", - "windows-targets", -] - -[[package]] -name = "windows-result" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" -dependencies = [ - "windows-targets", -] - -[[package]] -name = "windows-strings" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" -dependencies = [ - "windows-result", - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -3581,12 +3014,6 @@ dependencies = [ "syn 2.0.79", ] -[[package]] -name = "zeroize" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" - [[package]] name = "zstd" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index eb91943..848c4d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,10 +31,6 @@ build = "build.rs" [dependencies] datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] } datafusion-proto = "42.0.0" - -# temporarily point to revision until version 42 is released -datafusion-python = { git = "https://github.com/apache/datafusion-python" } - futures = "0.3" log = "0.4" prost = "0.13" diff --git a/build.rs b/build.rs index a6b34dc..ddcf19c 100644 --- a/build.rs +++ b/build.rs @@ -32,14 +32,16 @@ fn main() -> Result<(), String> { // We don't include the proto files in releases so that downstreams // do not need to have PROTOC included - if Path::new("src/proto/datafusion-ray.proto").exists() { + if Path::new("src/proto/datafusion_ray.proto").exists() { + println!("cargo:rerun-if-changed=src/proto/datafusion_common.proto"); println!("cargo:rerun-if-changed=src/proto/datafusion.proto"); - println!("cargo:rerun-if-changed=src/proto/datafusion-ray.proto"); + println!("cargo:rerun-if-changed=src/proto/datafusion_ray.proto"); tonic_build::configure() .extern_path(".datafusion", "::datafusion_proto::protobuf") - .compile(&["src/proto/datafusion-ray.proto"], &["src/proto"]) + .extern_path(".datafusion_common", "::datafusion_proto::protobuf") + .compile(&["src/proto/datafusion_ray.proto"], &["src/proto"]) .map_err(|e| format!("protobuf compilation failed: {e}"))?; - let generated_source_path = out.join("datafusion-ray.protobuf.rs"); + let generated_source_path = out.join("datafusion_ray.protobuf.rs"); let code = std::fs::read_to_string(generated_source_path).unwrap(); let mut file = std::fs::OpenOptions::new() .write(true) diff --git a/datafusion_ray/__init__.py b/datafusion_ray/__init__.py index d29bb29..aafe7d2 100644 --- a/datafusion_ray/__init__.py +++ b/datafusion_ray/__init__.py @@ -25,8 +25,6 @@ ExecutionGraph, QueryStage, execute_partition, - serialize_execution_plan, - deserialize_execution_plan, ) from .context import DatafusionRayContext diff --git a/datafusion_ray/context.py b/datafusion_ray/context.py index b853e5c..5cbaf86 100644 --- a/datafusion_ray/context.py +++ b/datafusion_ray/context.py @@ -25,7 +25,8 @@ import datafusion_ray from datafusion_ray import Context, ExecutionGraph, QueryStage -from typing import List +from typing import List, Any +from datafusion import SessionContext def schedule_execution( @@ -74,7 +75,7 @@ def _get_worker_inputs( return ids, futures # schedule the actual execution workers - plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan()) + plan_bytes = stage.get_execution_plan_bytes() futures = [] opt = {} # TODO not sure why we had this but my Ray cluster could not find suitable resource @@ -106,9 +107,7 @@ def execute_query_stage( # execute child stages first child_futures = [] for child_id in stage.get_child_stage_ids(): - child_futures.append( - execute_query_stage.remote(query_stages, child_id) - ) + child_futures.append(execute_query_stage.remote(query_stages, child_id)) # if the query stage has a single output partition then we need to execute for the output # partition, otherwise we need to execute in parallel for each input partition @@ -149,12 +148,12 @@ def _get_worker_inputs( return ids, futures # schedule the actual execution workers - plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan()) + plan_bytes = stage.get_execution_plan_bytes() futures = [] opt = {} # TODO not sure why we had this but my Ray cluster could not find suitable resource # until I commented this out - #opt["resources"] = {"worker": 1e-3} + # opt["resources"] = {"worker": 1e-3} opt["num_returns"] = output_partitions_count for part in range(concurrency): ids, inputs = _get_worker_inputs(part) @@ -176,7 +175,7 @@ def execute_query_partition( *input_partitions: list[pa.RecordBatch], ) -> Iterable[pa.RecordBatch]: start_time = time.time() - plan = datafusion_ray.deserialize_execution_plan(plan_bytes) + # plan = datafusion_ray.deserialize_execution_plan(plan_bytes) # print( # "Worker executing plan {} partition #{} with shuffle inputs {}".format( # plan.display(), @@ -190,7 +189,7 @@ def execute_query_partition( # This is delegating to DataFusion for execution, but this would be a good place # to plug in other execution engines by translating the plan into another engine's plan # (perhaps via Substrait, once DataFusion supports converting a physical plan to Substrait) - ret = datafusion_ray.execute_partition(plan, part, partitions) + ret = datafusion_ray.execute_partition(plan_bytes, part, partitions) duration = time.time() - start_time event = { "cat": f"{stage_id}-{part}", @@ -206,9 +205,9 @@ def execute_query_partition( class DatafusionRayContext: - def __init__(self, num_workers: int = 1): - self.ctx = Context(num_workers) - self.num_workers = num_workers + def __init__(self, df_ctx: SessionContext): + self.df_ctx = df_ctx + self.ctx = Context(df_ctx) def register_csv(self, table_name: str, path: str, has_header: bool): self.ctx.register_csv(table_name, path, has_header) @@ -227,7 +226,18 @@ def sql(self, sql: str) -> pa.RecordBatch: self.ctx.sql(sql) return [] - graph = self.ctx.plan(sql) + df = self.df_ctx.sql(sql) + execution_plan = df.execution_plan() + + graph = self.ctx.plan(execution_plan) + final_stage_id = graph.get_final_query_stage().id() + partitions = schedule_execution(graph, final_stage_id, True) + # assert len(partitions) == 1, len(partitions) + result_set = ray.get(partitions[0]) + return result_set + + def plan(self, physical_plan: Any) -> pa.RecordBatch: + graph = self.ctx.plan(physical_plan) final_stage_id = graph.get_final_query_stage().id() partitions = schedule_execution(graph, final_stage_id, True) # assert len(partitions) == 1, len(partitions) diff --git a/datafusion_ray/tests/test_context.py b/datafusion_ray/tests/test_context.py index 3e90ff4..40b2578 100644 --- a/datafusion_ray/tests/test_context.py +++ b/datafusion_ray/tests/test_context.py @@ -15,10 +15,12 @@ # specific language governing permissions and limitations # under the License. -import pytest from datafusion_ray import Context +from datafusion import SessionContext + def test(): - ctx = Context(1) - ctx.register_csv('tips', 'examples/tips.csv', True) + df_ctx = SessionContext() + ctx = Context(df_ctx, False) + df_ctx.register_csv("tips", "examples/tips.csv", has_header=True) ctx.plan("SELECT * FROM tips") diff --git a/examples/tips.py b/examples/tips.py index a4e499e..67ac64e 100644 --- a/examples/tips.py +++ b/examples/tips.py @@ -18,6 +18,7 @@ import os import ray +from datafusion import SessionContext, col, lit, functions as F from datafusion_ray import DatafusionRayContext SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -26,13 +27,29 @@ ray.init() # Create a context and register a table -ctx = DatafusionRayContext(2) +df_ctx = SessionContext() + +ray_ctx = DatafusionRayContext(df_ctx) # Register either a CSV or Parquet file # ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True) -ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet") +df_ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet") -result_set = ctx.sql( +result_set = ray_ctx.sql( "select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker" ) for record_batch in result_set: print(record_batch.to_pandas()) + +# Alternatively, to use the DataFrame API +df = df_ctx.read_parquet(f"{SCRIPT_DIR}/tips.parquet") +df = ( + df.aggregate( + [col("sex"), col("smoker"), col("day"), col("time")], + [F.avg(col("tip") / col("total_bill")).alias("tip_pct")], + ) + .filter(col("day") != lit("Dinner")) + .aggregate([col("sex"), col("smoker")], [F.avg(col("tip_pct")).alias("avg_pct")]) +) + +ray_results = ray_ctx.plan(df.execution_plan()) +df_ctx.create_dataframe([ray_results]).show() diff --git a/k8s/Dockerfile b/k8s/Dockerfile index 20294e9..fea8e9f 100644 --- a/k8s/Dockerfile +++ b/k8s/Dockerfile @@ -1,34 +1,25 @@ FROM rayproject/ray:2.37.0.cabc24-py312 +USER ray + RUN sudo apt update && \ - sudo apt install -y curl build-essential + sudo apt install -y curl build-essential protobuf-compiler # Intall Rust RUN curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y -WORKDIR /home/ray - -# install dependencies -COPY requirements-in.txt /home/ray/ -RUN python3 -m venv venv && \ - source venv/bin/activate && \ - pip3 install -r requirements-in.txt - -# add sources -RUN mkdir /home/ray/src -RUN mkdir /home/ray/datafusion_ray -COPY src /home/ray/src/ -COPY datafusion_ray /home/ray/datafusion_ray/ -COPY pyproject.toml /home/ray/ -COPY Cargo.* /home/ray/ -COPY build.rs /home/ray/ -COPY README.md /home/ray/ - -# build datafusion_ray -RUN source venv/bin/activate && \ +COPY . /home/ray/datafusion_ray +WORKDIR /home/ray/datafusion_ray + +# Build within container + +RUN sudo chown -R ray:users /home/ray/datafusion_ray && \ + pip3 install -r requirements-in.txt && \ source /home/ray/.cargo/env && \ maturin build --release +# Copy built wheel to target container + FROM rayproject/ray:2.37.0.cabc24-py312 -COPY --from=0 /home/ray/target/wheels/*.whl /home/ray/ +COPY --from=0 /home/ray/datafusion_ray/target/wheels/*.whl /home/ray/ RUN pip3 install /home/ray/*.whl diff --git a/k8s/Dockerfile.aarch64 b/k8s/Dockerfile.aarch64 new file mode 100644 index 0000000..015a7f8 --- /dev/null +++ b/k8s/Dockerfile.aarch64 @@ -0,0 +1,25 @@ +FROM rayproject/ray:2.37.0-py311-aarch64 + +USER ray + +RUN sudo apt update && \ + sudo apt install -y curl build-essential protobuf-compiler + +# Intall Rust +RUN curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y + +COPY . /home/ray/datafusion_ray +WORKDIR /home/ray/datafusion_ray + +# Build within container + +RUN sudo chown -R ray:users /home/ray/datafusion_ray && \ + pip3 install -r requirements-in.txt && \ + source /home/ray/.cargo/env && \ + maturin build --release + +# Copy built wheel to target container + +FROM rayproject/ray:2.37.0-py311-aarch64 +COPY --from=0 /home/ray/datafusion_ray/target/wheels/*.whl /home/ray/ +RUN pip3 install /home/ray/*.whl diff --git a/pyproject.toml b/pyproject.toml index 7a53bcd..10e097e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,11 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] - +dependencies = [ + "datafusion>=42.0.0", + "pyarrow>=11.0.0", + "typing-extensions;python_version<'3.13'", +] [tool.maturin] module-name = "datafusion_ray._datafusion_ray_internal" diff --git a/requirements-in.txt b/requirements-in.txt index 166b032..3fa00a6 100644 --- a/requirements-in.txt +++ b/requirements-in.txt @@ -7,5 +7,6 @@ numpy pyarrow pytest ray==2.37.0 +datafusion>=42.0.0 toml importlib_metadata; python_version < "3.8" diff --git a/src/context.rs b/src/context.rs index 2702e92..22ec550 100644 --- a/src/context.rs +++ b/src/context.rs @@ -17,24 +17,21 @@ use crate::planner::{make_execution_graph, PyExecutionGraph}; use crate::shuffle::{RayShuffleReaderExec, ShuffleCodec}; -use crate::utils::wait_for_future; use datafusion::arrow::pyarrow::FromPyArrow; use datafusion::arrow::pyarrow::ToPyArrow; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::TaskContext; -use datafusion::execution::disk_manager::DiskManagerConfig; -use datafusion::execution::memory_pool::FairSpillPool; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::{displayable, ExecutionPlan}; use datafusion::prelude::*; -use datafusion_proto::bytes::{ - physical_plan_from_bytes_with_extension_codec, physical_plan_to_bytes_with_extension_codec, -}; -use datafusion_python::physical_plan::PyExecutionPlan; +use datafusion_proto::physical_plan::AsExecutionPlan; +use datafusion_proto::protobuf; use futures::StreamExt; +use prost::Message; +use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; -use pyo3::types::{PyList, PyLong, PyTuple}; +use pyo3::types::{PyBytes, PyList, PyLong, PyTuple}; use std::collections::HashMap; use std::sync::Arc; use tokio::runtime::Runtime; @@ -44,75 +41,61 @@ type PyResultSet = Vec; #[pyclass(name = "Context", module = "datafusion_ray", subclass)] pub struct PyContext { - pub(crate) ctx: SessionContext, + pub(crate) py_ctx: PyObject, +} + +pub(crate) fn execution_plan_from_pyany( + py_plan: &Bound, +) -> PyResult> { + let py_proto = py_plan.call_method0("to_proto")?; + let plan_bytes: &[u8] = py_proto.extract()?; + let plan_node = protobuf::PhysicalPlanNode::try_decode(plan_bytes).map_err(|e| { + PyRuntimeError::new_err(format!( + "Unable to decode physical plan protobuf message: {}", + e + )) + })?; + + let codec = ShuffleCodec {}; + let runtime = RuntimeEnv::default(); + let registry = SessionContext::new(); + plan_node + .try_into_physical_plan(®istry, &runtime, &codec) + .map_err(|e| e.into()) } #[pymethods] impl PyContext { #[new] - pub fn new(target_partitions: usize) -> Result { - let config = SessionConfig::default() - .with_target_partitions(target_partitions) - .with_batch_size(16 * 1024) - .with_repartition_aggregations(true) - .with_repartition_windows(true) - .with_repartition_joins(true) - .with_parquet_pruning(true); - - let mem_pool_size = 1024 * 1024 * 1024; - let runtime_config = datafusion::execution::runtime_env::RuntimeConfig::new() - .with_memory_pool(Arc::new(FairSpillPool::new(mem_pool_size))) - .with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp".into()])); - let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); - let ctx = SessionContext::new_with_config_rt(config, runtime); - Ok(Self { ctx }) - } - - pub fn register_csv( - &self, - name: &str, - path: &str, - has_header: bool, - py: Python, - ) -> PyResult<()> { - let options = CsvReadOptions::default().has_header(has_header); - wait_for_future(py, self.ctx.register_csv(name, path, options))?; - Ok(()) - } - - pub fn register_parquet(&self, name: &str, path: &str, py: Python) -> PyResult<()> { - let options = ParquetReadOptions::default(); - wait_for_future(py, self.ctx.register_parquet(name, path, options))?; - Ok(()) - } - - pub fn register_datalake_table( - &self, - _name: &str, - _path: Vec, - _py: Python, - ) -> PyResult<()> { - // let options = ParquetReadOptions::default(); - // let listing_options = options.to_listing_options(&self.ctx.state().config()); - // wait_for_future(py, self.ctx.register_listing_table(name, path, listing_options, None, None))?; - // Ok(()) - unimplemented!() + pub fn new(session_ctx: PyObject) -> Result { + Ok(Self { + py_ctx: session_ctx, + }) } /// Execute SQL directly against the DataFusion context. Useful for statements /// such as "create view" or "drop view" - pub fn sql(&self, sql: &str, py: Python) -> PyResult<()> { - println!("Executing {}", sql); - let _df = wait_for_future(py, self.ctx.sql(sql))?; + pub fn sql(&self, query: &str, py: Python) -> PyResult<()> { + println!("Executing {}", query); + // let _df = wait_for_future(py, self.ctx.sql(sql))?; + let _df = self.run_sql(query, py); Ok(()) } + fn run_sql(&self, query: &str, py: Python) -> PyResult> { + let args = PyTuple::new_bound(py, [query]); + self.py_ctx.call_method1(py, "sql", args) + } + /// Plan a distributed SELECT query for executing against the Ray workers - pub fn plan(&self, sql: &str, py: Python) -> PyResult { - println!("Planning {}", sql); - let df = wait_for_future(py, self.ctx.sql(sql))?; - let plan = wait_for_future(py, df.create_physical_plan())?; + pub fn plan(&self, plan: &Bound) -> PyResult { + // println!("Planning {}", sql); + // let df = wait_for_future(py, self.ctx.sql(sql))?; + // let py_df = self.run_sql(sql, py)?; + // let py_plan = py_df.call_method0(py, "execution_plan")?; + // let py_plan = py_plan.bind(py); + let plan = execution_plan_from_pyany(plan)?; let graph = make_execution_graph(plan.clone())?; // debug logging @@ -132,43 +115,59 @@ impl PyContext { /// Execute a partition of a query plan. This will typically be executing a shuffle write and write the results to disk pub fn execute_partition( &self, - plan: PyExecutionPlan, + plan: &Bound<'_, PyBytes>, part: usize, inputs: PyObject, py: Python, - ) -> PyResultSet { + ) -> PyResult { execute_partition(plan, part, inputs, py) } } #[pyfunction] pub fn execute_partition( - plan: PyExecutionPlan, + plan_bytes: &Bound<'_, PyBytes>, part: usize, inputs: PyObject, py: Python, -) -> PyResultSet { +) -> PyResult { + let plan = deserialize_execution_plan(plan_bytes)?; _execute_partition(plan, part, inputs) .unwrap() .into_iter() - .map(|batch| batch.to_pyarrow(py).unwrap()) // TODO(@lsf) handle error + .map(|batch| batch.to_pyarrow(py)) .collect() } -// TODO(@lsf) change this to use pickle -#[pyfunction] -pub fn serialize_execution_plan(plan: PyExecutionPlan) -> PyResult> { +pub fn serialize_execution_plan( + plan: Arc, + py: Python, +) -> PyResult> { let codec = ShuffleCodec {}; - Ok(physical_plan_to_bytes_with_extension_codec(plan.plan, &codec)?.to_vec()) + let proto = + datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(plan.clone(), &codec)?; + + let bytes = proto.encode_to_vec(); + Ok(PyBytes::new_bound(py, &bytes)) } -#[pyfunction] -pub fn deserialize_execution_plan(bytes: Vec) -> PyResult { +pub fn deserialize_execution_plan(proto_msg: &Bound) -> PyResult> { + let bytes: &[u8] = proto_msg.extract()?; + let proto_plan = + datafusion_proto::protobuf::PhysicalPlanNode::try_decode(bytes).map_err(|e| { + PyRuntimeError::new_err(format!( + "Unable to decode logical node from serialized bytes: {}", + e + )) + })?; + let ctx = SessionContext::new(); let codec = ShuffleCodec {}; - Ok(PyExecutionPlan::new( - physical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?, - )) + let plan = proto_plan + .try_into_physical_plan(&ctx, &ctx.runtime_env(), &codec) + .map_err(DataFusionError::from)?; + + Ok(plan) } /// Iterate down an ExecutionPlan and set the input objects for RayShuffleReaderExec. @@ -220,7 +219,7 @@ fn _set_inputs_for_ray_shuffle_reader( /// write the results to disk, except for the final query stage, which will return the data. /// inputs is a list of tuples of (stage_id, partition_id, bytes) for each input partition. fn _execute_partition( - plan: PyExecutionPlan, + plan: Arc, part: usize, inputs: PyObject, ) -> Result> { @@ -233,19 +232,19 @@ fn _execute_partition( HashMap::new(), Arc::new(RuntimeEnv::default()), )); + Python::with_gil(|py| { let input_partitions = inputs - .bind(py) - .downcast::() + .downcast_bound::(py) .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; - _set_inputs_for_ray_shuffle_reader(plan.plan.clone(), input_partitions) + _set_inputs_for_ray_shuffle_reader(plan.clone(), input_partitions) })?; // create a Tokio runtime to run the async code let rt = Runtime::new().unwrap(); let fut: JoinHandle>> = rt.spawn(async move { - let mut stream = plan.plan.execute(part, ctx)?; + let mut stream = plan.execute(part, ctx)?; let mut results = vec![]; while let Some(result) = stream.next().await { results.push(result?); diff --git a/src/lib.rs b/src/lib.rs index 93d3f8d..073b2a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,14 +20,13 @@ extern crate core; use pyo3::prelude::*; mod proto; -use crate::context::{deserialize_execution_plan, execute_partition, serialize_execution_plan}; +use crate::context::execute_partition; pub use proto::generated::protobuf; pub mod context; pub mod planner; pub mod query_stage; pub mod shuffle; -pub mod utils; /// A Python module implemented in Rust. #[pymodule] @@ -37,7 +36,5 @@ fn _datafusion_ray_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(execute_partition, m)?)?; - m.add_function(wrap_pyfunction!(serialize_execution_plan, m)?)?; - m.add_function(wrap_pyfunction!(deserialize_execution_plan, m)?)?; Ok(()) } diff --git a/src/planner.rs b/src/planner.rs index 93b04a6..1790a8c 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -352,10 +352,8 @@ mod test { async fn do_test(n: u8) -> TestResult<()> { let tpch_path_env_var = "TPCH_DATA_PATH"; - let data_path = env::var(tpch_path_env_var).expect(&format!( - "Environment variable {} not found", - tpch_path_env_var - )); + let data_path = env::var(tpch_path_env_var) + .unwrap_or_else(|_| panic!("Environment variable {} not found", tpch_path_env_var)); let file = format!("testdata/queries/q{n}.sql"); let sql = fs::read_to_string(&file)?; diff --git a/src/proto/datafusion_ray.proto b/src/proto/datafusion_ray.proto index 6a71005..ecb43d3 100644 --- a/src/proto/datafusion_ray.proto +++ b/src/proto/datafusion_ray.proto @@ -6,6 +6,7 @@ option java_multiple_files = true; option java_package = "datafusion_ray.protobuf"; option java_outer_classname = "RaySqlProto"; +import "datafusion_common.proto"; import "datafusion.proto"; message RaySqlExecNode { @@ -19,7 +20,7 @@ message RayShuffleReaderExecNode { // stage to read from uint32 stage_id = 1; // schema of the shuffle stage - datafusion.Schema schema = 2; + datafusion_common.Schema schema = 2; // this must match the output partitioning of the writer we are reading from datafusion.PhysicalHashRepartition partitioning = 3; } diff --git a/src/proto/generated/protobuf.rs b/src/proto/generated/protobuf.rs index c31d7ac..9437646 100644 --- a/src/proto/generated/protobuf.rs +++ b/src/proto/generated/protobuf.rs @@ -1,7 +1,7 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RaySqlExecNode { - #[prost(oneof = "ray_sql_exec_node::PlanType", tags = "1, 2, 3, 4")] + #[prost(oneof = "ray_sql_exec_node::PlanType", tags = "3, 4")] pub plan_type: ::core::option::Option, } /// Nested message and enum types in `RaySqlExecNode`. @@ -9,10 +9,6 @@ pub mod ray_sql_exec_node { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] pub enum PlanType { - #[prost(message, tag = "1")] - ShuffleReader(super::ShuffleReaderExecNode), - #[prost(message, tag = "2")] - ShuffleWriter(super::ShuffleWriterExecNode), #[prost(message, tag = "3")] RayShuffleReader(super::RayShuffleReaderExecNode), #[prost(message, tag = "4")] @@ -21,42 +17,6 @@ pub mod ray_sql_exec_node { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ShuffleReaderExecNode { - /// stage to read from - #[prost(uint32, tag = "1")] - pub stage_id: u32, - /// schema of the shuffle stage - #[prost(message, optional, tag = "2")] - pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>, - /// this must match the output partitioning of the writer we are reading from - #[prost(message, optional, tag = "3")] - pub partitioning: ::core::option::Option< - ::datafusion_proto::protobuf::PhysicalHashRepartition, - >, - /// directory for shuffle files - #[prost(string, tag = "4")] - pub shuffle_dir: ::prost::alloc::string::String, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ShuffleWriterExecNode { - /// stage that is writing the shuffle files - #[prost(uint32, tag = "1")] - pub stage_id: u32, - /// plan to execute - #[prost(message, optional, tag = "2")] - pub plan: ::core::option::Option<::datafusion_proto::protobuf::PhysicalPlanNode>, - /// output partitioning schema - #[prost(message, optional, tag = "3")] - pub partitioning: ::core::option::Option< - ::datafusion_proto::protobuf::PhysicalHashRepartition, - >, - /// directory for shuffle files - #[prost(string, tag = "4")] - pub shuffle_dir: ::prost::alloc::string::String, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct RayShuffleReaderExecNode { /// stage to read from #[prost(uint32, tag = "1")] diff --git a/src/query_stage.rs b/src/query_stage.rs index bb9f375..9af81f3 100644 --- a/src/query_stage.rs +++ b/src/query_stage.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. +use crate::context::serialize_execution_plan; use crate::shuffle::{RayShuffleReaderExec, ShuffleCodec}; use datafusion::error::Result; use datafusion::physical_plan::{ExecutionPlan, Partitioning}; use datafusion::prelude::SessionContext; use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec; -use datafusion_python::physical_plan::PyExecutionPlan; use pyo3::prelude::*; +use pyo3::types::PyBytes; use std::sync::Arc; #[pyclass(name = "QueryStage", module = "datafusion_ray", subclass)] @@ -51,8 +52,8 @@ impl PyQueryStage { self.stage.id } - pub fn get_execution_plan(&self) -> PyExecutionPlan { - PyExecutionPlan::new(self.stage.plan.clone()) + pub fn get_execution_plan_bytes<'py>(&self, py: Python<'py>) -> PyResult> { + serialize_execution_plan(self.stage.plan.clone(), py) } pub fn get_child_stage_ids(&self) -> Vec { diff --git a/src/utils.rs b/src/utils.rs deleted file mode 100644 index 0ddd1fe..0000000 --- a/src/utils.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use pyo3::Python; -use std::future::Future; -use tokio::runtime::Runtime; - -/// Utility to collect rust futures with GIL released -pub(crate) fn wait_for_future(py: Python, f: F) -> F::Output -where - F: Future + Send, - F::Output: Send, -{ - let rt = Runtime::new().unwrap(); - py.allow_threads(|| rt.block_on(f)) -}