Skip to content

Commit 0f9de0c

Browse files
committed
Fix: db2 i series connection fix(no ssl, port in con args) (#25781)
* db2 i series connection fix(no ssl, port in con args) * column type fixes * pyformat, add tests * fixes
1 parent 8737a32 commit 0f9de0c

File tree

5 files changed

+705
-14
lines changed

5 files changed

+705
-14
lines changed

ingestion/src/metadata/ingestion/source/database/column_type_parser.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,18 @@ class ColumnTypeParser:
217217
"CIDR": "CIDR",
218218
"INET": "INET",
219219
"TSRANGE": "DATETIMERANGE",
220+
# DB2
221+
"XMLVARCHAR": "XML",
222+
"XMLCLOB": "XML",
223+
"XMLFILE": "XML",
224+
"TIMESTMP": "TIMESTAMP",
225+
"LONGVARCHAR": "VARCHAR",
226+
"GRAPHIC": "CHAR",
227+
"VARGRAPHIC": "VARCHAR",
228+
"LONGVARGRAPHIC": "VARCHAR",
229+
"DBCLOB": "CLOB",
230+
"DECFLOAT": "DOUBLE",
231+
"CHARACTER": "CHAR",
220232
# ORACLE
221233
"BINARY_DOUBLE": "DOUBLE",
222234
"BINARY_FLOAT": "FLOAT",

ingestion/src/metadata/ingestion/source/database/db2/connection.py

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
"""
1313
Source connection handler
1414
"""
15+
import importlib
16+
import sys
1517
from pathlib import Path
16-
from typing import Optional
18+
from typing import Any, Dict, Optional
19+
from urllib.parse import quote_plus
1720

1821
from sqlalchemy.engine import Engine
1922

@@ -22,6 +25,7 @@
2225
)
2326
from metadata.generated.schema.entity.services.connections.database.db2Connection import (
2427
Db2Connection,
28+
Db2Scheme,
2529
)
2630
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
2731
TestConnectionResult,
@@ -30,6 +34,7 @@
3034
create_generic_db_connection,
3135
get_connection_args_common,
3236
get_connection_url_common,
37+
get_password_secret,
3338
)
3439
from metadata.ingestion.connections.test_connections import test_connection_db_common
3540
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -44,6 +49,50 @@
4449
logger = ingestion_logger()
4550

4651

