Skip to content
This repository was archived by the owner on Jan 26, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbcat/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def scan_sources(
exclude_schema_regex: Optional[List[str]] = None,
include_table_regex: Optional[List[str]] = None,
exclude_table_regex: Optional[List[str]] = None,
schema: Optional[List[str]] = None,
):
with catalog.managed_session:
if source_names is not None and len(source_names) > 0:
Expand All @@ -134,6 +135,7 @@ def scan_sources(
exclude_schema_regex_str=exclude_schema_regex,
include_table_regex_str=include_table_regex,
exclude_table_regex_str=exclude_table_regex,
specific_schema=schema,
)
LOGGER.info("Scanning {}".format(scanner.name))
try:
Expand Down
36 changes: 24 additions & 12 deletions dbcat/catalog/db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import re
from contextlib import closing
from typing import Any, Generator, List, Optional, Pattern, Tuple, Type
from typing import Any, Generator, List, Optional, Pattern, Tuple, Type, Union

from databuilder import Scoped
from databuilder.extractor.athena_metadata_extractor import AthenaMetadataExtractor
Expand All @@ -24,6 +24,7 @@
from dbcat.catalog.catalog import Catalog
from dbcat.catalog.models import CatSource
from dbcat.catalog.sqlite_extractor import SqliteMetadataExtractor
from dbcat.catalog.mysql_beta_extractor import MysqlbetaMetadataExtractor

LOGGER = logging.getLogger(__name__)

