Skip to content

Commit ddbf6f4

Browse files
authored
Fix: Resolve issues in Postgres name normalization when names are >63 characters (#359)
1 parent 826d689 commit ddbf6f4

File tree

15 files changed

+173
-74
lines changed

15 files changed

+173
-74
lines changed

airbyte/_executors/python.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from typing import TYPE_CHECKING, Literal
1111

1212
from overrides import overrides
13-
from rich import print
13+
from rich import print # noqa: A004 # Allow shadowing the built-in
1414

1515
from airbyte import exceptions as exc
1616
from airbyte._executors.base import Executor

airbyte/_executors/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import requests
99
import yaml
1010
from requests import HTTPError
11-
from rich import print
11+
from rich import print # noqa: A004 # Allow shadowing the built-in
1212

1313
from airbyte import exceptions as exc
1414
from airbyte._executors.declarative import DeclarativeExecutor

airbyte/_processors/sql/postgres.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33

44
from __future__ import annotations
55

6+
import functools
7+
68
from overrides import overrides
79

10+
from airbyte._util.name_normalizers import LowerCaseNormalizer
811
from airbyte._writers.jsonl import JsonlWriter
912
from airbyte.secrets.base import SecretString
1013
from airbyte.shared.sql_processor import SqlConfig, SqlProcessorBase
@@ -35,6 +38,24 @@ def get_database_name(self) -> str:
3538
return self.database
3639

3740

41+
class PostgresNormalizer(LowerCaseNormalizer):
42+
"""A name normalizer for Postgres.
43+
44+
Postgres has specific identifier length limits:
45+
- Table names are limited to 63 characters.
46+
- Column names are limited to 63 characters.
47+
48+
The Postgres normalizer inherits from the default LowerCaseNormalizer class, and
49+
additionally truncates column and table names to 63 characters.
50+
"""
51+
52+
@staticmethod
53+
@functools.cache
54+
def normalize(name: str) -> str:
55+
"""Normalize the name, truncating to 63 characters."""
56+
return LowerCaseNormalizer.normalize(name)[:63]
57+
58+
3859
class PostgresSqlProcessor(SqlProcessorBase):
3960
"""A Postgres implementation of the cache.
4061
@@ -49,3 +70,6 @@ class PostgresSqlProcessor(SqlProcessorBase):
4970
supports_merge_insert = False
5071
file_writer_class = JsonlWriter
5172
sql_config: PostgresConfig
73+
74+
normalizer = PostgresNormalizer
75+
"""A Postgres-specific name normalizer for table and column name normalization."""

airbyte/_util/temp_files.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def as_temp_files(files_contents: list[dict | str]) -> Generator[list[str], Any,
2323
try:
2424
for content in files_contents:
2525
use_json = isinstance(content, dict)
26-
temp_file = tempfile.NamedTemporaryFile(
26+
temp_file = tempfile.NamedTemporaryFile( # noqa: SIM115 # Avoiding context manager
2727
mode="w+t",
2828
delete=False,
2929
encoding="utf-8",

airbyte/_writers/jsonl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def _open_new_file(
3535
"""Open a new file for writing."""
3636
return cast(
3737
IO[str],
38-
gzip.open(
38+
gzip.open( # noqa: SIM115 # Avoiding context manager
3939
file_path,
4040
mode="wt",
4141
encoding="utf-8",

airbyte/cloud/workspaces.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,10 @@ def _deploy_connection(
215215
source_id: str
216216
if isinstance(source, Source):
217217
selected_streams = selected_streams or source.get_selected_streams()
218-
if source._deployed_source_id: # noqa: SLF001
219-
source_id = source._deployed_source_id # noqa: SLF001
220-
else:
221-
source_id = self._deploy_source(source)
218+
source_id = (
219+
source._deployed_source_id # noqa: SLF001 # Access to non-public API
220+
or self._deploy_source(source)
221+
)
222222
else:
223223
source_id = source
224224
if not selected_streams:

airbyte/secrets/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,6 @@ def disable_secret_source(source: SecretManager | SecretSourceEnum) -> None:
7777
return
7878

7979
# Else, remove by name
80-
for s in _SECRETS_SOURCES:
80+
for s in list(_SECRETS_SOURCES).copy():
8181
if s.name == str(source):
8282
_SECRETS_SOURCES.remove(s)

airbyte/shared/sql_processor.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,20 @@ def _get_temp_table_name(
497497
batch_id: str | None = None, # ULID of the batch
498498
) -> str:
499499
"""Return a new (unique) temporary table name."""
500-
batch_id = batch_id or str(ulid.ULID())
501-
return self.normalizer.normalize(f"{stream_name}_{batch_id}")
500+
if not batch_id:
501+
batch_id = str(ulid.ULID())
502+
503+
# Use the first 6 and last 3 characters of the ULID. This gives great uniqueness while
504+
# limiting the table name suffix to 10 characters, including the underscore.
505+
suffix = (
506+
f"{batch_id[:6]}{batch_id[-3:]}"
507+
if len(batch_id) > 9 # noqa: PLR2004 # Allow magic int value
508+
else batch_id
509+
)
510+
511+
# Note: The normalizer may truncate the table name if the database has a name length limit.
512+
# For instance, the Postgres normalizer will enforce a 63-character limit on table names.
513+
return self.normalizer.normalize(f"{stream_name}_{suffix}")
502514

503515
def _fully_qualified(
504516
self,

airbyte/sources/base.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from typing import TYPE_CHECKING, Any, Literal
1010

1111
import yaml
12-
from rich import print
12+
from rich import print # noqa: A004 # Allow shadowing the built-in
1313
from rich.syntax import Syntax
1414

1515
from airbyte_protocol.models import (
@@ -405,9 +405,25 @@ def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
405405

406406
return found[0].json_schema
407407

408-
def get_records(self, stream: str) -> LazyDataset:
408+
def get_records(
409+
self,
410+
stream: str,
411+
*,
412+
normalize_field_names: bool = False,
413+
prune_undeclared_fields: bool = True,
414+
) -> LazyDataset:
409415
"""Read a stream from the connector.
410416
417+
Args:
418+
stream: The name of the stream to read.
419+
normalize_field_names: When `True`, field names will be normalized to lower case, with
420+
special characters removed. This matches the behavior of PyAirbyte caches and most
421+
Airbyte destinations.
422+
prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
423+
which generally matches the behavior of PyAirbyte caches and most Airbyte
424+
destinations, specifically when you expect the catalog may be stale. You can disable
425+
this to keep all fields in the records.
426+
411427
This involves the following steps:
412428
* Call discover to get the catalog
413429
* Generate a configured catalog that syncs the given stream in full_refresh mode
@@ -445,8 +461,8 @@ def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]
445461

446462
stream_record_handler = StreamRecordHandler(
447463
json_schema=self.get_stream_json_schema(stream),
448-
prune_extra_fields=True,
449-
normalize_keys=False,
464+
prune_extra_fields=prune_undeclared_fields,
465+
normalize_keys=normalize_field_names,
450466
)
451467

452468
# This method is non-blocking, so we use "PLAIN" to avoid a live progress display

airbyte/types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# noqa: A005 # Allow shadowing the standard-library 'types' module
12
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
23

34
"""Type conversion methods for SQL Caches."""
@@ -7,7 +8,7 @@
78
from typing import cast
89

910
import sqlalchemy
10-
from rich import print
11+
from rich import print # noqa: A004 # Allow shadowing the built-in
1112

1213

1314
# Compare to documentation here: https://docs.airbyte.com/understanding-airbyte/supported-data-types

0 commit comments

Comments
 (0)