diff --git a/docs/source/user-guide/dataframe/index.rst b/docs/source/user-guide/dataframe/index.rst index 1387db0bd..74e1b7586 100644 --- a/docs/source/user-guide/dataframe/index.rst +++ b/docs/source/user-guide/dataframe/index.rst @@ -228,6 +228,43 @@ Core Classes * :py:meth:`~datafusion.SessionContext.from_pandas` - Create from Pandas DataFrame * :py:meth:`~datafusion.SessionContext.from_arrow` - Create from Arrow data + ``SessionContext`` can automatically resolve SQL table names that match + in-scope Python data objects. When automatic lookup is enabled, a query + such as ``ctx.sql("SELECT * FROM pdf")`` will register a pandas or + PyArrow object named ``pdf`` without calling + :py:meth:`~datafusion.SessionContext.from_pandas` or + :py:meth:`~datafusion.SessionContext.from_arrow` explicitly. This uses + the Arrow PyCapsule Interface, so the corresponding library (``pandas`` + for pandas objects, ``pyarrow`` for Arrow objects) must be installed. + + .. code-block:: python + + import pandas as pd + import pyarrow as pa + from datafusion import SessionContext + + ctx = SessionContext(auto_register_python_objects=True) + + # pandas dataframe - requires pandas to be installed + pdf = pd.DataFrame({"value": [1, 2, 3]}) + + # or pyarrow object - requires pyarrow to be installed + arrow_table = pa.table({"value": [1, 2, 3]}) + + # If automatic registration is enabled, then we can query these objects directly + df = ctx.sql("SELECT SUM(value) AS total FROM pdf") + # or + df = ctx.sql("SELECT SUM(value) AS total FROM arrow_table") + + # without calling ctx.from_pandas() or ctx.from_arrow() explicitly + + Automatic lookup is disabled by default. Enable it by passing + ``auto_register_python_objects=True`` when constructing the session or by + configuring :py:class:`~datafusion.SessionConfig` with + :py:meth:`~datafusion.SessionConfig.with_python_table_lookup`. Use + :py:meth:`~datafusion.SessionContext.set_python_table_lookup` to toggle the + behavior at runtime. 
+ See: :py:class:`datafusion.SessionContext` Expression Classes diff --git a/docs/source/user-guide/sql.rst b/docs/source/user-guide/sql.rst index 6fa7f0c6a..d11f277bd 100644 --- a/docs/source/user-guide/sql.rst +++ b/docs/source/user-guide/sql.rst @@ -36,4 +36,29 @@ DataFusion also offers a SQL API, read the full reference `here 2") + print(result.to_pandas()) + +The feature inspects the call stack for variables whose names match missing +tables and registers them if they expose Arrow data (including pandas and +Polars DataFrames). Existing contexts can toggle the behavior at runtime +through :py:meth:`SessionContext.set_python_table_lookup`; new contexts can +opt in by passing ``auto_register_python_objects=True`` to the constructor. diff --git a/python/datafusion/context.py b/python/datafusion/context.py index b6e728b51..5d8440bd4 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -19,7 +19,10 @@ from __future__ import annotations +import inspect +import re import warnings +import weakref from typing import TYPE_CHECKING, Any, Protocol try: @@ -101,6 +104,7 @@ def __init__(self, config_options: dict[str, str] | None = None) -> None: config_options: Configuration options. """ self.config_internal = SessionConfigInternal(config_options) + self._python_table_lookup = False def with_create_default_catalog_and_schema( self, enabled: bool = True @@ -270,6 +274,11 @@ def with_parquet_pruning(self, enabled: bool = True) -> SessionConfig: self.config_internal = self.config_internal.with_parquet_pruning(enabled) return self + def with_python_table_lookup(self, enabled: bool = True) -> SessionConfig: + """Enable implicit table lookup for Python objects when running SQL.""" + self._python_table_lookup = enabled + return self + def set(self, key: str, value: str) -> SessionConfig: """Set a configuration option. 
@@ -483,6 +492,8 @@ def __init__( self, config: SessionConfig | None = None, runtime: RuntimeEnvBuilder | None = None, + *, + auto_register_python_objects: bool | None = None, ) -> None: """Main interface for executing queries with DataFusion. @@ -493,6 +504,12 @@ def __init__( Args: config: Session configuration options. runtime: Runtime configuration options. + auto_register_python_objects: Automatically register referenced + Python objects (such as pandas or PyArrow data) when ``sql`` + queries reference them by name. When omitted, this defaults to + the value configured via + :py:meth:`~datafusion.SessionConfig.with_python_table_lookup` + (``False`` unless explicitly enabled). Example usage: @@ -504,10 +521,22 @@ def __init__( ctx = SessionContext() df = ctx.read_csv("data.csv") """ - config = config.config_internal if config is not None else None - runtime = runtime.config_internal if runtime is not None else None + self.ctx = SessionContextInternal( + config.config_internal if config is not None else None, + runtime.config_internal if runtime is not None else None, + ) + + # Determine the final value for python table lookup + if auto_register_python_objects is not None: + auto_python_table_lookup = auto_register_python_objects + else: + # Default to session config value or False if not configured + auto_python_table_lookup = getattr(config, "_python_table_lookup", False) - self.ctx = SessionContextInternal(config, runtime) + self._auto_python_table_lookup = bool(auto_python_table_lookup) + self._python_table_bindings: dict[ + str, tuple[weakref.ReferenceType[Any] | None, int] + ] = {} def __repr__(self) -> str: """Print a string representation of the Session Context.""" @@ -534,8 +563,27 @@ def enable_url_table(self) -> SessionContext: klass = self.__class__ obj = klass.__new__(klass) obj.ctx = self.ctx.enable_url_table() + obj._auto_python_table_lookup = getattr( + self, "_auto_python_table_lookup", False + ) + obj._python_table_bindings = getattr(self, 
"_python_table_bindings", {}).copy() return obj + def set_python_table_lookup(self, enabled: bool = True) -> SessionContext: + """Enable or disable automatic registration of Python objects in SQL. + + Args: + enabled: When ``True``, SQL queries automatically attempt to + resolve missing table names by looking up Python objects in the + caller's scope. Use ``False`` to require explicit registration + of any referenced tables. + + Returns: + The current :py:class:`SessionContext` instance for chaining. + """ + self._auto_python_table_lookup = enabled + return self + def register_object_store( self, schema: str, store: Any, host: str | None = None ) -> None: @@ -600,9 +648,34 @@ def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame: Returns: DataFrame representation of the SQL query. """ - if options is None: - return DataFrame(self.ctx.sql(query)) - return DataFrame(self.ctx.sql_with_options(query, options.options_internal)) + + def _execute_sql() -> DataFrame: + if options is None: + return DataFrame(self.ctx.sql(query)) + return DataFrame(self.ctx.sql_with_options(query, options.options_internal)) + + auto_lookup_enabled = getattr(self, "_auto_python_table_lookup", False) + + if auto_lookup_enabled: + self._refresh_python_table_bindings() + + while True: + try: + return _execute_sql() + except Exception as err: # noqa: PERF203 + if not auto_lookup_enabled: + raise + + missing_tables = self._extract_missing_table_names(err) + if not missing_tables: + raise + + registered = self._register_python_tables(missing_tables) + if not registered: + raise + + # Retry to allow registering additional tables referenced in the query. + continue def sql_with_options(self, query: str, options: SQLOptions) -> DataFrame: """Create a :py:class:`~datafusion.dataframe.DataFrame` from SQL query text. 
@@ -619,6 +692,144 @@ def sql_with_options(self, query: str, options: SQLOptions) -> DataFrame: """ return self.sql(query, options) + @staticmethod + def _extract_missing_table_names(err: Exception) -> list[str]: + def _normalize(names: list[Any]) -> list[str]: + tables: list[str] = [] + for raw_name in names: + if not raw_name: + continue + raw_str = str(raw_name) + tables.append(raw_str.rsplit(".", 1)[-1]) + return tables + + missing_tables = getattr(err, "missing_table_names", None) + if missing_tables is not None: + if isinstance(missing_tables, str): + candidates: list[Any] = [missing_tables] + else: + try: + candidates = list(missing_tables) + except TypeError: + candidates = [missing_tables] + + return _normalize(candidates) + + message = str(err) + matches = set() + for pattern in (r"table '([^']+)' not found", r"No table named '([^']+)'"): + matches.update(re.findall(pattern, message)) + + return _normalize(list(matches)) + + def _register_python_tables(self, tables: list[str]) -> bool: + registered_any = False + for table_name in tables: + if not table_name or self.table_exist(table_name): + continue + + python_obj = self._lookup_python_object(table_name) + if python_obj is None: + continue + + if self._register_python_object(table_name, python_obj): + registered_any = True + + return registered_any + + @staticmethod + def _lookup_python_object(name: str) -> Any | None: + frame = inspect.currentframe() + try: + frame = frame.f_back if frame is not None else None + lower_name = name.lower() + + def _match(mapping: dict[str, Any]) -> Any | None: + value = mapping.get(name) + if value is not None: + return value + + for key, candidate in mapping.items(): + if ( + isinstance(key, str) + and key.lower() == lower_name + and candidate is not None + ): + return candidate + + return None + + while frame is not None: + for scope in (frame.f_locals, frame.f_globals): + match = _match(scope) + if match is not None: + return match + frame = frame.f_back + finally: + 
del frame + return None + + def _refresh_python_table_bindings(self) -> None: + bindings = getattr(self, "_python_table_bindings", {}) + for table_name, (obj_ref, cached_id) in list(bindings.items()): + cached_obj = obj_ref() if obj_ref is not None else None + current_obj = self._lookup_python_object(table_name) + weakref_dead = obj_ref is not None and cached_obj is None + id_mismatch = current_obj is not None and id(current_obj) != cached_id + + if not (weakref_dead or id_mismatch): + continue + + self.deregister_table(table_name) + + if current_obj is None: + bindings.pop(table_name, None) + continue + + if self._register_python_object(table_name, current_obj): + continue + + bindings.pop(table_name, None) + + def _register_python_object(self, name: str, obj: Any) -> bool: + registered = False + + if isinstance(obj, DataFrame): + self.register_view(name, obj) + registered = True + elif isinstance(obj, (pa.Table, pa.RecordBatch, pa.RecordBatchReader)): + self.from_arrow(obj, name=name) + registered = True + else: + exports_arrow_capsule = hasattr(obj, "__arrow_c_stream__") or hasattr( + obj, "__arrow_c_array__" + ) + + if exports_arrow_capsule: + self.from_arrow(obj, name=name) + registered = True + elif ( + obj.__class__.__module__.startswith("polars.") + and obj.__class__.__name__ == "DataFrame" + ): + self.from_polars(obj, name=name) + registered = True + elif ( + obj.__class__.__module__.startswith("pandas.") + and obj.__class__.__name__ == "DataFrame" + ): + self.from_pandas(obj, name=name) + registered = True + + if registered: + try: + reference: weakref.ReferenceType[Any] | None = weakref.ref(obj) + except TypeError: + reference = None + self._python_table_bindings[name] = (reference, id(obj)) + + return registered + def create_dataframe( self, partitions: list[list[pa.RecordBatch]], @@ -756,6 +967,7 @@ def register_table(self, name: str, table: Table) -> None: def deregister_table(self, name: str) -> None: """Remove a table from the session.""" 
self.ctx.deregister_table(name) + self._python_table_bindings.pop(name, None) def catalog_names(self) -> set[str]: """Returns the list of catalogs in this context.""" diff --git a/python/tests/test_context.py b/python/tests/test_context.py index 6dbcc0d5e..60f8fe132 100644 --- a/python/tests/test_context.py +++ b/python/tests/test_context.py @@ -17,6 +17,7 @@ import datetime as dt import gzip import pathlib +from uuid import uuid4 import pyarrow as pa import pyarrow.dataset as ds @@ -255,6 +256,181 @@ def test_from_pylist(ctx): assert df.collect()[0].num_rows == 3 +def test_sql_missing_table_without_auto_register(ctx): + ctx.set_python_table_lookup(False) + arrow_table = pa.Table.from_pydict({"value": [1, 2, 3]}) # noqa: F841 + + with pytest.raises(Exception, match="not found|No table named") as excinfo: + ctx.sql("SELECT * FROM arrow_table").collect() + + # Test that our extraction method works correctly + missing_tables = ctx._extract_missing_table_names(excinfo.value) + assert "arrow_table" in missing_tables + + +def test_sql_missing_table_exposes_missing_table_names(ctx): + ctx.set_python_table_lookup(False) + + with pytest.raises(Exception) as excinfo: + ctx.sql("SELECT * FROM missing_table").collect() + + missing_tables = getattr(excinfo.value, "missing_table_names", None) + assert missing_tables is not None + normalized = [str(name).rsplit(".", 1)[-1] for name in missing_tables] + assert normalized == ["missing_table"] + + +def test_extract_missing_table_names_from_attribute(): + class MissingTablesError(Exception): + def __init__(self) -> None: + super().__init__("custom error") + self.missing_table_names = ( + "catalog.schema.arrow_table", + "plain_table", + ) + + err = MissingTablesError() + missing_tables = SessionContext._extract_missing_table_names(err) + assert missing_tables == ["arrow_table", "plain_table"] + + +def test_sql_auto_register_arrow_table(): + ctx = SessionContext(auto_register_python_objects=True) + arrow_table = 
pa.Table.from_pydict({"value": [1, 2, 3]}) # noqa: F841 + + result = ctx.sql( + "SELECT SUM(value) AS total FROM arrow_table", + ).collect() + + assert ctx.table_exist("arrow_table") + assert result[0].column(0).to_pylist()[0] == 6 + + +def test_sql_auto_register_multiple_tables_single_query(): + ctx = SessionContext(auto_register_python_objects=True) + + customers = pa.Table.from_pydict( # noqa: F841 + {"customer_id": [1, 2], "name": ["Alice", "Bob"]} + ) + orders = pa.Table.from_pydict( # noqa: F841 + {"order_id": [100, 200], "customer_id": [1, 2]} + ) + + result = ctx.sql( + """ + SELECT c.customer_id, o.order_id + FROM customers c + JOIN orders o ON c.customer_id = o.customer_id + ORDER BY o.order_id + """ + ).collect() + + actual = pa.Table.from_batches(result) + expected = pa.Table.from_pydict({"customer_id": [1, 2], "order_id": [100, 200]}) + + assert actual.equals(expected) + assert ctx.table_exist("customers") + assert ctx.table_exist("orders") + + +def test_sql_auto_register_arrow_outer_scope(): + ctx = SessionContext() + ctx.set_python_table_lookup(True) + arrow_table = pa.Table.from_pydict({"value": [1, 2, 3, 4]}) # noqa: F841 + + def run_query(): + return ctx.sql( + "SELECT COUNT(*) AS total_rows FROM arrow_table", + ).collect() + + result = run_query() + assert result[0].column(0).to_pylist()[0] == 4 + + +def test_sql_auto_register_skips_none_shadowing(): + ctx = SessionContext(auto_register_python_objects=True) + mytable = pa.Table.from_pydict({"value": [1, 2, 3]}) # noqa: F841 + + def run_query(): + mytable = None # noqa: F841 + return ctx.sql( + "SELECT SUM(value) AS total FROM mytable", + ).collect() + + batches = run_query() + assert batches[0].column(0).to_pylist()[0] == 6 + + +def test_sql_auto_register_case_insensitive_lookup(): + ctx = SessionContext(auto_register_python_objects=True) + MyTable = pa.Table.from_pydict({"value": [2, 3]}) # noqa: N806,F841 + + batches = ctx.sql( + "SELECT SUM(value) AS total FROM mytable", + ).collect() + + 
assert batches[0].column(0).to_pylist()[0] == 5 + + +def test_sql_auto_register_pandas_dataframe(monkeypatch): + pd = pytest.importorskip("pandas") + + ctx = SessionContext(auto_register_python_objects=True) + pandas_df = pd.DataFrame({"value": [1, 2, 3, 4]}) + + if not ( + hasattr(pandas_df, "__arrow_c_stream__") + or hasattr(pandas_df, "__arrow_c_array__") + ): + pytest.skip("pandas does not expose Arrow capsule export") + + def fail_from_pandas(*args, **kwargs): + msg = "from_pandas should not be called during auto-registration" + raise AssertionError(msg) + + monkeypatch.setattr(SessionContext, "from_pandas", fail_from_pandas) + + result = ctx.sql( + "SELECT AVG(value) AS avg_value FROM pandas_df", + ).collect() + + assert pytest.approx(result[0].column(0).to_pylist()[0]) == 2.5 + + +def test_sql_auto_register_refreshes_reassigned_dataframe(): + pd = pytest.importorskip("pandas") + + ctx = SessionContext(auto_register_python_objects=True) + pandas_df = pd.DataFrame({"value": [1, 2, 3]}) + + first = ctx.sql( + "SELECT SUM(value) AS total FROM pandas_df", + ).collect() + + assert first[0].column(0).to_pylist()[0] == 6 + + pandas_df = pd.DataFrame({"value": [10, 20]}) # noqa: F841 + + second = ctx.sql( + "SELECT SUM(value) AS total FROM pandas_df", + ).collect() + + assert second[0].column(0).to_pylist()[0] == 30 + + +def test_sql_auto_register_polars_dataframe(): + pl = pytest.importorskip("polars") + + ctx = SessionContext(auto_register_python_objects=True) + polars_df = pl.DataFrame({"value": [2, 4, 6]}) # noqa: F841 + + result = ctx.sql( + "SELECT MIN(value) AS min_value FROM polars_df", + ).collect() + + assert result[0].column(0).to_pylist()[0] == 2 + + def test_from_pydict(ctx): # create a dataframe from Python dictionary data = {"a": [1, 2, 3], "b": [4, 5, 6]} @@ -285,6 +461,48 @@ def test_from_pandas(ctx): assert df.collect()[0].num_rows == 3 +def test_sql_from_local_arrow_table(ctx): + ctx.set_python_table_lookup(True) # Enable implicit table lookup + 
arrow_table = pa.Table.from_pydict({"a": [1, 2], "b": ["x", "y"]}) # noqa: F841 + + result = ctx.sql("SELECT * FROM arrow_table ORDER BY a").collect() + actual = pa.Table.from_batches(result) + expected = pa.Table.from_pydict({"a": [1, 2], "b": ["x", "y"]}) + + assert actual.equals(expected) + + +def test_sql_from_local_pandas_dataframe(ctx): + ctx.set_python_table_lookup(True) # Enable implicit table lookup + pd = pytest.importorskip("pandas") + pandas_df = pd.DataFrame({"a": [3, 1], "b": ["z", "y"]}) # noqa: F841 + + result = ctx.sql("SELECT * FROM pandas_df ORDER BY a").collect() + actual = pa.Table.from_batches(result) + expected = pa.Table.from_pydict({"a": [1, 3], "b": ["y", "z"]}) + + assert actual.equals(expected) + + +def test_sql_from_local_polars_dataframe(ctx): + ctx.set_python_table_lookup(True) # Enable implicit table lookup + pl = pytest.importorskip("polars") + polars_df = pl.DataFrame({"a": [2, 1], "b": ["beta", "alpha"]}) # noqa: F841 + + result = ctx.sql("SELECT * FROM polars_df ORDER BY a").collect() + actual = pa.Table.from_batches(result) + expected = pa.Table.from_pydict({"a": [1, 2], "b": ["alpha", "beta"]}) + + assert actual.equals(expected) + + +def test_sql_from_local_unsupported_object(ctx): + unsupported = object() # noqa: F841 + + with pytest.raises(Exception, match="table 'unsupported' not found"): + ctx.sql("SELECT * FROM unsupported").collect() + + def test_from_polars(ctx): # create a dataframe from Polars dataframe pd = pytest.importorskip("polars") @@ -484,8 +702,6 @@ def test_table_exist(ctx): def test_table_not_found(ctx): - from uuid import uuid4 - with pytest.raises(KeyError): ctx.table(f"not-found-{uuid4()}") @@ -634,6 +850,43 @@ def test_sql_with_options_no_statements(ctx): ctx.sql_with_options(sql, options=options) +def test_session_config_python_table_lookup_enables_auto_registration(): + pd = pytest.importorskip("pandas") + + ctx = SessionContext(config=SessionConfig().with_python_table_lookup(enabled=True)) + pdf = 
pd.DataFrame({"value": [1, 2, 3]}) + assert len(pdf) == 3 + + batches = ctx.sql("SELECT SUM(value) AS total FROM pdf").collect() + assert batches[0].column(0).to_pylist()[0] == 6 + + +def test_sql_auto_register_arrow(): + ctx = SessionContext(auto_register_python_objects=True) + arrow_table = pa.table({"value": [1, 2, 3, 4]}) + assert arrow_table.num_rows == 4 + + batches = ctx.sql("SELECT COUNT(*) AS cnt FROM arrow_table").collect() + assert batches[0].column(0).to_pylist()[0] == 4 + + +def test_sql_auto_register_disabled(): + pd = pytest.importorskip("pandas") + + ctx = SessionContext() + pdf = pd.DataFrame({"value": [1, 2, 3]}) + assert len(pdf) == 3 + + with pytest.raises(Exception) as excinfo: + ctx.sql("SELECT * FROM pdf").collect() + + assert "not found" in str(excinfo.value) + + ctx.set_python_table_lookup(True) + batches = ctx.sql("SELECT COUNT(*) AS cnt FROM pdf").collect() + assert batches[0].column(0).to_pylist()[0] == 3 + + @pytest.fixture def batch(): return pa.RecordBatch.from_arrays( diff --git a/src/context.rs b/src/context.rs index 36133a33d..9af58ac39 100644 --- a/src/context.rs +++ b/src/context.rs @@ -34,7 +34,7 @@ use pyo3::prelude::*; use crate::catalog::{PyCatalog, PyTable, RustWrappedPyCatalogProvider}; use crate::dataframe::PyDataFrame; use crate::dataset::Dataset; -use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionResult}; +use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError, PyDataFusionResult}; use crate::expr::sort_expr::PySortExpr; use crate::physical_plan::PyExecutionPlan; use crate::record_batch::PyRecordBatchStream; @@ -59,6 +59,7 @@ use datafusion::datasource::listing::{ }; use datafusion::datasource::MemTable; use datafusion::datasource::TableProvider; +use datafusion::error::DataFusionError; use datafusion::execution::context::{ DataFilePaths, SQLOptions, SessionConfig, SessionContext, TaskContext, }; @@ -435,8 +436,11 @@ impl PySessionContext { /// Returns a PyDataFrame whose plan 
corresponds to the SQL statement. pub fn sql(&mut self, query: &str, py: Python) -> PyDataFusionResult { let result = self.ctx.sql(query); - let df = wait_for_future(py, result)??; - Ok(PyDataFrame::new(df)) + match wait_for_future(py, result) { + Ok(Ok(df)) => Ok(PyDataFrame::new(df)), + Ok(Err(err)) => Err(py_datafusion_error_with_missing_tables(py, err)), + Err(py_err) => Err(PyDataFusionError::PythonError(py_err)), + } } #[pyo3(signature = (query, options=None))] @@ -452,8 +456,11 @@ impl PySessionContext { SQLOptions::new() }; let result = self.ctx.sql_with_options(query, options); - let df = wait_for_future(py, result)??; - Ok(PyDataFrame::new(df)) + match wait_for_future(py, result) { + Ok(Ok(df)) => Ok(PyDataFrame::new(df)), + Ok(Err(err)) => Err(py_datafusion_error_with_missing_tables(py, err)), + Err(py_err) => Err(PyDataFusionError::PythonError(py_err)), + } } #[pyo3(signature = (partitions, name=None, schema=None))] @@ -1188,6 +1195,78 @@ impl PySessionContext { } } +fn py_datafusion_error_with_missing_tables(py: Python, err: DataFusionError) -> PyDataFusionError { + let missing_tables = collect_missing_table_names(&err); + let py_err: PyErr = PyDataFusionError::from(err).into(); + + if !missing_tables.is_empty() { + if let Ok(py_names) = PyList::new(py, &missing_tables) { + let _ = py_err + .value(py) + .setattr("missing_table_names", py_names.into_any()); + } + } + + PyDataFusionError::PythonError(py_err) +} + +fn collect_missing_table_names(err: &DataFusionError) -> Vec { + let mut names = HashSet::new(); + collect_missing_table_names_recursive(err, &mut names); + + let mut collected: Vec = names.into_iter().collect(); + collected.sort(); + collected +} + +fn collect_missing_table_names_recursive(err: &DataFusionError, acc: &mut HashSet) { + match err { + DataFusionError::Plan(message) + | DataFusionError::Execution(message) + | DataFusionError::Configuration(message) + | DataFusionError::NotImplemented(message) + | 
DataFusionError::ResourcesExhausted(message) + | DataFusionError::Internal(message) => { + parse_missing_table_names_in_message(message, acc); + } + DataFusionError::Context(_, inner) | DataFusionError::Diagnostic(_, inner) => { + collect_missing_table_names_recursive(inner, acc); + } + _ => {} + } +} + +fn parse_missing_table_names_in_message(message: &str, acc: &mut HashSet) { + const LOOKUPS: [(&str, char); 8] = [ + ("table '", '\''), + ("view '", '\''), + ("table \"", '"'), + ("view \"", '"'), + ("table named '", '\''), + ("view named '", '\''), + ("table named \"", '"'), + ("view named \"", '"'), + ]; + + let lower = message.to_ascii_lowercase(); + for (needle, terminator) in LOOKUPS { + let mut search_start = 0usize; + while let Some(relative) = lower[search_start..].find(needle) { + let start = search_start + relative + needle.len(); + let remainder = &message[start..]; + if let Some(end) = remainder.find(terminator) { + let name = &remainder[..end]; + if !name.is_empty() { + acc.insert(name.to_string()); + } + search_start = start + end + 1; + } else { + break; + } + } + } +} + pub fn parse_file_compression_type( file_compression_type: Option, ) -> Result {