Expand All @@ -37,16 +38,18 @@ def __init__(
exclude_schema_regex_str: Optional[List[str]] = None,
include_table_regex_str: Optional[List[str]] = None,
exclude_table_regex_str: Optional[List[str]] = None,
specific_schema: Optional[str] = None,
):
self._name = source.name
self._extractor: Extractor
self._conf: ConfigTree
self._catalog = catalog
self._source = source
self._specific_schema = specific_schema
if source.source_type == "bigquery":
self._extractor, self._conf = DbScanner._create_big_query_extractor(source)
elif source.source_type == "mysql":
self._extractor, self._conf = DbScanner._create_mysql_extractor(source)
self._extractor, self._conf = DbScanner._create_mysql_extractor(self, source)
elif source.source_type == "postgresql":
self._extractor, self._conf = DbScanner._create_postgres_extractor(source)
elif source.source_type == "redshift":
Expand Down Expand Up @@ -88,7 +91,7 @@ def __init__(
@property
def name(self):
return self._name

@staticmethod
def _test_regex(
name: str,
Expand Down Expand Up @@ -240,23 +243,32 @@ def _create_big_query_extractor(

@staticmethod
def _create_mysql_extractor(
self,
source: CatSource,
) -> Tuple[MysqlMetadataExtractor, Any]:
where_clause_suffix = """
WHERE
c.table_schema NOT IN ('information_schema', 'performance_schema', 'sys', 'mysql')
"""
) -> Tuple[Union[MysqlMetadataExtractor, MysqlbetaMetadataExtractor], Any]:
if self._specific_schema is None or len(self._specific_schema) == 0:
where_clause_suffix = """
WHERE
c.table_schema NOT IN ('information_schema', 'performance_schema', 'sys', 'mysql')
"""
extractor = MysqlMetadataExtractor()
extract_info = MysqlMetadataExtractor
else:
target_schema = "'" + self._specific_schema.pop() + "'"
where_clause_suffix = 'where c.TABLE_SCHEMA = ' + target_schema
extractor = MysqlbetaMetadataExtractor()
extract_info = MysqlbetaMetadataExtractor

extractor = MysqlMetadataExtractor()
# extractor = MysqlMetadataExtractor()
scope = extractor.get_scope()
conn_string_key = f"{scope}.{SQLAlchemyExtractor().get_scope()}.{SQLAlchemyExtractor.CONN_STRING}"

conf = ConfigFactory.from_dict(
{
conn_string_key: source.conn_string,
f"{scope}.{MysqlMetadataExtractor.CLUSTER_KEY}": source.cluster,
f"{scope}.{MysqlMetadataExtractor.DATABASE_KEY}": source.database,
f"{scope}.{MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}": where_clause_suffix,
f"{scope}.{extract_info.CLUSTER_KEY}": source.cluster,
f"{scope}.{extract_info.DATABASE_KEY}": source.database,
f"{scope}.{extract_info.WHERE_CLAUSE_SUFFIX_KEY}": where_clause_suffix,
}
)

Expand Down
137 changes: 137 additions & 0 deletions dbcat/catalog/mysql_beta_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import logging
from collections import namedtuple
from itertools import groupby
from typing import (
Any, Dict, Iterator, Union,
)

from pyhocon import ConfigFactory, ConfigTree

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata

TableKey = namedtuple('TableKey', ['schema', 'table_name'])

LOGGER = logging.getLogger(__name__)


class MysqlbetaMetadataExtractor(Extractor):
    """
    Extracts mysql table and column metadata from underlying meta store database using SQLAlchemyExtractor.

    The query starts from INFORMATION_SCHEMA.TABLES and left-joins
    INFORMATION_SCHEMA.COLUMNS; rows are then grouped per (schema, table) into
    TableMetadata records.
    """
    # SELECT statement from mysql information_schema to extract table and column metadata.
    # {cluster_source} is filled in by init() with either the catalog column or the
    # configured cluster name; {where_clause_suffix} is supplied through configuration.
    # The ORDER BY keeps all rows of one table contiguous, which itertools.groupby
    # in _get_extract_iter() requires in order to emit exactly one record per table.
    SQL_STATEMENT = """
        SELECT
        c.column_name AS col_name,
        c.column_comment AS col_description,
        c.data_type AS col_type,
        c.ordinal_position AS col_sort_order,
        {cluster_source} AS cluster,
        c.table_schema AS "schema",
        c.table_name AS name,
        t.table_comment AS description,
        case when lower(t.table_type) = "view" then "true" else "false" end AS is_view
        FROM
        INFORMATION_SCHEMA.TABLES t
        left outer JOIN INFORMATION_SCHEMA.COLUMNS AS c
        on c.TABLE_SCHEMA = t.TABLE_SCHEMA
        and c.TABLE_NAME = t.TABLE_NAME
        {where_clause_suffix}
        ORDER BY c.table_schema, c.table_name, c.ordinal_position ;
    """

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    CLUSTER_KEY = 'cluster_key'
    USE_CATALOG_AS_CLUSTER_NAME = 'use_catalog_as_cluster_name'
    DATABASE_KEY = 'database_key'

    # Default values
    DEFAULT_CLUSTER_NAME = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict(
        {WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: DEFAULT_CLUSTER_NAME, USE_CATALOG_AS_CLUSTER_NAME: True}
    )

    def init(self, conf: ConfigTree) -> None:
        """
        Build the extraction SQL from ``conf`` and initialise the wrapped SQLAlchemyExtractor.

        :param conf: scoped configuration; missing keys fall back to DEFAULT_CONFIG
        """
        conf = conf.with_fallback(MysqlbetaMetadataExtractor.DEFAULT_CONFIG)
        self._cluster = conf.get_string(MysqlbetaMetadataExtractor.CLUSTER_KEY)

        # Either report the catalog stored in information_schema, or the
        # configured cluster name as a SQL string literal.
        if conf.get_bool(MysqlbetaMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
            cluster_source = "c.table_catalog"
        else:
            cluster_source = f"'{self._cluster}'"

        self._database = conf.get_string(MysqlbetaMetadataExtractor.DATABASE_KEY, default='mysql')

        self.sql_stmt = MysqlbetaMetadataExtractor.SQL_STATEMENT.format(
            where_clause_suffix=conf.get_string(MysqlbetaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY),
            cluster_source=cluster_source
        )

        self._alchemy_extractor = SQLAlchemyExtractor()
        # The generated statement is only a fallback: an EXTRACT_SQL value already
        # present in the SQLAlchemy-scoped configuration takes precedence.
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope()) \
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

        self.sql_stmt = sql_alch_conf.get_string(SQLAlchemyExtractor.EXTRACT_SQL)

        LOGGER.info('SQL for mysql metadata: %s', self.sql_stmt)

        self._alchemy_extractor.init(sql_alch_conf)
        self._extract_iter: Union[None, Iterator] = None

    def extract(self) -> Union[TableMetadata, None]:
        """
        Return the next TableMetadata, or None once every table has been emitted.

        The underlying iterator is created lazily on the first call.
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self) -> str:
        """Return the configuration scope used for this extractor's settings."""
        return 'extractor.mysql_metadata'

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
        """
        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
        :return:
        """
        for _key, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []

            # groupby never yields an empty group, so last_row is always bound
            # by the time the TableMetadata below is built.
            for row in group:
                last_row = row
                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
                                              row['col_type'], row['col_sort_order']))

            yield TableMetadata(self._database, last_row['cluster'],
                                last_row['schema'],
                                last_row['name'],
                                last_row['description'],
                                columns,
                                is_view=last_row['is_view'])

    def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
        """
        Provides iterator of result row from SQLAlchemy extractor
        :return:
        """
        # Keep pulling rows until the wrapped extractor hands back a falsy value.
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
        """
        Table key consists of schema and table name
        :param row:
        :return:
        """
        if row:
            return TableKey(schema=row['schema'], table_name=row['name'])

        return None