52+
def _get_ibmi_connection_url(connection: Db2Connection) -> str:
53+
"""
54+
Build connection URL for ibmi scheme.
55+
56+
sqlalchemy-ibmi validates URL options strictly and does NOT support:
57+
- Port in the URL (host:port format) — causes validation error
58+
- SSL parameters like SECURITY=SSL — SSL is negotiated via port (9471)
59+
60+
Port is passed separately via connect_args.
61+
"""
62+
hostname = connection.hostPort.split(":")[0]
63+
64+
url = f"{connection.scheme.value}://"
65+
if connection.username:
66+
url += f"{quote_plus(connection.username)}"
67+
password = get_password_secret(connection)
68+
url += f":{quote_plus(password.get_secret_value())}"
69+
url += "@"
70+
url += hostname
71+
if connection.database:
72+
url += f"/{connection.database}"
73+
return url
74+
75+
76+
def _get_ibmi_connection_args(connection: Db2Connection) -> Dict[str, Any]:
77+
"""
78+
Build connection args for ibmi scheme.
79+
80+
Port must be passed via connect_args since sqlalchemy-ibmi
81+
rejects it in the URL.
82+
"""
83+
args = get_connection_args_common(connection)
84+
host_port = connection.hostPort
85+
if ":" in host_port:
86+
port_str = host_port.split(":")[1]
87+
try:
88+
args["port"] = int(port_str)
89+
except ValueError:
90+
raise ValueError(
91+
f"Invalid port in hostPort '{host_port}'. Expected format: 'hostname:port'"
92+
)
93+
return args
94+
95+
4796
def get_connection(connection: Db2Connection) -> Engine:
4897
"""
4998
Create connection
@@ -55,22 +104,43 @@ def get_connection(connection: Db2Connection) -> Engine:
55104
clidriver_version = check_clidriver_version(clidriver_version)
56105
if clidriver_version:
57106
install_clidriver(clidriver_version.value)
107+
# Invalidate cached clidriver module so the next import
108+
# picks up the freshly installed path
109+
sys.modules.pop("clidriver", None)
58110

59111
# prepare license
60112
# pylint: disable=import-outside-toplevel
61113
if connection.license and connection.licenseFileName:
62114
import clidriver
63115

116+
if clidriver_version:
117+
importlib.reload(clidriver)
118+
119+
license_dir = Path(clidriver.__path__[0], "license")
120+
license_dir.mkdir(parents=True, exist_ok=True)
121+
64122
with open(
65-
Path(clidriver.__path__[0], "license", connection.licenseFileName),
123+
license_dir / connection.licenseFileName,
66124
"w",
67125
encoding=UTF_8,
68126
) as file:
69127
file.write(connection.license.encode(UTF_8).decode("unicode-escape"))
70128

71-
ssl_manager = check_ssl_and_init(connection)
72-
if ssl_manager:
73-
connection = ssl_manager.setup_ssl(connection)
129+
is_ibmi = connection.scheme == Db2Scheme.ibmi
130+
131+
# SSL setup only for db2+ibm_db scheme.
132+
# ibmi scheme negotiates SSL via port (9471), not connection params.
133+
if not is_ibmi:
134+
ssl_manager = check_ssl_and_init(connection)
135+
if ssl_manager:
136+
connection = ssl_manager.setup_ssl(connection)
137+
138+
if is_ibmi:
139+
return create_generic_db_connection(
140+
connection=connection,
141+
get_connection_url_fn=_get_ibmi_connection_url,
142+
get_connection_args_fn=_get_ibmi_connection_args,
143+
)
74144

75145
return create_generic_db_connection(
76146
connection=connection,

ingestion/src/metadata/ingestion/source/database/db2/metadata.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@
1212
import traceback
1313
from typing import Iterable, Optional
1414

15-
from ibm_db_sa.base import ischema_names
16-
from ibm_db_sa.reflection import DB2Reflector, OS390Reflector
1715
from sqlalchemy.engine.reflection import Inspector
1816
from sqlalchemy.engine.row import LegacyRow
19-
from sqlalchemy.sql.sqltypes import BOOLEAN
17+
from sqlalchemy.sql.sqltypes import BINARY, BOOLEAN, VARBINARY
2018

2119
from metadata.generated.schema.entity.services.connections.database.db2Connection import (
2220
Db2Connection,
@@ -26,18 +24,39 @@
2624
)
2725
from metadata.ingestion.api.steps import InvalidSourceException
2826
from metadata.ingestion.ometa.ometa_api import OpenMetadata
27+
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
2928
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
30-
from metadata.ingestion.source.database.db2.utils import get_unique_constraints
29+
from metadata.ingestion.source.database.db2.utils import (
30+
get_columns_os390,
31+
get_unique_constraints,
32+
)
3133
from metadata.utils.logger import ingestion_logger
3234

3335
logger = ingestion_logger()
3436

37+
# ibm_db_sa is only required for the db2+ibm_db scheme (z/OS, LUW).
38+
# The ibmi scheme uses sqlalchemy-ibmi which does not depend on ibm_db_sa.
39+
try:
40+
from ibm_db_sa.base import ischema_names
41+
from ibm_db_sa.reflection import DB2Reflector, OS390Reflector
3542

36-
ischema_names.update({"BOOLEAN": BOOLEAN})
37-
38-
39-
DB2Reflector.get_unique_constraints = get_unique_constraints
40-
OS390Reflector.get_unique_constraints = get_unique_constraints
43+
ischema_names.update(
44+
{
45+
"BOOLEAN": BOOLEAN,
46+
"BINARY": BINARY,
47+
"VARBINARY": VARBINARY,
48+
"DECFLOAT": create_sqlalchemy_type("DECFLOAT"),
49+
"ROWID": create_sqlalchemy_type("ROWID"),
50+
"XMLVARCHAR": create_sqlalchemy_type("XMLVARCHAR"),
51+
"XMLCLOB": create_sqlalchemy_type("XMLCLOB"),
52+
"XMLFILE": create_sqlalchemy_type("XMLFILE"),
53+
}
54+
)
55+
DB2Reflector.get_unique_constraints = get_unique_constraints
56+
OS390Reflector.get_unique_constraints = get_unique_constraints
57+
OS390Reflector.get_columns = get_columns_os390
58+
except ImportError:
59+
logger.debug("ibm_db_sa not installed - db2+ibm_db scheme unavailable")
4160

4261

4362
class Db2Source(CommonDbSourceService):

ingestion/src/metadata/ingestion/source/database/db2/utils.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from sqlalchemy import and_, join, sql
1818
from sqlalchemy.engine import reflection
19+
from sqlalchemy.sql import sqltypes as sa_types
1920

2021
from metadata.utils.logger import ingestion_logger
2122

@@ -40,6 +41,67 @@ class DB2CLIDriverVersions(Enum):
4041
V12_1_0 = "12.1.0"
4142

4243

44+
@reflection.cache
45+
def get_columns_os390(
46+
self, connection, table_name, schema=None, **kw
47+
): # pylint: disable=unused-argument
48+
"""Override OS390Reflector.get_columns to handle empty/unrecognized types
49+
gracefully instead of emitting SAWarnings."""
50+
current_schema = self.denormalize_name(schema or self.default_schema_name)
51+
table_name = self.denormalize_name(table_name)
52+
syscols = self.sys_columns
53+
54+
query = (
55+
sql.select(
56+
syscols.c.colname,
57+
syscols.c.typename,
58+
syscols.c.defaultval,
59+
syscols.c.nullable,
60+
syscols.c.length,
61+
syscols.c.scale,
62+
syscols.c.generated,
63+
syscols.c.remark,
64+
)
65+
.where(
66+
and_(
67+
syscols.c.tabschema == current_schema,
68+
syscols.c.tabname == table_name,
69+
)
70+
)
71+
.order_by(syscols.c.colno)
72+
)
73+
sa_columns = []
74+
for r in connection.execute(query):
75+
coltype = r[1].strip().upper() if r[1] else ""
76+
if coltype in ["DECIMAL", "NUMERIC"]:
77+
coltype = self.ischema_names.get(coltype)(int(r[4]), int(r[5]))
78+
elif coltype in ["CHARACTER", "CHAR", "VARCHAR", "GRAPHIC", "VARGRAPHIC"]:
79+
coltype = self.ischema_names.get(coltype)(int(r[4]))
80+
elif coltype and coltype in self.ischema_names:
81+
coltype = self.ischema_names[coltype]
82+
else:
83+
if not coltype:
84+
logger.warning(f"Empty type for column '{r[0]}' - ingesting as UNKNOWN")
85+
else:
86+
logger.warning(
87+
f"Did not recognize type '{coltype}' of column '{r[0]}'"
88+
" - ingesting as UNKNOWN"
89+
)
90+
coltype = sa_types.NULLTYPE
91+
92+
sa_columns.append(
93+
{
94+
"name": self.normalize_name(r[0]),
95+
"type": coltype,
96+
"nullable": r[3] == "Y",
97+
"default": r[2] or None,
98+
"autoincrement": r[6] not in (" ", None),
99+
"comment": r[7] or None,
100+
}
101+
)
102+
return sa_columns
103+
104+
43105
@reflection.cache
44106
def get_unique_constraints(
45107
self, connection, table_name, schema=None, **kw

0 commit comments

Comments
 (0)