Skip to content

Commit aea6df3

Browse files
authored
Another round of fixes for null columns handling (#3103)
* Handle dynamic eval exceptions in pg layer. * Fixed update with none in numeric columns.
1 parent 7a7876d commit aea6df3

File tree

4 files changed

+328
-3
lines changed

4 files changed

+328
-3
lines changed

cpp/deeplake_pg/nd_utils.hpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,4 +875,23 @@ inline Datum pointer_to_datum(const void* curr_val, Oid attr_typeid, int32_t att
875875
}
876876
}
877877

878+
template <typename T>
879+
nd::array eval_with_nones(nd::array arr)
880+
{
881+
try {
882+
return nd::eval(arr);
883+
} catch (const nd::invalid_dynamic_eval&) {
884+
}
885+
std::vector<nd::array> result_elements;
886+
result_elements.reserve(arr.size());
887+
for (auto a : arr) {
888+
if (a.is_none()) {
889+
result_elements.push_back(nd::adapt(T()));
890+
} else {
891+
result_elements.push_back(std::move(a));
892+
}
893+
}
894+
return nd::eval(nd::dynamic(std::move(result_elements)));
895+
}
896+
878897
} // namespace pg::utils

cpp/deeplake_pg/table_data_impl.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ inline T table_data::streamer_info::value(int32_t column_number, int64_t row_num
585585
std::scoped_lock lock(column_to_batches[column_number].mutex);
586586
for (auto i = 0; i <= batch_index; ++i) {
587587
if (!batches[i].initialized_.load(std::memory_order_relaxed)) {
588-
batches[i].owner_ = nd::eval(streamers[column_number]->next_batch());
588+
batches[i].owner_ = utils::eval_with_nones<T>(streamers[column_number]->next_batch());
589589
batches[i].data_ = batches[i].owner_.data().data();
590590
batches[i].initialized_.store(true, std::memory_order_release);
591591
}
@@ -607,7 +607,7 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6
607607
std::scoped_lock lock(column_to_batches[column_number].mutex);
608608
for (auto i = 0; i <= batch_index; ++i) {
609609
if (!batches[i].initialized_.load(std::memory_order_relaxed)) {
610-
batches[i].owner_ = nd::eval(streamers[column_number]->next_batch());
610+
batches[i].owner_ = utils::eval_with_nones<T>(streamers[column_number]->next_batch());
611611
batches[i].data_ = batches[i].owner_.data().data();
612612
batches[i].initialized_.store(true, std::memory_order_release);
613613
}

cpp/deeplake_pg/table_storage.cpp

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,33 @@ std::pair<std::string, std::string> split_table_name(const std::string& full_nam
6363
return {full_name.substr(0, dot_pos), full_name.substr(dot_pos + 1)};
6464
}
6565

66+
// Helper function to get default value for NULL numeric/scalar columns
67+
nd::array get_default_value_for_null(Oid base_typeid)
68+
{
69+
switch (base_typeid) {
70+
case INT2OID:
71+
return nd::adapt(static_cast<int16_t>(0));
72+
case INT4OID:
73+
case DATEOID:
74+
return nd::adapt(static_cast<int32_t>(0));
75+
case TIMEOID:
76+
case TIMESTAMPOID:
77+
case TIMESTAMPTZOID:
78+
case INT8OID:
79+
return nd::adapt(static_cast<int64_t>(0));
80+
case FLOAT4OID:
81+
return nd::adapt(static_cast<float>(0.0));
82+
case NUMERICOID:
83+
case FLOAT8OID:
84+
return nd::adapt(static_cast<double>(0.0));
85+
case BOOLOID:
86+
return nd::adapt(false);
87+
default:
88+
// For non-numeric types, use nd::none
89+
return nd::none(nd::dtype::unknown, 0);
90+
}
91+
}
92+
6693
void convert_pg_to_nd(const pg::table_data& table_data,
6794
const std::vector<Datum>& values,
6895
const std::vector<uint8_t>& nulls,
@@ -84,7 +111,8 @@ void convert_pg_to_nd(const pg::table_data& table_data,
84111
const auto column_name = table_data.get_atttypename(i);
85112
// Skip if column is not in the input tuple
86113
if (i >= t_len || nulls[i] == 1) {
87-
row[column_name] = nd::none(nd::dtype::unknown, 0);
114+
// For numeric/scalar columns with NULL value, assign default (0) value
115+
row[column_name] = ::get_default_value_for_null(table_data.get_base_atttypid(i));
88116
continue;
89117
}
90118
row[column_name] =
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
"""
2+
Test adding multiple NUMERIC columns to a deeplake table and updating them.
3+
4+
This test verifies that:
5+
- Creating a table with SERIAL PRIMARY KEY and TEXT columns works
6+
- Adding NUMERIC columns dynamically works correctly
7+
- Inserting rows with NULL numeric values works (stored as 0 in deeplake)
8+
- Querying after multiple schema changes works
9+
- Multiple NUMERIC columns can be added sequentially
10+
- UPDATE operations work correctly on numeric columns
11+
- Multi-column updates work correctly
12+
"""
13+
import pytest
14+
import asyncpg
15+
from test_utils.assertions import Assertions
16+
17+
18+
@pytest.mark.asyncio
async def test_add_multiple_numeric_columns_with_null(db_conn: asyncpg.Connection):
    """
    Test adding multiple NUMERIC columns, inserting, and updating values.

    Tests:
    - Create table with SERIAL PRIMARY KEY and TEXT column using deeplake
    - Query with ORDER BY and LIMIT/OFFSET
    - Add first NUMERIC column via ALTER TABLE
    - Verify column is added and query works
    - Insert row with empty string and NULL NUMERIC value
    - Add second NUMERIC column via ALTER TABLE
    - Verify all columns are present and NULL numeric values are stored as 0
    - UPDATE single row with specific numeric values
    - UPDATE multiple rows in batch
    - UPDATE single column while leaving others unchanged
    - UPDATE multiple columns including TEXT and NUMERIC together
    - Verify final state of all rows after updates
    """
    assertions = Assertions(db_conn)

    # Paginated listing that is re-run after every schema/data change.
    paged_query = """
        SELECT * FROM (SELECT * FROM users ORDER BY "id") sub
        LIMIT 20 OFFSET 0
    """
    # Catalog lookup used to confirm that ALTER TABLE registered the columns.
    columns_query = """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = 'users'
        ORDER BY ordinal_position
    """

    try:
        # Create table with id (SERIAL) and name (TEXT) columns using deeplake
        await db_conn.execute("""
            CREATE TABLE users (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL
            ) USING deeplake
        """)

        # Query on users table (should be empty initially)
        rows = await db_conn.fetch(paged_query)
        assert len(rows) == 0, f"Expected 0 rows initially, got {len(rows)}"

        # Add first numeric column to users
        await db_conn.execute("""
            ALTER TABLE users ADD COLUMN "uu" NUMERIC
        """)

        # Verify column was added to PostgreSQL catalog
        column_info = await db_conn.fetch(columns_query)
        column_names = [col['column_name'] for col in column_info]
        assert 'uu' in column_names, \
            f"Column 'uu' should exist in catalog. Found: {column_names}"

        # Query users table after first schema change (still empty)
        rows = await db_conn.fetch(paged_query)
        assert len(rows) == 0, f"Expected 0 rows after adding column, got {len(rows)}"

        # Insert row with empty name and NULL for the NUMERIC column "uu"
        await db_conn.execute("""
            INSERT INTO users ("name", "uu") VALUES ('', NULL)
        """)

        # Query users table to verify insert
        rows = await db_conn.fetch(paged_query)
        assert len(rows) == 1, f"Expected 1 row after insert, got {len(rows)}"

        # Verify first row values
        row = rows[0]
        assert row['id'] == 1, f"Expected id=1, got {row['id']}"
        assert row['name'] == '', f"Expected empty string, got '{row['name']}'"
        # Numeric NULL values are stored as 0 in deeplake
        assert row['uu'] == 0, f"Expected uu=0 (NULL stored as 0), got {row['uu']}"

        # Insert another row with same values
        await db_conn.execute("""
            INSERT INTO users ("name", "uu") VALUES ('', NULL)
        """)

        # Query to verify both rows
        rows = await db_conn.fetch(paged_query)
        assert len(rows) == 2, f"Expected 2 rows after second insert, got {len(rows)}"

        # Add second numeric column
        await db_conn.execute("""
            ALTER TABLE users ADD COLUMN "uu23" NUMERIC
        """)

        # Verify second column was added
        column_info = await db_conn.fetch(columns_query)
        column_names = [col['column_name'] for col in column_info]
        assert 'uu23' in column_names, \
            f"Column 'uu23' should exist in catalog. Found: {column_names}"
        assert column_names == ['id', 'name', 'uu', 'uu23'], \
            f"Expected ['id', 'name', 'uu', 'uu23'], got {column_names}"

        # Query users table after second schema change
        rows = await db_conn.fetch("SELECT * FROM users ORDER BY id")
        assert len(rows) == 2, f"Expected 2 rows after adding uu23, got {len(rows)}"

        # Verify both rows have all columns with correct values
        for i, row in enumerate(rows, start=1):
            assert row['id'] == i, f"Expected id={i}, got {row['id']}"
            assert row['name'] == '', f"Expected empty string, got '{row['name']}'"
            assert row['uu'] == 0, f"Expected uu=0, got {row['uu']}"
            # New column should be 0 for existing rows
            assert row['uu23'] == 0, f"Expected uu23=0 (NULL for new column), got {row['uu23']}"

        # Verify we can insert with the new column
        await db_conn.execute("""
            INSERT INTO users ("name", "uu", "uu23") VALUES ('test', 5, 10)
        """)

        # Verify the new row
        new_row = await db_conn.fetchrow("""
            SELECT * FROM users WHERE name = 'test'
        """)
        assert new_row is not None, "Expected to find row with name='test'"
        assert new_row['name'] == 'test', f"Expected 'test', got '{new_row['name']}'"
        assert new_row['uu'] == 5, f"Expected uu=5, got {new_row['uu']}"
        assert new_row['uu23'] == 10, f"Expected uu23=10, got {new_row['uu23']}"

        # Verify total row count before updates
        await assertions.assert_table_row_count(3, "users")

        # Test UPDATE operations on numeric columns
        # Update single row - set both numeric columns to specific values
        await db_conn.execute("""
            UPDATE users SET "uu" = 100, "uu23" = 200 WHERE id = 1
        """)

        # Verify the update
        updated_row = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 1
        """)
        assert updated_row is not None, "Expected to find row with id=1"
        assert updated_row['uu'] == 100, f"Expected uu=100 after update, got {updated_row['uu']}"
        assert updated_row['uu23'] == 200, f"Expected uu23=200 after update, got {updated_row['uu23']}"
        assert updated_row['name'] == '', f"Name should remain unchanged, got '{updated_row['name']}'"

        # Update multiple rows at once
        await db_conn.execute("""
            UPDATE users SET "uu" = 50 WHERE id IN (2, 3)
        """)

        # Verify the batch update
        batch_updated = await db_conn.fetch("""
            SELECT * FROM users WHERE id IN (2, 3) ORDER BY id
        """)
        assert len(batch_updated) == 2, f"Expected 2 updated rows, got {len(batch_updated)}"
        for row in batch_updated:
            assert row['uu'] == 50, f"Expected uu=50 for id={row['id']}, got {row['uu']}"

        # Update single column while leaving other unchanged
        await db_conn.execute("""
            UPDATE users SET "uu23" = 999 WHERE id = 2
        """)

        # Verify partial column update
        partial_updated = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 2
        """)
        # Fail with a readable message instead of a TypeError on None.
        assert partial_updated is not None, "Expected to find row with id=2"
        assert partial_updated['uu'] == 50, \
            f"uu should remain 50 after partial update, got {partial_updated['uu']}"
        assert partial_updated['uu23'] == 999, \
            f"Expected uu23=999 after partial update, got {partial_updated['uu23']}"

        # Update another numeric value for row 3
        await db_conn.execute("""
            UPDATE users SET "uu23" = 333 WHERE id = 3
        """)

        # Verify the update
        row3_updated = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 3
        """)
        assert row3_updated is not None, "Expected to find row with id=3"
        assert row3_updated['uu'] == 50, \
            f"Expected uu=50 (unchanged), got {row3_updated['uu']}"
        assert row3_updated['uu23'] == 333, \
            f"Expected uu23=333 after update, got {row3_updated['uu23']}"
        assert row3_updated['name'] == 'test', \
            f"Name should remain 'test', got '{row3_updated['name']}'"

        # Update name column along with numeric columns
        await db_conn.execute("""
            UPDATE users SET "name" = 'updated', "uu" = 777, "uu23" = 888 WHERE id = 1
        """)

        # Verify multi-column update
        multi_updated = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 1
        """)
        assert multi_updated is not None, "Expected to find row with id=1"
        assert multi_updated['name'] == 'updated', \
            f"Expected name='updated', got '{multi_updated['name']}'"
        assert multi_updated['uu'] == 777, \
            f"Expected uu=777 after multi-update, got {multi_updated['uu']}"
        assert multi_updated['uu23'] == 888, \
            f"Expected uu23=888 after multi-update, got {multi_updated['uu23']}"

        # Final validation: query all rows and verify final state
        final_rows = await db_conn.fetch("SELECT * FROM users ORDER BY id")
        assert len(final_rows) == 3, f"Expected 3 rows in final state, got {len(final_rows)}"

        # Verify row 1 (id=1): updated name and both numeric columns
        assert final_rows[0]['id'] == 1
        assert final_rows[0]['name'] == 'updated'
        assert final_rows[0]['uu'] == 777
        assert final_rows[0]['uu23'] == 888

        # Verify row 2 (id=2): empty name, uu=50, uu23=999
        assert final_rows[1]['id'] == 2
        assert final_rows[1]['name'] == ''
        assert final_rows[1]['uu'] == 50
        assert final_rows[1]['uu23'] == 999

        # Verify row 3 (id=3): name='test', uu=50, uu23=333
        assert final_rows[2]['id'] == 3
        assert final_rows[2]['name'] == 'test'
        assert final_rows[2]['uu'] == 50
        assert final_rows[2]['uu23'] == 333

        print("✓ Test passed: Adding multiple NUMERIC columns and inserting NULL values works correctly")

    finally:
        # Best-effort cleanup: the connection may be dead after errors, and a
        # failing DROP must not replace the original test failure. Catch
        # Exception (not a bare except) so task cancellation and
        # KeyboardInterrupt still propagate.
        try:
            await db_conn.execute("DROP TABLE IF EXISTS users CASCADE")
        except Exception:
            pass
260+
261+
262+
@pytest.mark.asyncio
async def test_update_numeric_column_with_null(db_conn: asyncpg.Connection):
    """Test UPDATE with NULL values on NUMERIC columns.

    Inserts a row with value 100, updates it to NULL, and checks that the
    value reads back as 0.
    """
    try:
        await db_conn.execute("""
            CREATE TABLE test_null (id SERIAL, value NUMERIC) USING deeplake
        """)

        await db_conn.execute("INSERT INTO test_null (value) VALUES (100)")

        await db_conn.execute("UPDATE test_null SET value = NULL WHERE id = 1")

        row = await db_conn.fetchrow("SELECT * FROM test_null WHERE id = 1")
        # Fail with a readable message instead of a TypeError on None.
        assert row is not None, "Expected to find row with id=1"
        assert row['value'] == 0, f"Expected 0 after NULL update, got {row['value']}"

    finally:
        # Best-effort cleanup: if the connection died during the test, a raising
        # DROP here would replace the original failure with a cleanup error.
        try:
            await db_conn.execute("DROP TABLE IF EXISTS test_null CASCADE")
        except Exception:
            pass

0 commit comments

Comments
 (0)