8 changes: 7 additions & 1 deletion dbcat/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
match at least one --include switch but no --exclude switches. If --exclude appears without
--include, then tables matching --exclude are excluded from what is otherwise a normal scan.
"""
specific_schema_text = """
Scan a specific schema for mysql databases only.
"""

app = typer.Typer()

Expand All @@ -71,6 +74,9 @@ def scan(
exclude_table: Optional[List[str]] = typer.Option(
None, help=exclude_table_help_text
),
specific_schema: Optional[List[str]] = typer.Option(
None, help=specific_schema_text
),
):
catalog = open_catalog(
app_dir=dbcat.settings.APP_DIR,
Expand All @@ -92,6 +98,7 @@ def scan(
exclude_schema_regex=exclude_schema,
include_table_regex=include_table,
exclude_table_regex=exclude_table,
schema=specific_schema,
)
except NoMatchesError:
typer.echo(
Expand Down Expand Up @@ -335,7 +342,6 @@ def add_bigquery(
username = username,
project_id = project_id,
key_path = key_path,

)
except sqlalchemy.exc.IntegrityError:
typer.echo("Catalog with {} name already exist".format(name))
Expand Down
29 changes: 28 additions & 1 deletion test/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def test_bigquery_extractor(open_catalog_connection):
source_type="bigquery",
username="bq_username",
key_path="bq_keypath",
cred_key="bq_keypath",
project_id="bq_project_id"
)
extractor, conn_conf = DbScanner._create_big_query_extractor(source)
Expand Down Expand Up @@ -128,3 +127,31 @@ def test_sqlite_extractor(load_all_data):
assert len(partial_pii_table.columns) == 2
assert partial_pii_table.columns[0].name == "a"
assert partial_pii_table.columns[1].name == "b"


def test_mysql_extractor(open_catalog_connection):
    """Check the config built for a mysql source when no specific schema is requested."""
    catalog, conf = open_catalog_connection
    with catalog.managed_session:
        source = catalog.add_source(
            name="mysql1_source",
            source_type="mysql",
            username="mysql_username",
            password="mysql_password",
            database="mysql_db_name",
            uri="mysql_uri",
            port="mysql_port"
        )
        scanner = DbScanner(
            catalog=catalog,
            source=source,
        )
        extractor, conn_conf = DbScanner._create_mysql_extractor(scanner, source)
        scope = extractor.get_scope()
        assert (
            conn_conf.get("{}.database_key".format(scope)) == "mysql_db_name"
        )
        # Compare on the significant SQL fragment only: the suffix is a
        # triple-quoted literal whose surrounding whitespace depends on source
        # indentation, so an exact string comparison would be brittle.
        where_clause = conn_conf.get("{}.where_clause_suffix".format(scope))
        assert (
            "c.table_schema NOT IN ('information_schema', 'performance_schema', 'sys', 'mysql')"
            in where_clause
        )