Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cpp/deeplake_pg/nd_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,4 +875,23 @@ inline Datum pointer_to_datum(const void* curr_val, Oid attr_typeid, int32_t att
}
}

Copy link

Copilot AI Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function lacks documentation explaining its purpose, parameters, return value, and when it should be used instead of plain nd::eval. Add a docstring explaining that this function evaluates arrays containing NULL values by replacing them with default-constructed values of type T.

Suggested change
/**
* Evaluates a dynamic nd::array that may contain NULL (none) values by
* replacing those NULLs with default-constructed values of type T.
*
* This helper first attempts to evaluate the input array with nd::eval. If
* that succeeds, the result is returned directly. If nd::eval throws
* nd::invalid_dynamic_eval (for example, because the presence of none values
* prevents dynamic evaluation), the array is traversed and any element that
* is none is replaced with nd::adapt(T()), after which nd::eval is applied
* to the reconstructed array.
*
* Use this function instead of calling nd::eval directly when the input
* array may legitimately contain NULL values and you want those values to
* be treated as default-constructed instances of T during evaluation.
*
* @tparam T The value type used to replace NULL (none) elements.
* @param arr A dynamic nd::array that may contain none elements.
* @return The result of evaluating the array after substituting none
* elements with default-constructed values of type T when needed.
*/

Copilot uses AI. Check for mistakes.
/**
 * Evaluates a dynamic nd::array that may contain NULL (none) values by
 * replacing those NULLs with default-constructed values of type T.
 *
 * This helper first attempts to evaluate the input array with nd::eval. If
 * that succeeds, the result is returned directly. If nd::eval throws
 * nd::invalid_dynamic_eval (for example, because the presence of none values
 * prevents dynamic evaluation), the array is traversed and any element that
 * is none is replaced with nd::adapt(T()), after which nd::eval is applied
 * to the reconstructed array.
 *
 * Use this function instead of calling nd::eval directly when the input
 * array may legitimately contain NULL values and you want those values to
 * be treated as default-constructed instances of T during evaluation.
 *
 * @tparam T The value type used to replace NULL (none) elements.
 * @param arr A dynamic nd::array that may contain none elements.
 * @return The result of evaluating the array after substituting none
 *         elements with default-constructed values of type T when needed.
 */
