diff --git a/Pipfile.lock b/Pipfile.lock index 5f4c91e6e..0c0607eea 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -475,23 +475,31 @@ }, "mypy": { "hashes": [ - "sha256:0a0d102247c16ce93c97066443d11e2d36e6cc2a32d8ccc1f705268970479324", - "sha256:0d34d6b122597d48a36d6c59e35341f410d4abfa771d96d04ae2c468dd201abc", - "sha256:2170492030f6faa537647d29945786d297e4862765f0b4ac5930ff62e300d802", - "sha256:2842d4fbd1b12ab422346376aad03ff5d0805b706102e475e962370f874a5122", - "sha256:2b21ba45ad9ef2e2eb88ce4aeadd0112d0f5026418324176fd494a6824b74975", - "sha256:72060bf64f290fb629bd4a67c707a66fd88ca26e413a91384b18db3876e57ed7", - "sha256:af4e9ff1834e565f1baa74ccf7ae2564ae38c8df2a85b057af1dbbc958eb6666", - "sha256:bd03b3cf666bff8d710d633d1c56ab7facbdc204d567715cb3b9f85c6e94f669", - "sha256:c614194e01c85bb2e551c421397e49afb2872c88b5830e3554f0519f9fb1c178", - "sha256:cf4e7bf7f1214826cf7333627cb2547c0db7e3078723227820d0a2490f117a01", - "sha256:da56dedcd7cd502ccd3c5dddc656cb36113dd793ad466e894574125945653cea", - "sha256:e86bdace26c5fe9cf8cb735e7cedfe7850ad92b327ac5d797c656717d2ca66de", - "sha256:e97e9c13d67fbe524be17e4d8025d51a7dca38f90de2e462243ab8ed8a9178d1", - "sha256:eea260feb1830a627fb526d22fbb426b750d9f5a47b624e8d5e7e004359b219c" + "sha256:0d0a87c0e7e3a9becdfbe936c981d32e5ee0ccda3e0f07e1ef2c3d1a817cf73e", + "sha256:25adde9b862f8f9aac9d2d11971f226bd4c8fbaa89fb76bdadb267ef22d10064", + "sha256:28fb5479c494b1bab244620685e2eb3c3f988d71fd5d64cc753195e8ed53df7c", + "sha256:2f9b3407c58347a452fc0736861593e105139b905cca7d097e413453a1d650b4", + "sha256:33f159443db0829d16f0a8d83d94df3109bb6dd801975fe86bacb9bf71628e97", + "sha256:3f2aca7f68580dc2508289c729bd49ee929a436208d2b2b6aab15745a70a57df", + "sha256:499c798053cdebcaa916eef8cd733e5584b5909f789de856b482cd7d069bdad8", + "sha256:4eec37370483331d13514c3f55f446fc5248d6373e7029a29ecb7b7494851e7a", + "sha256:552a815579aa1e995f39fd05dde6cd378e191b063f031f2acfe73ce9fb7f9e56", + 
"sha256:5873888fff1c7cf5b71efbe80e0e73153fe9212fafdf8e44adfe4c20ec9f82d7", + "sha256:61a3d5b97955422964be6b3baf05ff2ce7f26f52c85dd88db11d5e03e146a3a6", + "sha256:674e822aa665b9fd75130c6c5f5ed9564a38c6cea6a6432ce47eafb68ee578c5", + "sha256:7ce3175801d0ae5fdfa79b4f0cfed08807af4d075b402b7e294e6aa72af9aa2a", + "sha256:9743c91088d396c1a5a3c9978354b61b0382b4e3c440ce83cf77994a43e8c521", + "sha256:9f94aac67a2045ec719ffe6111df543bac7874cee01f41928f6969756e030564", + "sha256:a26f8ec704e5a7423c8824d425086705e381b4f1dfdef6e3a1edab7ba174ec49", + "sha256:abf7e0c3cf117c44d9285cc6128856106183938c68fd4944763003decdcfeb66", + "sha256:b09669bcda124e83708f34a94606e01b614fa71931d356c1f1a5297ba11f110a", + "sha256:cd07039aa5df222037005b08fbbfd69b3ab0b0bd7a07d7906de75ae52c4e3119", + "sha256:d23e0ea196702d918b60c8288561e722bf437d82cb7ef2edcd98cfa38905d506", + "sha256:d65cc1df038ef55a99e617431f0553cd77763869eebdf9042403e16089fe746c", + "sha256:d7da2e1d5f558c37d6e8c1246f1aec1e7349e4913d8fb3cb289a35de573fe2eb" ], "index": "pypi", - "version": "==0.790" + "version": "==0.812" }, "mypy-extensions": { "hashes": [ diff --git a/graphql_compiler/compiler/compiler_frontend.py b/graphql_compiler/compiler/compiler_frontend.py index 185ceece0..3d84b8e39 100644 --- a/graphql_compiler/compiler/compiler_frontend.py +++ b/graphql_compiler/compiler/compiler_frontend.py @@ -133,7 +133,7 @@ from .validation import validate_schema_and_query_ast -@dataclass(frozen=True) +@dataclass(init=True, repr=True, eq=False, frozen=True) class OutputMetadata: """Metadata about a query's outputs.""" diff --git a/graphql_compiler/query_planning/make_query_plan.py b/graphql_compiler/query_planning/make_query_plan.py index 5de9128fe..fea2be85f 100644 --- a/graphql_compiler/query_planning/make_query_plan.py +++ b/graphql_compiler/query_planning/make_query_plan.py @@ -1,7 +1,8 @@ # Copyright 2019-present Kensho Technologies, LLC. 
def _make_simple_execute_node(
    query_and_parameters: QueryStringWithParameters,
    schema_info: Union[CommonSchemaInfo, SQLAlchemySchemaInfo],
    provider_id: str,
    backend_type: BackendType,
) -> SimpleExecute:
    """Compile the given query for one backend and wrap the result in a SimpleExecute.

    Args:
        query_and_parameters: the query and parameters for which to create a SimpleExecute.
        schema_info: schema information to use for query compilation.
        provider_id: the identifier of the provider to be queried.
        backend_type: the backend type. Note: if schema_info is of type SQLAlchemySchemaInfo, the
                      backend_type must be a flavor of SQL.

    Returns:
        SimpleExecute with all necessary information to execute the given query_and_parameters.

    Raises:
        AssertionError: if schema_info is a SQLAlchemySchemaInfo but backend_type is neither
                        mssql nor postgresql, or if schema_info is not a SQLAlchemySchemaInfo
                        and backend_type is not one of cypher, gremlin, or match.
    """
    if isinstance(schema_info, SQLAlchemySchemaInfo):
        # Check that the backend type is a SQL dialect *before* compiling, so that a mismatched
        # backend fails fast instead of first running (or failing inside) SQL compilation.
        # Of the SQL dialects handled here, only MSSQL needs its fold results post-processed.
        if backend_type == BackendType.mssql:
            requires_fold_postprocessing = True
        elif backend_type == BackendType.postgresql:
            requires_fold_postprocessing = False
        else:
            raise AssertionError(
                f"Received SQLAlchemySchemaInfo, but a non-SQL backend type {backend_type}."
            )

        # The SQL compiler produces a query object; render it to a string for this dialect.
        compilation_result = compile_graphql_to_sql(schema_info, query_and_parameters.query_string)
        query = print_sqlalchemy_query_string(compilation_result.query, schema_info.dialect)
    else:
        # All other backends use CommonSchemaInfo.
        if backend_type == BackendType.cypher:
            compilation_result = compile_graphql_to_cypher(
                schema_info, query_and_parameters.query_string
            )
        elif backend_type == BackendType.gremlin:
            compilation_result = compile_graphql_to_gremlin(
                schema_info, query_and_parameters.query_string
            )
        elif backend_type == BackendType.match:
            compilation_result = compile_graphql_to_match(
                schema_info, query_and_parameters.query_string
            )
        # TODO(selene): add InterpreterAdapter based backends
        else:
            raise AssertionError(f"Unknown non-SQL backend_type {backend_type}.")

        # For Cypher, Gremlin, and Match the compiled query is used as is, and no fold
        # post-processing is requested.
        query = compilation_result.query
        requires_fold_postprocessing = False

    provider_metadata = ProviderMetadata(
        backend_type=backend_type,
        requires_fold_postprocessing=requires_fold_postprocessing,
    )

    # The first field is a freshly generated random UUID string (presumably the node's id).
    return SimpleExecute(
        str(uuid4()),
        provider_id,
        provider_metadata,
        query,
        query_and_parameters.parameters,
        compilation_result.output_metadata,
        compilation_result.input_metadata,
    )
def generate_query_plan(
    query_and_parameters: QueryStringWithParameters,
    provider_id: str,
    *,
    desired_page_size: Optional[int] = 5000,
) -> QueryPlan:
    """Build a single-node QueryPlan that runs query_and_parameters against 'provider_id'.

    Note: this functionality is intended to replace make_query_plan below. TODO(selene)

    Args:
        query_and_parameters: the query and parameters for which to create a QueryPlan.
        provider_id: the identifier of the provider to be queried.
        desired_page_size: target page size, or None to indicate that pagination should be
                           disabled. Pagination lets results be received as they arrive (see
                           execute_query_plan) and keeps the load on the database more
                           controlled, making timeouts less likely. The page size is fulfilled
                           on a best-effort basis only; values below 1000 are not recommended,
                           since the server is unlikely to have sufficiently granular statistics
                           and the client is more likely to fetch many entirely empty pages.
                           Note: the value is currently only recorded on the returned QueryPlan.
                           TODO(selene): implement ParallelPaginated

    Returns:
        QueryPlan for the given query_and_parameters against the data source specified
        by the provider_id.
    """
    # TODO(selene): look up schema_info and backend_type by provider_id
    # NOTE(review): with these None placeholders no backend branch in _make_simple_execute_node
    # can match, so the call below is expected to be rejected until the lookup exists -- confirm.
    schema_info = None
    backend_type = None
    root_node = _make_simple_execute_node(
        query_and_parameters, schema_info, provider_id, backend_type
    )

    plan_version = 1
    return QueryPlan(
        plan_version,
        provider_id,
        query_and_parameters.query_string,
        query_and_parameters.parameters,
        desired_page_size,
        root_node.output_metadata,
        root_node,
    )


# --- graphql_compiler/query_planning/sync_query_execution.py ---


def _execute_simple_execute_node(
    provider_client_makers: Dict[str, Callable[[], ContextManager[QueryExecutionFunc]]],
    query_plan_node: SimpleExecute,
) -> Iterable[Mapping[str, Any]]:
    """Run one SimpleExecute node against its provider and return the resulting rows.

    Args:
        provider_client_makers: maps a provider id to a callable producing a context manager
                                that yields the function used to execute queries.
        query_plan_node: the SimpleExecute node to run.

    Returns:
        the rows produced by the provider's query function; when the node's provider metadata
        requires fold post-processing, a list of dicts that has been passed through
        post_process_mssql_folds.
    """
    make_client = provider_client_makers[query_plan_node.provider_id]
    deserialized_arguments = deserialize_multiple_arguments(
        query_plan_node.arguments, query_plan_node.input_metadata
    )
    needs_fold_postprocessing = query_plan_node.provider_metadata.requires_fold_postprocessing

    with make_client() as run_query:
        rows = run_query(
            query_plan_node.query, query_plan_node.input_metadata, deserialized_arguments
        )

        # Nothing to post-process: hand the provider's result back as is.
        # NOTE(review): the caller receives this after the client context has exited;
        # presumably the result stays valid once the client is released -- confirm.
        if not needs_fold_postprocessing:
            return rows

        # MSSQL folds need post-processing, which operates on a concrete list of dicts.
        # NOTE(review): this is kept inside the `with` so rows are consumed while the client
        # is still open -- confirm this matches the intended nesting.
        row_dicts = [dict(row) for row in rows]
        post_process_mssql_folds(row_dicts, query_plan_node.output_metadata)
        return row_dicts


######
# Public API
######


def execute_query_plan(
    provider_client_makers: Dict[str, Callable[[], ContextManager[QueryExecutionFunc]]],
    query_plan: QueryPlan,
) -> Iterable[Mapping[str, Any]]:
    """Execute a QueryPlan and return its result rows.

    Args:
        provider_client_makers: maps a provider id to a callable producing a context manager
                                that yields the function used to execute queries.
        query_plan: the plan to execute; only plans whose root node is a SimpleExecute are
                    supported.

    Returns:
        the rows produced by executing the plan's root node.

    Raises:
        NotImplementedError: if the plan's root node is not a SimpleExecute.
    """
    # TODO(selene): figure out if we want some sort of class to contain provider_client_makers
    root_node = query_plan.plan_root_node
    if not isinstance(root_node, SimpleExecute):
        raise NotImplementedError(
            f"Received plan_root_node with unsupported type " f"{type(root_node)}: {root_node}"
        )

    return _execute_simple_execute_node(provider_client_makers, root_node)