diff --git a/singlestoredb/fusion/handler.py b/singlestoredb/fusion/handler.py index 031865324..1fcde40d4 100644 --- a/singlestoredb/fusion/handler.py +++ b/singlestoredb/fusion/handler.py @@ -11,6 +11,7 @@ from typing import Iterable from typing import List from typing import Optional +from typing import Set from typing import Tuple from parsimonious import Grammar @@ -23,9 +24,9 @@ CORE_GRAMMAR = r''' ws = ~r"(\s+|(\s*/\*.*\*/\s*)+)" - qs = ~r"\"([^\"]*)\"|'([^\']*)'|`([^\`]*)`|([A-Za-z0-9_\-\.]+)" - number = ~r"[-+]?(\d*\.)?\d+(e[-+]?\d+)?"i - integer = ~r"-?\d+" + qs = ~r"\"([^\"]*)\"|'([^\']*)'|([A-Za-z0-9_\-\.]+)|`([^\`]+)`" ws* + number = ~r"[-+]?(\d*\.)?\d+(e[-+]?\d+)?"i ws* + integer = ~r"-?\d+" ws* comma = ws* "," ws* eq = ws* "=" ws* open_paren = ws* "(" ws* @@ -33,6 +34,10 @@ open_repeats = ws* ~r"[\(\[\{]" ws* close_repeats = ws* ~r"[\)\]\}]" ws* select = ~r"SELECT"i ws+ ~r".+" ws* + table = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws* + column = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws* + link_name = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws* + catalog_name = ~r"(?:([A-Za-z0-9_\-]+)|`([^\`]+)`)(?:\.(?:([A-Za-z0-9_\-]+)|`([^\`]+)`))?" ws* json = ws* json_object ws* json_object = ~r"{\s*" json_members? ~r"\s*}" @@ -65,6 +70,10 @@ '': '', '': '', '': '', + '': '', + '': '', + '': '', + '': '', } BUILTIN_DEFAULTS = { # type: ignore @@ -226,9 +235,13 @@ def build_syntax(grammar: str) -> str: # Split on ';' on a line by itself cmd, end = grammar.split(';', 1) - rules = {} + name = '' + rules: Dict[str, Any] = {} for line in end.split('\n'): line = line.strip() + if line.startswith('&'): + rules[name] += '\n' + line + continue if not line: continue name, value = line.split('=', 1) @@ -239,10 +252,16 @@ def build_syntax(grammar: str) -> str: while re.search(r' [a-z0-9_]+\b', cmd): cmd = re.sub(r' ([a-z0-9_]+)\b', functools.partial(expand_rules, rules), cmd) + def add_indent(m: Any) -> str: + return ' ' + (len(m.group(1)) * ' ') + + # Indent line-continuations + cmd = re.sub(r'^(\&+)\s*', add_indent, cmd, flags=re.M) + cmd = textwrap.dedent(cmd).rstrip() + ';' - cmd = re.sub(r' +', ' ', cmd) - cmd = re.sub(r'^ ', ' ', cmd, flags=re.M) - cmd = re.sub(r'\s+,\.\.\.', ',...', cmd) + cmd = re.sub(r'(\S) +', r'\1 ', cmd) + cmd = re.sub(r'', ',', cmd) + cmd = re.sub(r'\s+,\s*\.\.\.', ',...', cmd) return cmd @@ -399,9 +418,15 @@ def process_grammar( help_txt = build_help(syntax_txt, full_grammar) grammar = build_cmd(grammar) + # Remove line-continuations + grammar = re.sub(r'\n\s*&+', r'', grammar) + # Make sure grouping characters all have whitespace around them grammar = re.sub(r' *(\[|\{|\||\}|\]) *', r' \1 ', grammar) + grammar = re.sub(r'\(', r' open_paren ', grammar) + grammar = re.sub(r'\)', r' close_paren ', grammar) + for line in grammar.split('\n'): if not line.strip(): continue @@ -418,7 +443,7 @@ def process_grammar( sql = re.sub(r'\]\s+\[', r' | ', sql) # Lower-case keywords and make them case-insensitive - sql = re.sub(r'(\b|@+)([A-Z0-9]+)\b', lower_and_regex, sql) + sql = re.sub(r'(\b|@+)([A-Z0-9_]+)\b', lower_and_regex, sql) # Convert literal strings to 'qs' sql = re.sub(r"'[^']+'", r'qs', sql) @@ -461,12 +486,18 @@ def process_grammar( sql = re.sub(r'\s+ws$', r' ws*', sql) sql = re.sub(r'\s+ws\s+\(', r' ws* (', sql) sql = re.sub(r'\)\s+ws\s+', r') ws* ', sql) - sql = re.sub(r'\s+ws\s+', r' ws+ ', sql) + sql = re.sub(r'\s+ws\s+', r' ws* ', sql) sql = re.sub(r'\?\s+ws\+', r'? ws*', sql) # Remove extra ws around eq sql = re.sub(r'ws\+\s*eq\b', r'eq', sql) + # Remove optional groupings when mandatory groupings are specified + sql = re.sub(r'open_paren\s+ws\*\s+open_repeats\?', r'open_paren', sql) + sql = re.sub(r'close_repeats\?\s+ws\*\s+close_paren', r'close_paren', sql) + sql = re.sub(r'open_paren\s+open_repeats\?', r'open_paren', sql) + sql = re.sub(r'close_repeats\?\s+close_paren', r'close_paren', sql) + out.append(f'{op} = {sql}') for k, v in list(rules.items()): @@ -548,6 +579,7 @@ class SQLHandler(NodeVisitor): def __init__(self, connection: Connection): self.connection = connection + self._handled: Set[str] = set() @classmethod def compile(cls, grammar: str = '') -> None: @@ -614,12 +646,16 @@ def execute(self, sql: str) -> result.FusionSQLResult: ) type(self).compile() + self._handled = set() try: params = self.visit(type(self).grammar.parse(sql)) for k, v in params.items(): params[k] = self.validate_rule(k, v) res = self.run(params) + + self._handled = set() + if res is not None: res.format_results(self.connection) return res @@ -666,16 +702,20 @@ def visit_qs(self, node: Node, visited_children: Iterable[Any]) -> Any: """Quoted strings.""" if node is None: return None - return node.match.group(1) or node.match.group(2) or \ - node.match.group(3) or node.match.group(4) + return flatten(visited_children)[0] + + def visit_compound(self, node: Node, visited_children: Iterable[Any]) -> Any: + """Compound name.""" + print(visited_children) + return flatten(visited_children)[0] def visit_number(self, node: Node, visited_children: Iterable[Any]) -> Any: """Numeric value.""" - return float(node.match.group(0)) + return float(flatten(visited_children)[0]) def visit_integer(self, node: Node, visited_children: Iterable[Any]) -> Any: """Integer value.""" - return int(node.match.group(0)) + return int(flatten(visited_children)[0]) def visit_ws(self, node: Node, visited_children: Iterable[Any]) -> Any: """Whitespace and comments.""" @@ -804,19 +844,29 @@ def generic_visit(self, node: Node, visited_children: Iterable[Any]) -> Any: if node.expr_name.endswith('_cmd'): final = merge_dicts(flatten(visited_children)[n_keywords:]) for k, v in type(self).rule_info.items(): - if k.endswith('_cmd') or k.endswith('_'): + if k.endswith('_cmd') or k.endswith('_') or k.startswith('_'): continue - if k not in final: + if k not in final and k not in self._handled: final[k] = BUILTIN_DEFAULTS.get(k, v['default']) return final # Filter out stray empty strings out = [x for x in flatten(visited_children)[n_keywords:] if x] - if repeats or len(out) > 1: - return {node.expr_name: out} + # Remove underscore prefixes from rule name + key_name = re.sub(r'^_+', r'', node.expr_name) - return {node.expr_name: out[0] if out else True} + if repeats or len(out) > 1: + self._handled.add(node.expr_name) + # If all outputs are dicts, merge them + if len(out) > 1 and not repeats: + is_dicts = [x for x in out if isinstance(x, dict)] + if len(is_dicts) == len(out): + return {key_name: merge_dicts(out)} + return {key_name: out} + + self._handled.add(node.expr_name) + return {key_name: out[0] if out else True} if hasattr(node, 'match'): if not visited_children and not node.match.groups(): diff --git a/singlestoredb/fusion/handlers/export.py b/singlestoredb/fusion/handlers/export.py new file mode 100644 index 000000000..8390af0ea --- /dev/null +++ b/singlestoredb/fusion/handlers/export.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +import json +from typing import Any +from typing import Dict +from typing import Optional + +from .. import result +from ...management.export import Catalog +from ...management.export import ExportService +from ...management.export import ExportStatus +from ...management.export import Link +from ..handler import SQLHandler +from ..result import FusionSQLResult +from .utils import get_workspace_group + + +class CreateClusterIdentity(SQLHandler): + """ + CREATE CLUSTER IDENTITY + catalog + storage + ; + + # Catolog + catalog = CATALOG { _catalog_config | _catalog_creds } + _catalog_config = CONFIG '' + _catalog_creds = CREDENTIALS '' + + # Storage + storage = LINK { _link_config | _link_creds } + _link_config = S3 CONFIG '' + _link_creds = CREDENTIALS '' + + Description + ----------- + Create a cluster identity for allowing the export service to access + external cloud resources. + + Arguments + --------- + * ```` and ````: Catalog configuration + and credentials in JSON format. + * ```` and ````: Storage link configuration + and credentials in JSON format. + + Remarks + ------- + * ``FROM
`` specifies the SingleStore table to export. The same name will + be used for the exported table. + * ``CATALOG`` specifies the details of the catalog to connect to. + * ``LINK`` specifies the details of the data storage to connect to. + + Example + ------- + The following statement creates a cluster identity for the catalog + and link:: + + CREATE CLUSTER IDENTITY + CATALOG CONFIG '{ + "type": "GLUE", + "table_format": "ICEBERG", + "id": "13983498723498", + "region": "us-east-1" + }' + LINK S3 CONFIG '{ + "region": "us-east-1", + "endpoint_url": "s3://bucket-name" + + }' + ; + + """ + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + # Catalog + catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}') + catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}') + + # Storage + storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}') + storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}') + + wsg = get_workspace_group({}) + + if wsg._manager is None: + raise TypeError('no workspace manager is associated with workspace group') + + out = ExportService( + wsg, + 'none', + 'none', + Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager), + Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager), + columns=None, + ).create_cluster_identity() + + res = FusionSQLResult() + res.add_field('RoleARN', result.STRING) + res.set_rows([(out['roleARN'],)]) + + return res + + +CreateClusterIdentity.register(overwrite=True) + + +class CreateExport(SQLHandler): + """ + START EXPORT + from_table + catalog + storage + ; + + # From table + from_table = FROM
+ + # Catolog + catalog = CATALOG [ _catalog_config ] [ _catalog_creds ] + _catalog_config = CONFIG '' + _catalog_creds = CREDENTIALS '' + + # Storage + storage = LINK [ _link_config ] [ _link_creds ] + _link_config = S3 CONFIG '' + _link_creds = CREDENTIALS '' + + Description + ----------- + Create an export configuration. + + Arguments + --------- + * ```` and ````: The catalog configuration. + * ```` and ````: The storage link configuration. + + Remarks + ------- + * ``FROM
`` specifies the SingleStore table to export. The same name will + be used for the exported table. + * ``CATALOG`` specifies the details of the catalog to connect to. + * ``LINK`` specifies the details of the data storage to connect to. + + Examples + -------- + The following statement starts an export operation with the given + catalog and link configurations. The source table to export is + named "customer_data":: + + START EXPORT FROM customer_data + CATALOG CONFIG '{ + "type": "GLUE", + "table_format": "ICEBERG", + "id": "13983498723498", + "region": "us-east-1" + }' + LINK S3 CONFIG '{ + "region": "us-east-1", + "endpoint_url": "s3://bucket-name" + + }' + ; + + """ # noqa + + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + # From table + if isinstance(params['from_table'], str): + from_database = None + from_table = params['from_table'] + else: + from_database, from_table = params['from_table'] + + # Catalog + catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}') + catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}') + + # Storage + storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}') + storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}') + + wsg = get_workspace_group({}) + + if from_database is None: + raise ValueError('database name must be specified for source table') + + if wsg._manager is None: + raise TypeError('no workspace manager is associated with workspace group') + + out = ExportService( + wsg, + from_database, + from_table, + Catalog.from_config_and_creds(catalog_config, catalog_creds, wsg._manager), + Link.from_config_and_creds('S3', storage_config, storage_creds, wsg._manager), + columns=None, + ).start() + + res = FusionSQLResult() + res.add_field('ExportID', result.STRING) + res.set_rows([(out.export_id,)]) + + return res + + +CreateExport.register(overwrite=True) + + +class ShowExport(SQLHandler): + """ + SHOW EXPORT export_id; + + # ID of export + export_id = '' + + """ + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + wsg = get_workspace_group({}) + out = ExportStatus(params['export_id'], wsg) + + status = out._info() + + res = FusionSQLResult() + res.add_field('ExportID', result.STRING) + res.add_field('Status', result.STRING) + res.add_field('Message', result.STRING) + res.set_rows([ + ( + params['export_id'], + status.get('status', 'Unknown'), + status.get('statusMsg', ''), + ), + ]) + + return res + + +ShowExport.register(overwrite=True) diff --git a/singlestoredb/fusion/handlers/job.py b/singlestoredb/fusion/handlers/job.py index 8098fc5bf..22f58a627 100644 --- a/singlestoredb/fusion/handlers/job.py +++ b/singlestoredb/fusion/handlers/job.py @@ -136,8 +136,8 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: execution_interval_in_mins = None if params.get('execute_every'): - execution_interval_in_mins = params['execute_every'][0]['interval'] - time_unit = params['execute_every'][-1]['time_unit'].upper() + execution_interval_in_mins = params['execute_every']['interval'] + time_unit = params['execute_every']['time_unit'].upper() if time_unit == 'MINUTES': pass elif time_unit == 'HOURS': @@ -292,8 +292,8 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: timeout_in_secs = None if params.get('with_timeout'): - timeout_in_secs = params['with_timeout'][0]['time'] - time_unit = params['with_timeout'][-1]['time_unit'].upper() + timeout_in_secs = params['with_timeout']['time'] + time_unit = params['with_timeout']['time_unit'].upper() if time_unit == 'SECONDS': pass elif time_unit == 'MINUTES': diff --git a/singlestoredb/management/export.py b/singlestoredb/management/export.py new file mode 100644 index 000000000..a7bdde566 --- /dev/null +++ b/singlestoredb/management/export.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python +"""SingleStoreDB export service.""" +from __future__ import annotations + +import abc +import re +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + +from .. import ManagementError +from .utils import vars_to_str +from .workspace import WorkspaceGroup +from .workspace import WorkspaceManager + + +class Link(object): + """Generic storage base class.""" + scheme: str = 'unknown' + + def __str__(self) -> str: + """Return string representation.""" + return vars_to_str(self) + + def __repr__(self) -> str: + """Return string representation.""" + return str(self) + + @abc.abstractmethod + def to_storage_location(self) -> Dict[str, Any]: + raise NotImplementedError + + @classmethod + def from_config_and_creds( + cls, + scheme: str, + config: Dict[str, Any], + credentials: Dict[str, Any], + manager: 'WorkspaceManager', + ) -> 'Link': + out_cls = None + for c in cls.__subclasses__(): + if c.scheme == scheme.upper(): + out_cls = c + break + + if out_cls is None: + raise TypeError(f'No link class found for given information: {scheme}') + + return out_cls.from_config_and_creds(scheme, config, credentials, manager) + + +class S3Link(Link): + """S3 link.""" + + scheme: str = 'S3' + region: str + storage_base_url: str + + def __init__(self, region: str, storage_base_url: str): + self.region = region + self.storage_base_url = storage_base_url + self._manager: Optional[WorkspaceManager] = None + + def to_storage_location(self) -> Dict[str, Any]: + return dict( + storageBaseURL=self.storage_base_url, + storageRegion=self.region, + ) + + @classmethod + def from_config_and_creds( + cls, + scheme: str, + config: Dict[str, Any], + credentials: Dict[str, Any], + manager: 'WorkspaceManager', + ) -> 'S3Link': + assert scheme.upper() == cls.scheme + + params: Dict[str, Any] = {} + params.update(config) + params.update(credentials) + + assert params.get('region'), 'region is required' + assert params.get('endpoint_url'), 'endpoint_url is required' + + out = cls(params['region'], params['endpoint_url']) + out._manager = manager + return out + + +class Catalog(object): + """Generic catalog base class.""" + + catalog_type: str = 'UNKNOWN' + table_format: str = 'UNKNOWN' + + def __str__(self) -> str: + """Return string representation.""" + return vars_to_str(self) + + def __repr__(self) -> str: + """Return string representation.""" + return str(self) + + @classmethod + def from_config_and_creds( + cls, + config: Dict[str, Any], + credentials: Dict[str, Any], + manager: 'WorkspaceManager', + ) -> 'Catalog': + catalog_type = config['type'].upper() + table_format = config['table_format'].upper() + + out_cls = None + for c in cls.__subclasses__(): + if c.catalog_type == catalog_type and c.table_format == table_format: + out_cls = c + break + + if out_cls is None: + raise TypeError(f'No catalog class found for given information: {config}') + + return out_cls.from_config_and_creds(config, credentials, manager) + + @abc.abstractmethod + def to_catalog_info(self) -> Dict[str, Any]: + """Return a catalog info dictionary.""" + raise NotImplementedError + + +class IcebergGlueCatalog(Catalog): + """Iceberg glue catalog.""" + + table_format = 'ICEBERG' + catalog_type = 'GLUE' + + region: str + catalog_id: str + + def __init__(self, region: str, catalog_id: str): + self.region = region + self.catalog_id = catalog_id + self._manager: Optional[WorkspaceManager] = None + + @classmethod + def from_config_and_creds( + cls, + config: Dict[str, Any], + credentials: Dict[str, Any], + manager: 'WorkspaceManager', + ) -> 'IcebergGlueCatalog': + params = {} + params.update(config) + params.update(credentials) + + out = cls( + region=params['region'], + catalog_id=params['id'], + ) + out._manager = manager + return out + + def to_catalog_info(self) -> Dict[str, Any]: + """Return a catalog info dictionary.""" + return dict( + catalogSource=self.catalog_type, + tableFormat=self.table_format, + glueRegion=self.region, + glueCatalogID=self.catalog_id, + ) + + +class ExportService(object): + """Export service.""" + + database: str + table: str + catalog: Catalog + storage_link: Link + columns: Optional[List[str]] + + def __init__( + self, + workspace_group: WorkspaceGroup, + database: str, + table: str, + catalog: Catalog, + storage_link: Link, + columns: Optional[List[str]], + ): + #: Workspace group + self.workspace_group = workspace_group + + #: Name of SingleStoreDB database + self.database = database + + #: Name of SingleStoreDB table + self.table = table + + #: List of columns to export + self.columns = columns + + #: Catalog + self.catalog = catalog + + #: Storage + self.storage_link = storage_link + + self._manager: Optional[WorkspaceManager] = workspace_group._manager + + def __str__(self) -> str: + """Return string representation.""" + return vars_to_str(self) + + def __repr__(self) -> str: + """Return string representation.""" + return str(self) + + def create_cluster_identity(self) -> Dict[str, Any]: + """Create a cluster identity.""" + if self._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + if not isinstance(self.catalog, IcebergGlueCatalog): + raise TypeError('Only Iceberg Glue catalog is supported at this time.') + + if not isinstance(self.storage_link, S3Link): + raise TypeError('Only S3 links are supported at this time.') + + out = self._manager._post( + f'workspaceGroups/{self.workspace_group.id}/' + 'egress/createEgressClusterIdentity', + json=dict( + storageBucketName=re.split( + r'/+', self.storage_link.storage_base_url, + )[1], + glueRegion=self.catalog.region, + glueCatalogID=self.catalog.catalog_id, + ), + ) + + return out.json() + + def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': + """Start the export process.""" + if self._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + if not isinstance(self.storage_link, S3Link): + raise TypeError('Only S3 links are supported at this time.') + + out = self._manager._post( + f'workspaceGroups/{self.workspace_group.id}/egress/startTableEgress', + json=dict( + databaseName=self.database, + tableName=self.table, + storageLocation=self.storage_link.to_storage_location(), + catalogInfo=self.catalog.to_catalog_info(), + ), + ) + + return ExportStatus(out.json()['egressID'], self.workspace_group) + + +class ExportStatus(object): + + export_id: str + + def __init__(self, export_id: str, workspace_group: WorkspaceGroup): + self.export_id = export_id + self.workspace_group = workspace_group + self._manager: Optional[WorkspaceManager] = workspace_group._manager + + def _info(self) -> Dict[str, Any]: + """Return export status.""" + if self._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + out = self._manager._post( + f'workspaceGroups/{self.workspace_group.id}/egress/tableEgressStatus', + json=dict(egressID=self.export_id), + ) + + return out.json() + + @property + def status(self) -> str: + """Return export status.""" + return self._info().get('status', 'Unknown') + + @property + def message(self) -> str: + """Return export status message.""" + return self._info().get('statusMsg', '') + + def __str__(self) -> str: + return self.status + + def __repr__(self) -> str: + return self.status diff --git a/singlestoredb/tests/test_fusion.py b/singlestoredb/tests/test_fusion.py index 798ab2299..60e22ccda 100644 --- a/singlestoredb/tests/test_fusion.py +++ b/singlestoredb/tests/test_fusion.py @@ -465,10 +465,6 @@ def test_create_drop_workspace_group(self): pass -@unittest.skipIf( - os.environ.get('SINGLESTOREDB_FUSION_ENABLE_HIDDEN', '0') == '0', - 'Hidden Fusion commands are not enabled.', -) @pytest.mark.management class TestJobsFusion(unittest.TestCase):