|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
| 18 | +use crate::execution::operators::ExecutionError; |
18 | 19 | use arrow::{ |
19 | 20 | array::{cast::AsArray, types::Int32Type, Array, ArrayRef}, |
20 | 21 | compute::{cast_with_options, take, CastOptions}, |
21 | 22 | util::display::FormatOptions, |
22 | 23 | }; |
23 | 24 | use arrow_array::{DictionaryArray, StructArray}; |
24 | 25 | use arrow_schema::DataType; |
| 26 | +use datafusion::prelude::SessionContext; |
25 | 27 | use datafusion_comet_spark_expr::utils::array_with_timezone; |
26 | 28 | use datafusion_comet_spark_expr::EvalMode; |
27 | 29 | use datafusion_common::{Result as DataFusionResult, ScalarValue}; |
| 30 | +use datafusion_execution::object_store::ObjectStoreUrl; |
28 | 31 | use datafusion_expr::ColumnarValue; |
29 | 32 | use std::collections::HashMap; |
30 | 33 | use std::{fmt::Debug, hash::Hash, sync::Arc}; |
@@ -195,3 +198,39 @@ fn cast_struct_to_struct( |
195 | 198 | _ => unreachable!(), |
196 | 199 | } |
197 | 200 | } |
| 201 | + |
| 202 | +// Default object store which is local filesystem |
| 203 | +#[cfg(not(feature = "hdfs"))] |
| 204 | +pub(crate) fn register_object_store( |
| 205 | + session_context: Arc<SessionContext>, |
| 206 | +) -> Result<ObjectStoreUrl, ExecutionError> { |
| 207 | + let object_store = object_store::local::LocalFileSystem::new(); |
| 208 | + let url = ObjectStoreUrl::parse("file://")?; |
| 209 | + session_context |
| 210 | + .runtime_env() |
| 211 | + .register_object_store(url.as_ref(), Arc::new(object_store)); |
| 212 | + Ok(url) |
| 213 | +} |
| 214 | + |
/// Registers an HDFS-backed object store with the session's runtime
/// environment. Compiled only when the `hdfs` feature is enabled.
///
/// Returns the `ObjectStoreUrl` the store was registered under, or
/// `ExecutionError::GeneralError` when the Hadoop filesystem cannot be
/// constructed for the target URL.
#[cfg(feature = "hdfs")]
pub(crate) fn register_object_store(
    session_context: Arc<SessionContext>,
) -> Result<ObjectStoreUrl, ExecutionError> {
    // TODO: read the namenode configuration from file schema or from spark.defaultFS
    let url = ObjectStoreUrl::parse("hdfs://namenode:9000")?;
    // Constructing the Hadoop filesystem can fail; surface that as a general
    // execution error instead of registering nothing silently.
    let hdfs_store =
        datafusion_comet_objectstore_hdfs::object_store::hdfs::HadoopFileSystem::new(url.as_ref())
            .ok_or_else(|| {
                ExecutionError::GeneralError(format!(
                    "HDFS object store cannot be created for {}",
                    url
                ))
            })?;
    session_context
        .runtime_env()
        .register_object_store(url.as_ref(), Arc::new(hdfs_store));
    Ok(url)
}
0 commit comments