Skip to content
This repository was archived by the owner on Feb 6, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion graphql_compiler/compiler/compiler_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
from .validation import validate_schema_and_query_ast


@dataclass(frozen=True)
@dataclass(init=True, repr=True, eq=False, frozen=True)
class OutputMetadata:
"""Metadata about a query's outputs."""

Expand Down
141 changes: 139 additions & 2 deletions graphql_compiler/query_planning/make_query_plan.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Copyright 2019-present Kensho Technologies, LLC.
from copy import copy
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Tuple, cast
from typing import FrozenSet, List, Optional, Tuple, Union, cast
from uuid import uuid4

from graphql import print_ast
from graphql.language.ast import (
Expand All @@ -19,9 +20,19 @@
from graphql.pyutils import FrozenList

from ..ast_manipulation import get_only_query_definition
from ..compiler.common import (
compile_graphql_to_cypher,
compile_graphql_to_gremlin,
compile_graphql_to_match,
compile_graphql_to_sql,
)
from ..compiler.sqlalchemy_extensions import print_sqlalchemy_query_string
from ..exceptions import GraphQLValidationError
from ..global_utils import QueryStringWithParameters
from ..schema import FilterDirective, OutputDirective
from ..schema.schema_info import CommonSchemaInfo, SQLAlchemySchemaInfo
from ..schema_transformation.split_query import AstType, SubQueryNode
from .typedefs import BackendType, ProviderMetadata, QueryPlan, SimpleExecute


@dataclass
Expand Down Expand Up @@ -66,6 +77,80 @@ class QueryPlanDescriptor:
output_join_descriptors: List[OutputJoinDescriptor]


def _make_simple_execute_node(
    query_and_parameters: QueryStringWithParameters,
    schema_info: Union[CommonSchemaInfo, SQLAlchemySchemaInfo],
    provider_id: str,
    backend_type: BackendType,
) -> SimpleExecute:
    """Generate a SimpleExecuteNode.

    Args:
        query_and_parameters: the query and parameters for which to create a SimpleExecute.
        schema_info: schema information to use for query compilation.
        provider_id: the identifier of the provider to be queried.
        backend_type: the backend type. Note: if schema_info is of type SQLAlchemySchemaInfo, the
            backend_type must be a flavor of SQL.

    Returns:
        SimpleExecute with all necessary information to execute the given query_and_parameters.

    Raises:
        AssertionError: if schema_info and backend_type disagree (a SQLAlchemySchemaInfo paired
            with a non-SQL backend, or a CommonSchemaInfo paired with an unknown backend).
    """
    # Ensure that if a SQL schema info object is passed, that the backend type is a SQL dialect.
    if isinstance(schema_info, SQLAlchemySchemaInfo):
        # Compiled SQL query.
        compilation_result = compile_graphql_to_sql(schema_info, query_and_parameters.query_string)
        query = print_sqlalchemy_query_string(compilation_result.query, schema_info.dialect)

        # Determine if the SQL dialect requires post-processing.
        # MSSQL fold results need post-processing; PostgreSQL results do not.
        if backend_type == BackendType.mssql:
            requires_fold_postprocessing = True
        elif backend_type == BackendType.postgresql:
            requires_fold_postprocessing = False
        else:
            raise AssertionError(
                f"Received SQLAlchemySchemaInfo, but a non-SQL backend type {backend_type}."
            )

        provider_metadata = ProviderMetadata(
            backend_type=backend_type,
            requires_fold_postprocessing=requires_fold_postprocessing,
        )
    # All other backends use CommonSchemaInfo.
    else:
        if backend_type == BackendType.cypher:
            compilation_result = compile_graphql_to_cypher(
                schema_info, query_and_parameters.query_string
            )
        elif backend_type == BackendType.gremlin:
            compilation_result = compile_graphql_to_gremlin(
                schema_info, query_and_parameters.query_string
            )
        elif backend_type == BackendType.match:
            compilation_result = compile_graphql_to_match(
                schema_info, query_and_parameters.query_string
            )
        # TODO(selene): add InterpreterAdapter based backends
        else:
            raise AssertionError(f"Unknown non-SQL backend_type {backend_type}.")
        # Extract the query and create provider_metadata (process is the same across Cypher,
        # Gremlin, and Match backends)
        query = compilation_result.query
        provider_metadata = ProviderMetadata(
            backend_type=backend_type, requires_fold_postprocessing=False
        )

    # A fresh UUID uniquely identifies this plan node.
    return SimpleExecute(
        str(uuid4()),
        provider_id,
        provider_metadata,
        query,
        query_and_parameters.parameters,
        compilation_result.output_metadata,
        compilation_result.input_metadata,
    )


def _make_query_plan_recursive(
sub_query_node: SubQueryNode,
sub_query_plan: SubQueryPlan,
Expand Down Expand Up @@ -253,7 +338,9 @@ def _get_in_collection_filter_directive(input_filter_name: str) -> DirectiveNode
def _get_plan_and_depth_in_dfs_order(query_plan: SubQueryPlan) -> List[Tuple[SubQueryPlan, int]]:
"""Return a list of topologically sorted (query plan, depth) tuples."""

def _get_plan_and_depth_in_dfs_order_helper(query_plan, depth):
def _get_plan_and_depth_in_dfs_order_helper(
query_plan: SubQueryPlan, depth: int
) -> List[Tuple[SubQueryPlan, int]]:
plan_and_depth_in_dfs_order = [(query_plan, depth)]
for child_query_plan in query_plan.child_query_plans:
plan_and_depth_in_dfs_order.extend(
Expand All @@ -269,6 +356,56 @@ def _get_plan_and_depth_in_dfs_order_helper(query_plan, depth):
######


def generate_query_plan(
    query_and_parameters: QueryStringWithParameters,
    provider_id: str,
    *,
    desired_page_size: Optional[int] = 5000,
) -> QueryPlan:
    """Generate a query plan for query_and_parameters targeting data source 'provider_id'.

    Note: this functionality is intended to replace make_query_plan below. TODO(selene)

    Args:
        query_and_parameters: the query and parameters for which to create a QueryPlan.
        provider_id: the identifier of the provider to be queried.
        desired_page_size: target page size. None indicates that pagination should be disabled.
            The benefit of pagination with this sync executor is that:
                - You can receive results as they arrive (see execute_query_plan)
                - The load on the database is more controlled, and timeouts are
                  less likely.
            The desired_page_size is fulfilled on a best-effort basis; setting
            a particular desired_page_size does not guarantee pages of precisely
            that size. Setting desired_page_size to a number less than 1000 is
            not recommended, since the server is unlikely to have sufficiently
            granular statistics to accomplish that, and the client is more likely
            to fetch many entirely empty pages. Note: this is not currently used,
            but will be in the near future! TODO(selene): implement ParallelPaginated

    Returns:
        QueryPlan for the given query_and_parameters against the data source specified
        by the provider_id.
    """
    # TODO(selene): look up schema_info and backend_type by provider_id
    schema_info = None
    backend_type = None
    root_node = _make_simple_execute_node(
        query_and_parameters, schema_info, provider_id, backend_type
    )

    plan_version = 1
    return QueryPlan(
        plan_version,
        provider_id,
        query_and_parameters.query_string,
        query_and_parameters.parameters,
        desired_page_size,
        root_node.output_metadata,
        root_node,
    )


def make_query_plan(
root_sub_query_node: SubQueryNode, intermediate_output_names: FrozenSet[str]
) -> QueryPlanDescriptor:
Expand Down
50 changes: 50 additions & 0 deletions graphql_compiler/query_planning/sync_query_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2021-present Kensho Technologies, LLC.
from typing import Any, Callable, ContextManager, Dict, Iterable, Mapping

from ..post_processing.sql_post_processing import post_process_mssql_folds
from ..query_formatting.common import deserialize_multiple_arguments
from .typedefs import QueryExecutionFunc, QueryPlan, SimpleExecute


def _execute_simple_execute_node(
    provider_client_makers: Dict[str, Callable[[], ContextManager[QueryExecutionFunc]]],
    query_plan_node: SimpleExecute,
) -> Iterable[Mapping[str, Any]]:
    """Run a single SimpleExecute node against its provider and return its result rows."""
    make_client = provider_client_makers[query_plan_node.provider_id]
    deserialized_arguments = deserialize_multiple_arguments(
        query_plan_node.arguments, query_plan_node.input_metadata
    )
    needs_fold_postprocessing = query_plan_node.provider_metadata.requires_fold_postprocessing

    with make_client() as execute_query:
        rows = execute_query(
            query_plan_node.query, query_plan_node.input_metadata, deserialized_arguments
        )

        # No post-processing required: hand back the provider's rows unchanged.
        if not needs_fold_postprocessing:
            return rows

        # MSSQL folds need post-processing: convert each row to a mutable dict,
        # then post-process the rows in place before returning them.
        mutable_rows = [dict(row) for row in rows]
        post_process_mssql_folds(mutable_rows, query_plan_node.output_metadata)
        return mutable_rows


######
# Public API
######


def execute_query_plan(
    provider_client_makers: Dict[str, Callable[[], ContextManager[QueryExecutionFunc]]],
    query_plan: QueryPlan,
) -> Iterable[Mapping[str, Any]]:
    """Execute a QueryPlan."""
    # TODO(selene): figure out if we want some sort of class to contain provider_client_makers
    # Only SimpleExecute root nodes are supported so far; reject anything else up front.
    if not isinstance(query_plan.plan_root_node, SimpleExecute):
        raise NotImplementedError(
            f"Received plan_root_node with unsupported type "
            f"{type(query_plan.plan_root_node)}: {query_plan.plan_root_node}"
        )

    return _execute_simple_execute_node(provider_client_makers, query_plan.plan_root_node)