Skip to content
7 changes: 5 additions & 2 deletions airbyte/_processors/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def to_sql_type(
return self.get_string_type()
if isinstance(sql_type, sqlalchemy.types.BIGINT):
return sqlalchemy_types.Integer() # All integers are 64-bit in BigQuery
if isinstance(sql_type, sqlalchemy.types.DECIMAL):
# Convert SQLAlchemy DECIMAL to BigQuery NUMERIC type
# BigQuery NUMERIC type has precision of 38 and scale of 9 by default
return sqlalchemy_types.Numeric(precision=38, scale=9)

return sql_type

Expand Down Expand Up @@ -287,8 +291,7 @@ def _swap_temp_table_with_final_table(
deletion_name = f"{final_table_name}_deleteme"
commands = "\n".join(
[
f"ALTER TABLE {self._fully_qualified(final_table_name)} "
f"RENAME TO {deletion_name};",
f"ALTER TABLE {self._fully_qualified(final_table_name)} RENAME TO {deletion_name};",
f"ALTER TABLE {self._fully_qualified(temp_table_name)} "
f"RENAME TO {final_table_name};",
f"DROP TABLE {self._fully_qualified(deletion_name)};",
Expand Down
57 changes: 57 additions & 0 deletions tests/integration_tests/test_bigquery_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from __future__ import annotations

import pytest
from sqlalchemy import types as sqlalchemy_types

import airbyte as ab
from airbyte._processors.sql.bigquery import BigQueryTypeConverter
from airbyte._util import text_util


@pytest.mark.requires_creds
Expand All @@ -24,3 +27,57 @@ def test_bigquery_props(
assert new_bigquery_cache.get_database_name() == new_bigquery_cache.project_name, (
"Database name should be the same as project name."
)


@pytest.mark.requires_creds
def test_decimal_type_conversion(
new_bigquery_cache: ab.BigQueryCache,
) -> None:
"""Test that DECIMAL(38,9) types are correctly converted to BigQuery NUMERIC types."""
table_name = f"test_decimal_{text_util.generate_random_suffix()}"

try:
# Verify type conversion
converter = BigQueryTypeConverter()
converted_type = converter.to_sql_type({"type": "number", "format": "decimal"})

# Check that the converted type is a NUMERIC type with correct precision and scale
assert isinstance(converted_type, sqlalchemy_types.Numeric), (
"DECIMAL type should be converted to NUMERIC"
)
assert converted_type.precision == 38, "Precision should be 38"
assert converted_type.scale == 9, "Scale should be 9"

# Ensure schema exists before creating table
new_bigquery_cache._ensure_schema_exists()

# Create a test table with a DECIMAL column
sql = f"""
CREATE TABLE {new_bigquery_cache.schema_name}.{table_name} (
id INT64,
amount NUMERIC(38, 9)
)
"""
new_bigquery_cache.execute_sql(sql)

# Insert test data
sql = f"""
INSERT INTO {new_bigquery_cache.schema_name}.{table_name} (id, amount)
VALUES (1, 123.456789)
"""
new_bigquery_cache.execute_sql(sql)

# Verify we can read the data back
sql = f"SELECT amount FROM {new_bigquery_cache.schema_name}.{table_name} WHERE id = 1"
result = new_bigquery_cache.execute_sql(sql).fetchone()
assert result is not None, "Should be able to read NUMERIC data"
assert isinstance(result[0], (float, int, str)), (
"NUMERIC data should be readable"
)

finally:
# Clean up
cleanup_sql = (
f"DROP TABLE IF EXISTS {new_bigquery_cache.schema_name}.{table_name}"
)
new_bigquery_cache.execute_sql(cleanup_sql)