template <typename T>
nd::array eval_with_nones(nd::array arr)
{
    try {
        return nd::eval(arr);
    } catch (const nd::invalid_dynamic_eval&) {
        // Expected when the array contains none elements that prevent a
        // direct dynamic evaluation; fall through to the element-wise
        // substitution below instead of propagating the exception.
    }
    std::vector<nd::array> result_elements;
    result_elements.reserve(arr.size());
    for (auto a : arr) {
        if (a.is_none()) {
            // Substitute a default-constructed T (e.g. 0 for numeric types).
            result_elements.push_back(nd::adapt(T()));
        } else {
            result_elements.push_back(std::move(a));
        }
    }
    return nd::eval(nd::dynamic(std::move(result_elements)));
}

} // namespace pg::utils
4 changes: 2 additions & 2 deletions cpp/deeplake_pg/table_data_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ inline T table_data::streamer_info::value(int32_t column_number, int64_t row_num
std::scoped_lock lock(column_to_batches[column_number].mutex);
for (auto i = 0; i <= batch_index; ++i) {
if (!batches[i].initialized_.load(std::memory_order_relaxed)) {
batches[i].owner_ = nd::eval(streamers[column_number]->next_batch());
batches[i].owner_ = utils::eval_with_nones<T>(streamers[column_number]->next_batch());
batches[i].data_ = batches[i].owner_.data().data();
batches[i].initialized_.store(true, std::memory_order_release);
}
Expand All @@ -607,7 +607,7 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6
std::scoped_lock lock(column_to_batches[column_number].mutex);
for (auto i = 0; i <= batch_index; ++i) {
if (!batches[i].initialized_.load(std::memory_order_relaxed)) {
batches[i].owner_ = nd::eval(streamers[column_number]->next_batch());
batches[i].owner_ = utils::eval_with_nones<T>(streamers[column_number]->next_batch());
batches[i].data_ = batches[i].owner_.data().data();
batches[i].initialized_.store(true, std::memory_order_release);
}
Expand Down
30 changes: 29 additions & 1 deletion cpp/deeplake_pg/table_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,33 @@ std::pair<std::string, std::string> split_table_name(const std::string& full_nam
return {full_name.substr(0, dot_pos), full_name.substr(dot_pos + 1)};
}

// Helper function to get default value for NULL numeric/scalar columns
// Maps a PostgreSQL base type OID to the nd::array value that should be
// stored when a NULL is supplied for a column of that type. Numeric and
// scalar types map to a zero of the matching width, booleans map to false,
// and any other type falls back to nd::none.
nd::array get_default_value_for_null(Oid base_typeid)
{
    switch (base_typeid) {
    case BOOLOID:
        return nd::adapt(false);
    case INT2OID:
        return nd::adapt(int16_t{0});
    case DATEOID: // dates are stored as 32-bit day offsets
    case INT4OID:
        return nd::adapt(int32_t{0});
    case INT8OID:
    case TIMEOID: // time/timestamp types are stored as 64-bit microsecond counts
    case TIMESTAMPOID:
    case TIMESTAMPTZOID:
        return nd::adapt(int64_t{0});
    case FLOAT4OID:
        return nd::adapt(0.0f);
    case FLOAT8OID:
    case NUMERICOID: // NUMERIC is represented as a double in deeplake
        return nd::adapt(0.0);
    default:
        // Non-numeric types keep an explicit none marker.
        return nd::none(nd::dtype::unknown, 0);
    }
}

void convert_pg_to_nd(const pg::table_data& table_data,
const std::vector<Datum>& values,
const std::vector<uint8_t>& nulls,
Expand All @@ -84,7 +111,8 @@ void convert_pg_to_nd(const pg::table_data& table_data,
const auto column_name = table_data.get_atttypename(i);
// Skip if column is not in the input tuple
if (i >= t_len || nulls[i] == 1) {
row[column_name] = nd::none(nd::dtype::unknown, 0);
// For numeric/scalar columns with NULL value, assign default (0) value
row[column_name] = ::get_default_value_for_null(table_data.get_base_atttypid(i));
continue;
}
row[column_name] =
Expand Down
278 changes: 278 additions & 0 deletions postgres/tests/py_tests/test_add_numeric_columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
"""
Test adding multiple NUMERIC columns to a deeplake table and updating them.
This test verifies that:
- Creating a table with SERIAL PRIMARY KEY and TEXT columns works
- Adding NUMERIC columns dynamically works correctly
- Inserting rows with NULL numeric values works (stored as 0 in deeplake)
- Querying after multiple schema changes works
- Multiple NUMERIC columns can be added sequentially
- UPDATE operations work correctly on numeric columns
- Multi-column updates work correctly
"""
import pytest
import asyncpg
from test_utils.assertions import Assertions


@pytest.mark.asyncio
async def test_add_multiple_numeric_columns_with_null(db_conn: asyncpg.Connection):
    """
    Test adding multiple NUMERIC columns, inserting, and updating values.

    Tests:
    - Create table with SERIAL PRIMARY KEY and TEXT column using deeplake
    - Query with ORDER BY and LIMIT/OFFSET
    - Add first NUMERIC column via ALTER TABLE
    - Verify column is added and query works
    - Insert row with empty string and NULL NUMERIC value
    - Add second NUMERIC column via ALTER TABLE
    - Verify all columns are present and NULL numeric values are stored as 0
    - UPDATE single row with specific numeric values
    - UPDATE multiple rows in batch
    - UPDATE single column while leaving others unchanged
    - UPDATE multiple columns including TEXT and NUMERIC together
    - Verify final state of all rows after updates
    """
    assertions = Assertions(db_conn)

    try:
        # Create table with id (SERIAL) and name (TEXT) columns using deeplake
        await db_conn.execute("""
            CREATE TABLE users (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL
            ) USING deeplake
        """)

        # Query on users table (should be empty initially)
        rows = await db_conn.fetch("""
            SELECT * FROM (SELECT * FROM users ORDER BY "id") sub
            LIMIT 20 OFFSET 0
        """)
        assert len(rows) == 0, f"Expected 0 rows initially, got {len(rows)}"

        # Add first numeric column to users
        await db_conn.execute("""
            ALTER TABLE users ADD COLUMN "uu" NUMERIC
        """)

        # Verify column was added to PostgreSQL catalog
        column_info = await db_conn.fetch("""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_name = 'users'
            ORDER BY ordinal_position
        """)
        column_names = [col['column_name'] for col in column_info]
        assert 'uu' in column_names, \
            f"Column 'uu' should exist in catalog. Found: {column_names}"

        # Query users table after first schema change (still empty)
        rows = await db_conn.fetch("""
            SELECT * FROM (SELECT * FROM users ORDER BY "id") sub
            LIMIT 20 OFFSET 0
        """)
        assert len(rows) == 0, f"Expected 0 rows after adding column, got {len(rows)}"

        # Insert row with empty name and NULL numeric value
        await db_conn.execute("""
            INSERT INTO users ("name", "uu") VALUES ('', NULL)
        """)

        # Query users table to verify insert
        rows = await db_conn.fetch("""
            SELECT * FROM (SELECT * FROM users ORDER BY "id") sub
            LIMIT 20 OFFSET 0
        """)
        assert len(rows) == 1, f"Expected 1 row after insert, got {len(rows)}"

        # Verify first row values
        row = rows[0]
        assert row['id'] == 1, f"Expected id=1, got {row['id']}"
        assert row['name'] == '', f"Expected empty string, got '{row['name']}'"
        # Numeric NULL values are stored as 0 in deeplake
        assert row['uu'] == 0, f"Expected uu=0 (NULL stored as 0), got {row['uu']}"

        # Insert another row with same values
        await db_conn.execute("""
            INSERT INTO users ("name", "uu") VALUES ('', NULL)
        """)

        # Query to verify both rows
        rows = await db_conn.fetch("""
            SELECT * FROM (SELECT * FROM users ORDER BY "id") sub
            LIMIT 20 OFFSET 0
        """)
        assert len(rows) == 2, f"Expected 2 rows after second insert, got {len(rows)}"

        # Add second numeric column
        await db_conn.execute("""
            ALTER TABLE users ADD COLUMN "uu23" NUMERIC
        """)

        # Verify second column was added
        column_info = await db_conn.fetch("""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_name = 'users'
            ORDER BY ordinal_position
        """)
        column_names = [col['column_name'] for col in column_info]
        assert 'uu23' in column_names, \
            f"Column 'uu23' should exist in catalog. Found: {column_names}"
        assert column_names == ['id', 'name', 'uu', 'uu23'], \
            f"Expected ['id', 'name', 'uu', 'uu23'], got {column_names}"

        # Query users table after second schema change
        rows = await db_conn.fetch("SELECT * FROM users ORDER BY id")
        assert len(rows) == 2, f"Expected 2 rows after adding uu23, got {len(rows)}"

        # Verify both rows have all columns with correct values
        for i, row in enumerate(rows, start=1):
            assert row['id'] == i, f"Expected id={i}, got {row['id']}"
            assert row['name'] == '', f"Expected empty string, got '{row['name']}'"
            assert row['uu'] == 0, f"Expected uu=0, got {row['uu']}"
            # New column should be 0 for existing rows
            assert row['uu23'] == 0, f"Expected uu23=0 (NULL for new column), got {row['uu23']}"

        # Verify we can insert with the new column
        await db_conn.execute("""
            INSERT INTO users ("name", "uu", "uu23") VALUES ('test', 5, 10)
        """)

        # Verify the new row
        new_row = await db_conn.fetchrow("""
            SELECT * FROM users WHERE name = 'test'
        """)
        assert new_row is not None, "Expected to find row with name='test'"
        assert new_row['name'] == 'test', f"Expected 'test', got '{new_row['name']}'"
        assert new_row['uu'] == 5, f"Expected uu=5, got {new_row['uu']}"
        assert new_row['uu23'] == 10, f"Expected uu23=10, got {new_row['uu23']}"

        # Verify total row count before updates
        await assertions.assert_table_row_count(3, "users")

        # Test UPDATE operations on numeric columns
        # Update single row - set both numeric columns to specific values
        await db_conn.execute("""
            UPDATE users SET "uu" = 100, "uu23" = 200 WHERE id = 1
        """)

        # Verify the update
        updated_row = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 1
        """)
        assert updated_row is not None, "Expected to find row with id=1"
        assert updated_row['uu'] == 100, f"Expected uu=100 after update, got {updated_row['uu']}"
        assert updated_row['uu23'] == 200, f"Expected uu23=200 after update, got {updated_row['uu23']}"
        assert updated_row['name'] == '', f"Name should remain unchanged, got '{updated_row['name']}'"

        # Update multiple rows at once
        await db_conn.execute("""
            UPDATE users SET "uu" = 50 WHERE id IN (2, 3)
        """)

        # Verify the batch update
        batch_updated = await db_conn.fetch("""
            SELECT * FROM users WHERE id IN (2, 3) ORDER BY id
        """)
        assert len(batch_updated) == 2, f"Expected 2 updated rows, got {len(batch_updated)}"
        for row in batch_updated:
            assert row['uu'] == 50, f"Expected uu=50 for id={row['id']}, got {row['uu']}"

        # Update single column while leaving other unchanged
        await db_conn.execute("""
            UPDATE users SET "uu23" = 999 WHERE id = 2
        """)

        # Verify partial column update
        partial_updated = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 2
        """)
        assert partial_updated['uu'] == 50, \
            f"uu should remain 50 after partial update, got {partial_updated['uu']}"
        assert partial_updated['uu23'] == 999, \
            f"Expected uu23=999 after partial update, got {partial_updated['uu23']}"

        # Update another numeric value for row 3
        await db_conn.execute("""
            UPDATE users SET "uu23" = 333 WHERE id = 3
        """)

        # Verify the update
        row3_updated = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 3
        """)
        assert row3_updated['uu'] == 50, \
            f"Expected uu=50 (unchanged), got {row3_updated['uu']}"
        assert row3_updated['uu23'] == 333, \
            f"Expected uu23=333 after update, got {row3_updated['uu23']}"
        assert row3_updated['name'] == 'test', \
            f"Name should remain 'test', got '{row3_updated['name']}'"

        # Update name column along with numeric columns
        await db_conn.execute("""
            UPDATE users SET "name" = 'updated', "uu" = 777, "uu23" = 888 WHERE id = 1
        """)

        # Verify multi-column update
        multi_updated = await db_conn.fetchrow("""
            SELECT * FROM users WHERE id = 1
        """)
        assert multi_updated['name'] == 'updated', \
            f"Expected name='updated', got '{multi_updated['name']}'"
        assert multi_updated['uu'] == 777, \
            f"Expected uu=777 after multi-update, got {multi_updated['uu']}"
        assert multi_updated['uu23'] == 888, \
            f"Expected uu23=888 after multi-update, got {multi_updated['uu23']}"

        # Final validation: query all rows and verify final state
        final_rows = await db_conn.fetch("SELECT * FROM users ORDER BY id")
        assert len(final_rows) == 3, f"Expected 3 rows in final state, got {len(final_rows)}"

        # Verify row 1 (id=1): updated name and both numeric columns
        assert final_rows[0]['id'] == 1
        assert final_rows[0]['name'] == 'updated'
        assert final_rows[0]['uu'] == 777
        assert final_rows[0]['uu23'] == 888

        # Verify row 2 (id=2): empty name, uu=50, uu23=999
        assert final_rows[1]['id'] == 2
        assert final_rows[1]['name'] == ''
        assert final_rows[1]['uu'] == 50
        assert final_rows[1]['uu23'] == 999

        # Verify row 3 (id=3): name='test', uu=50, uu23=333
        assert final_rows[2]['id'] == 3
        assert final_rows[2]['name'] == 'test'
        assert final_rows[2]['uu'] == 50
        assert final_rows[2]['uu23'] == 333

        print("✓ Test passed: Adding multiple NUMERIC columns and inserting NULL values works correctly")

    finally:
        # Cleanup
        try:
            await db_conn.execute("DROP TABLE IF EXISTS users CASCADE")
        except Exception:
            pass  # Connection may be dead after errors


@pytest.mark.asyncio
async def test_update_numeric_column_with_null(db_conn: asyncpg.Connection):
    """Test UPDATE with NULL values on NUMERIC columns.

    Verifies that setting a NUMERIC column to NULL via UPDATE stores the
    deeplake default value (0) rather than a SQL NULL.
    """
    try:
        await db_conn.execute("""
            CREATE TABLE test_null (id SERIAL, value NUMERIC) USING deeplake
        """)

        await db_conn.execute("INSERT INTO test_null (value) VALUES (100)")

        await db_conn.execute("UPDATE test_null SET value = NULL WHERE id = 1")

        # NULL numeric values are stored as 0 in deeplake
        row = await db_conn.fetchrow("SELECT * FROM test_null WHERE id = 1")
        assert row['value'] == 0, f"Expected 0 after NULL update, got {row['value']}"

    finally:
        # Cleanup: guard the DROP so a dead connection does not mask the
        # original test failure (matches the sibling test's cleanup style).
        try:
            await db_conn.execute("DROP TABLE IF EXISTS test_null CASCADE")
        except Exception:
            pass  # Connection may be dead after errors