Skip to content

Commit 428839d

Browse files
committed
Resolve Config optimization conflicts for improved read/write concurrency
- Released Config read guard before converting values to Python objects in get and get_all - Ensures locks are held only while collecting scalar entries, not during expensive Python object conversion - Added regression test that runs Config.get_all and Config.set concurrently to guard against read/write contention regressions - Improves overall performance by reducing lock contention in multi-threaded scenarios
1 parent a905154 commit 428839d

File tree

2 files changed

+41
-10
lines changed

2 files changed

+41
-10
lines changed

python/tests/test_concurrency.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,30 @@ def worker(index: int) -> None:
6565
_run_in_threads(worker, count=12)
6666

6767

68+
def test_config_set_during_get_all() -> None:
69+
"""Ensure config writes proceed while another thread reads all entries."""
70+
71+
config = Config()
72+
key = "datafusion.execution.batch_size"
73+
74+
def reader() -> None:
75+
for _ in range(200):
76+
# get_all should not hold the lock while converting to Python objects
77+
config.get_all()
78+
79+
def writer() -> None:
80+
for index in range(200):
81+
config.set(key, str(1024 + index))
82+
83+
with ThreadPoolExecutor(max_workers=2) as executor:
84+
reader_future = executor.submit(reader)
85+
writer_future = executor.submit(writer)
86+
reader_future.result(timeout=10)
87+
writer_future.result(timeout=10)
88+
89+
assert config.get(key) is not None
90+
91+
6892
def test_case_builder_reuse_from_multiple_threads() -> None:
6993
"""Ensure the case builder can be safely reused across threads."""
7094

src/config.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,20 @@ impl PyConfig {
5050

5151
/// Get a configuration option
5252
pub fn get<'py>(&self, key: &str, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
53-
let entries = {
53+
let value = {
5454
let options = self.config.read();
55-
options.entries()
55+
options
56+
.entries()
57+
.iter()
58+
.find(|entry| entry.key == key)
59+
.map(|entry| entry.value.clone())
5660
};
5761

58-
for entry in entries {
59-
if entry.key == key {
60-
return Ok(entry.value.into_pyobject(py)?);
61-
}
62+
if let Some(value) = value {
63+
Ok(value.into_pyobject(py)?)
64+
} else {
65+
Ok(None::<String>.into_pyobject(py)?)
6266
}
63-
Ok(None::<String>.into_pyobject(py)?)
6467
}
6568

6669
/// Set a configuration option
@@ -75,12 +78,16 @@ impl PyConfig {
7578
pub fn get_all(&self, py: Python) -> PyResult<PyObject> {
7679
let entries = {
7780
let options = self.config.read();
78-
options.entries()
81+
options
82+
.entries()
83+
.into_iter()
84+
.map(|entry| (entry.key.to_string(), entry.value.clone()))
85+
.collect::<Vec<_>>()
7986
};
8087

8188
let dict = PyDict::new(py);
82-
for entry in entries {
83-
dict.set_item(entry.key, entry.value.into_pyobject(py)?)?;
89+
for (key, value) in entries {
90+
dict.set_item(key, value.into_pyobject(py)?)?;
8491
}
8592
Ok(dict.into())
8693
}

0 commit comments

Comments
 (0)