This repository was archived by the owner on Feb 6, 2025. It is now read-only.
Draft
Changes from 4 commits
3 changes: 2 additions & 1 deletion Pipfile
@@ -40,10 +40,11 @@ sphinx = ">=1.8,<2"

[packages] # Make sure to keep in sync with setup.py requirements.
ciso8601 = ">=2.1.3,<3"
dataclasses-json = ">=0.5.2,<0.6"
funcy = ">=1.7.3,<2"
graphql-core = ">=3.1.2,<3.2" # minor versions sometimes contain breaking changes
six = ">=1.10.0"
sqlalchemy = ">=1.3.0,<2"
sqlalchemy = ">=1.3.0,<1.4"

# The below is necessary to make a few pylint passes work properly, since pylint expects to be able
# to run "import graphql_compiler" in the environment in which it runs.
313 changes: 200 additions & 113 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion graphql_compiler/compiler/compiler_frontend.py
@@ -133,7 +133,7 @@
from .validation import validate_schema_and_query_ast


@dataclass(frozen=True)
@dataclass(init=True, repr=True, eq=False, frozen=True)
class OutputMetadata:
"""Metadata about a query's outputs."""

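The decorator change above presumably lets OutputMetadata keep its generated __init__ and __repr__ while defining its own equality: with eq=False, the dataclass machinery no longer synthesizes __eq__ or touches __hash__. A minimal sketch of that pattern, with invented fields rather than the real OutputMetadata definition:

```python
from dataclasses import dataclass


@dataclass(init=True, repr=True, eq=False, frozen=True)
class FrozenWithCustomEq:
    """Illustrative only: generated __init__/__repr__, hand-written __eq__ and __hash__."""

    name: str
    optional: bool

    def __eq__(self, other: object) -> bool:
        # eq=False means the dataclass does not generate __eq__, so this one is used.
        if not isinstance(other, FrozenWithCustomEq):
            return NotImplemented
        return self.name == other.name and self.optional == other.optional

    def __hash__(self) -> int:
        # Defining __eq__ sets __hash__ to None by default, so restore hashability explicitly.
        return hash((self.name, self.optional))
```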
1 change: 1 addition & 0 deletions graphql_compiler/query_planning_and_execution/__init__.py
@@ -0,0 +1 @@
# Copyright 2021-present Kensho Technologies, LLC.
@@ -1,7 +1,8 @@
# Copyright 2019-present Kensho Technologies, LLC.
from copy import copy
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Tuple, cast
from typing import FrozenSet, List, Optional, Tuple, Union, cast
from uuid import uuid4

from graphql import print_ast
from graphql.language.ast import (
@@ -17,11 +18,25 @@
StringValueNode,
)
from graphql.pyutils import FrozenList
from sqlalchemy.dialects.mssql.base import MSDialect

from ..ast_manipulation import get_only_query_definition
from ..compiler.common import (
CYPHER_LANGUAGE,
GREMLIN_LANGUAGE,
MATCH_LANGUAGE,
compile_graphql_to_cypher,
compile_graphql_to_gremlin,
compile_graphql_to_match,
compile_graphql_to_sql,
)
from ..compiler.sqlalchemy_extensions import print_sqlalchemy_query_string
from ..exceptions import GraphQLValidationError
from ..global_utils import QueryStringWithParameters
from ..schema import FilterDirective, OutputDirective
from .split_query import AstType, SubQueryNode
from ..schema.schema_info import CommonSchemaInfo, SQLAlchemySchemaInfo
from ..schema_transformation.split_query import AstType, SubQueryNode
from .typedefs import ProviderMetadata, QueryPlan, SimpleExecute


@dataclass
@@ -66,44 +81,65 @@ class QueryPlanDescriptor:
output_join_descriptors: List[OutputJoinDescriptor]


def make_query_plan(
root_sub_query_node: SubQueryNode, intermediate_output_names: FrozenSet[str]
) -> QueryPlanDescriptor:
"""Return a QueryPlanDescriptor, whose query ASTs have @filters added.

For each pair of parent and child SubQueryNodes, a new @filter directive will be added
in the child AST. It will be added on the field whose @output directive has the out_name
equal to the child's out name as specified in the QueryConnection. The newly added @filter
will be an 'in_collection' type filter, and the name of the local variable is guaranteed to
be the same as the out_name of the @output on the parent.

ASTs contained in the input node and its children nodes will not be modified.
def _make_simple_execute_node(
query_and_parameters: QueryStringWithParameters,
schema_info: Union[CommonSchemaInfo, SQLAlchemySchemaInfo],
provider_id: str,
backend_type: Optional[str],
Contributor: I'm not in love with the fact that this is Optional, but I don't have a concrete suggestion in mind. Maybe we can work together to figure something out?

Collaborator Author: I switched this to use the BackendType enum, so I think it makes sense for it to not be optional. It is still a bit weird: I currently check that if schema_info is of type SQLAlchemySchemaInfo, then backend_type is either MSSQL or PostgreSQL, but I don't check whether it matches the dialect in the schema info. If I add that check, we could run into the problem you mentioned ("a future SQLAlchemy version adds a new dialect for MSSQL that doesn't inherit from MSDialect — our code immediately becomes wrong in yet another place"), so I'm not sure what the best solution is...

) -> SimpleExecute:
"""Generate a SimpleExecuteNode.

Args:
root_sub_query_node: representing the base of a query split into pieces
that we want to turn into a query plan.
intermediate_output_names: names of outputs to be removed at the end.
query_and_parameters: the query and parameters for which to create a SimpleExecute.
schema_info: schema information to use for query compilation.
provider_id: the identifier of the provider to be queried.
backend_type: the backend type. Note: this is inferred from the SQLAlchemySchemaInfo's
dialect for SQL backends.

Returns:
QueryPlanDescriptor containing a tree of SubQueryPlans that wrap around each individual
query AST, the set of intermediate output names that are to be removed at the end, and
information on which outputs are to be connected to which in what manner.
"""
output_join_descriptors: List[OutputJoinDescriptor] = []
SimpleExecute with all necessary information to execute the given query_and_parameters.

root_sub_query_plan = SubQueryPlan(
query_ast=root_sub_query_node.query_ast,
schema_id=root_sub_query_node.schema_id,
parent_query_plan=None,
child_query_plans=[],
)

_make_query_plan_recursive(root_sub_query_node, root_sub_query_plan, output_join_descriptors)
"""
if isinstance(schema_info, SQLAlchemySchemaInfo):
# Compiled SQL query.
compilation_result = compile_graphql_to_sql(schema_info, query_and_parameters.query_string)
query = print_sqlalchemy_query_string(compilation_result.query, schema_info.dialect)

provider_metadata = ProviderMetadata(
backend_type=schema_info.dialect.name,
requires_fold_postprocessing=isinstance(schema_info.dialect, MSDialect),
Contributor: One of the reasons why I'd love backend_type to be non-optional and an enum is so that we can make this check stricter. Here's what I'm worried about: imagine a future SQLAlchemy version adds a new dialect for MSSQL that doesn't inherit from MSDialect — our code immediately becomes wrong in yet another place.

Another (and perhaps even better) alternative would be to add requires_fold_postprocessing, or something like it, to the SQLAlchemySchemaInfo. After all, that object knows the dialect (and therefore the emitted SQL) better than anything else, so realistically it should be the one specifying whether post-processing is required or not. From that perspective, ProviderMetadata would simply represent the parts of SQLAlchemySchemaInfo that the client may need to know in order to execute the query, which is appealing.

Collaborator Author: I like this idea! I will work on a separate PR to incorporate it.

)
else:
if backend_type == CYPHER_LANGUAGE:
compilation_result = compile_graphql_to_cypher(
schema_info, query_and_parameters.query_string
)
elif backend_type == GREMLIN_LANGUAGE:
compilation_result = compile_graphql_to_gremlin(
schema_info, query_and_parameters.query_string
)
elif backend_type == MATCH_LANGUAGE:
compilation_result = compile_graphql_to_match(
schema_info, query_and_parameters.query_string
)
# TODO(selene): add InterpreterAdapter based backends
else:
raise AssertionError(f"Unknown non-SQL backend_type {backend_type}.")
# Extract the query and create provider_metadata (process is the same across Cypher,
# Gremlin, and Match backends)
query = compilation_result.query
provider_metadata = ProviderMetadata(
backend_type=backend_type, requires_fold_postprocessing=False
)

return QueryPlanDescriptor(
root_sub_query_plan=root_sub_query_plan,
intermediate_output_names=intermediate_output_names,
output_join_descriptors=output_join_descriptors,
return SimpleExecute(
str(uuid4()),
provider_id,
provider_metadata,
query,
query_and_parameters.parameters,
compilation_result.output_metadata,
compilation_result.input_metadata,
)
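
The two review threads above converge on a possible follow-up: make backend_type a required enum, validate it against the SQLAlchemy dialect, and let the schema info itself declare whether MSSQL fold post-processing is needed. A rough sketch of that direction, assuming a BackendType enum and a requires_fold_postprocessing field that do not exist in this PR:

```python
from dataclasses import dataclass
from enum import Enum, unique


@unique
class BackendType(Enum):
    """Hypothetical backend enum; the exact set of members is an assumption."""

    cypher = "cypher"
    gremlin = "gremlin"
    match = "match"
    mssql = "mssql"
    postgresql = "postgresql"


@dataclass(frozen=True)
class SQLBackendCapabilities:
    """Toy stand-in for SQL-specific fields a future SQLAlchemySchemaInfo might expose."""

    dialect_name: str  # e.g. schema_info.dialect.name
    requires_fold_postprocessing: bool  # owned by the schema info, per the review suggestion


def validate_sql_backend(backend_type: BackendType, sql_info: SQLBackendCapabilities) -> None:
    """Raise if the declared backend type cannot describe the given SQL schema info."""
    if backend_type not in (BackendType.mssql, BackendType.postgresql):
        raise AssertionError(f"{backend_type} is not a SQL backend, but SQL schema info was given.")
    if backend_type.value != sql_info.dialect_name:
        raise AssertionError(
            f"backend_type {backend_type} does not match dialect {sql_info.dialect_name!r}."
        )
```

With a shape like this, ProviderMetadata could be populated straight from the schema info instead of checking isinstance(schema_info.dialect, MSDialect).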


@@ -291,6 +327,118 @@ def _get_in_collection_filter_directive(input_filter_name: str) -> DirectiveNode
)


def _get_plan_and_depth_in_dfs_order(query_plan: SubQueryPlan) -> List[Tuple[SubQueryPlan, int]]:
"""Return a list of topologically sorted (query plan, depth) tuples."""

def _get_plan_and_depth_in_dfs_order_helper(
query_plan: SubQueryPlan, depth: int
) -> List[Tuple[SubQueryPlan, int]]:
plan_and_depth_in_dfs_order = [(query_plan, depth)]
for child_query_plan in query_plan.child_query_plans:
plan_and_depth_in_dfs_order.extend(
_get_plan_and_depth_in_dfs_order_helper(child_query_plan, depth + 1)
)
return plan_and_depth_in_dfs_order

return _get_plan_and_depth_in_dfs_order_helper(query_plan, 0)


######
# Public API
######


def generate_query_plan(
query_and_parameters: QueryStringWithParameters,
provider_id: str,
*,
desired_page_size: Optional[int] = 5000,
Contributor: I'm kind of on the fence about setting a default here. The right number will depend on each user's statistics collection configuration — aside from some special cases, we can't generate pages that are more fine-grained than the statistics backing them. Within Kensho, we always know and control what that configuration might be. But in general, we might not.

On the flip side, not setting a number would make pagination disabled by default, and that's also a default value — and not a good one. Even if the statistics aren't fine-grained enough, the compiler will emit advisories when paginating, which should be an obvious and "soft" way to communicate the issue. And 5000 is a pretty decent compromise between large page sizes (for high throughput) and not blowing up the server or client.

What do you think?

Collaborator Author: I guess the alternative is that we make this a required param? Do you think it would be better to force the user to make a conscious decision here, since it does depend on user stats?

) -> QueryPlan:
"""Generate a query plan for query_and_parameters targeting data source 'provider_id'.

Note: this functionality is intended to replace make_query_plan below. TODO(selene)

Args:
query_and_parameters: the query and parameters for which to create a QueryPlan.
provider_id: the identifier of the provider to be queried.
desired_page_size: target page size. None indicates that pagination should be disabled.
The benefit of pagination with this sync executor is that:
- You can receive results as they arrive (see execute_query_plan)
- The load on the database is more controlled, and timeouts are
less likely.
The desired_page_size is fulfilled on a best-effort basis; setting
a particular desired_page_size does not guarantee pages of precisely
that size. Setting desired_page_size to a number less than 1000 is
not recommended, since the server is unlikely to have sufficiently
granular statistics to accomplish that, and the client is more likely
to fetch many entirely empty pages. Note: this is not currently used,
but will be in the near future! TODO(selene): implement ParallelPaginated

Returns:
QueryPlan for the given query_and_parameters against the data source specified
by the provider_id.

"""
# TODO(selene): look up schema_info and backend_type by provider_id
schema_info = None
backend_type = None
query_plan_node = _make_simple_execute_node(
query_and_parameters, schema_info, provider_id, backend_type
)

version = 1
return QueryPlan(
version,
provider_id,
query_and_parameters.query_string,
query_and_parameters.parameters,
desired_page_size,
query_plan_node.output_metadata,
query_plan_node,
)
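
For orientation, here is roughly how a caller might use generate_query_plan once the provider lookup TODO above is implemented. The provider id, page size, query, and the assumption that generate_query_plan is in scope are all illustrative, not part of this PR:

```python
from graphql_compiler.global_utils import QueryStringWithParameters

# Hypothetical query using the compiler's @output/@filter directives.
query_and_parameters = QueryStringWithParameters(
    """{
        Animal {
            name @output(out_name: "animal_name")
                 @filter(op_name: "=", value: ["$wanted_name"])
        }
    }""",
    {"wanted_name": "Hedwig"},
)

# "example_provider" is a placeholder id. desired_page_size=None would disable pagination;
# an explicit value asks for best-effort pages of roughly that size.
query_plan = generate_query_plan(
    query_and_parameters,
    "example_provider",
    desired_page_size=5000,
)
```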


def make_query_plan(
root_sub_query_node: SubQueryNode, intermediate_output_names: FrozenSet[str]
) -> QueryPlanDescriptor:
"""Return a QueryPlanDescriptor, whose query ASTs have @filters added.

For each pair of parent and child SubQueryNodes, a new @filter directive will be added
in the child AST. It will be added on the field whose @output directive has the out_name
equal to the child's out name as specified in the QueryConnection. The newly added @filter
will be an 'in_collection' type filter, and the name of the local variable is guaranteed to
be the same as the out_name of the @output on the parent.

ASTs contained in the input node and its children nodes will not be modified.

Args:
root_sub_query_node: representing the base of a query split into pieces
that we want to turn into a query plan.
intermediate_output_names: names of outputs to be removed at the end.

Returns:
QueryPlanDescriptor containing a tree of SubQueryPlans that wrap around each individual
query AST, the set of intermediate output names that are to be removed at the end, and
information on which outputs are to be connected to which in what manner.
"""
output_join_descriptors: List[OutputJoinDescriptor] = []

root_sub_query_plan = SubQueryPlan(
query_ast=root_sub_query_node.query_ast,
schema_id=root_sub_query_node.schema_id,
parent_query_plan=None,
child_query_plans=[],
)

_make_query_plan_recursive(root_sub_query_node, root_sub_query_plan, output_join_descriptors)

return QueryPlanDescriptor(
root_sub_query_plan=root_sub_query_plan,
intermediate_output_names=intermediate_output_names,
output_join_descriptors=output_join_descriptors,
)
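
To make the make_query_plan docstring concrete: the child field whose @output out_name matches the child's out name in the QueryConnection gains an in_collection filter whose variable is named after the parent's out_name. A hypothetical before/after, with invented type, field, and output names:

```python
# Child sub-query before make_query_plan runs (illustrative schema):
child_query_before = """{
    Animal {
        uuid @output(out_name: "child_uuid")
    }
}"""

# After make_query_plan, the same field also carries the injected filter; the local
# variable "$parent_uuid" matches the out_name of the @output on the parent query.
child_query_after = """{
    Animal {
        uuid @output(out_name: "child_uuid")
             @filter(op_name: "in_collection", value: ["$parent_uuid"])
    }
}"""
```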


def print_query_plan(query_plan_descriptor: QueryPlanDescriptor, indentation_depth: int = 4) -> str:
"""Return a string describing query plan."""
query_plan_strings = [""]
@@ -311,17 +459,3 @@ def print_query_plan(query_plan_descriptor: QueryPlanDescriptor, indentation_dep
query_plan_strings.append(str(query_plan_descriptor.intermediate_output_names) + "\n")

return "".join(query_plan_strings)


def _get_plan_and_depth_in_dfs_order(query_plan: SubQueryPlan) -> List[Tuple[SubQueryPlan, int]]:
"""Return a list of topologically sorted (query plan, depth) tuples."""

def _get_plan_and_depth_in_dfs_order_helper(query_plan, depth):
plan_and_depth_in_dfs_order = [(query_plan, depth)]
for child_query_plan in query_plan.child_query_plans:
plan_and_depth_in_dfs_order.extend(
_get_plan_and_depth_in_dfs_order_helper(child_query_plan, depth + 1)
)
return plan_and_depth_in_dfs_order

return _get_plan_and_depth_in_dfs_order_helper(query_plan, 0)
@@ -0,0 +1,50 @@
# Copyright 2021-present Kensho Technologies, LLC.
from typing import Any, Callable, ContextManager, Dict, Iterable, Mapping

from ..post_processing.sql_post_processing import post_process_mssql_folds
from ..query_formatting.common import deserialize_multiple_arguments
from .typedefs import QueryExecutionFunc, QueryPlan, SimpleExecute


def _execute_simple_execute_node(
provider_client_makers: Dict[str, Callable[[], ContextManager[QueryExecutionFunc]]],
query_plan_node: SimpleExecute,
) -> Iterable[Mapping[str, Any]]:
"""Execute a SimpleExecute."""
provider_client_maker = provider_client_makers[query_plan_node.provider_id]
arguments = deserialize_multiple_arguments(
query_plan_node.arguments, query_plan_node.input_metadata
)
requires_postprocessing = query_plan_node.provider_metadata.requires_fold_postprocessing

with provider_client_maker() as query_client:
result = query_client(query_plan_node.query, query_plan_node.input_metadata, arguments)

# Apply post processing for MSSQL folds if applicable.
if requires_postprocessing:
list_dict_result = [dict(entry) for entry in result]
post_process_mssql_folds(list_dict_result, query_plan_node.output_metadata)
return list_dict_result

# Otherwise, return result as is.
return result


######
# Public API
######


def execute_query_plan(
provider_client_makers: Dict[str, Callable[[], ContextManager[QueryExecutionFunc]]],
query_plan: QueryPlan,
) -> Iterable[Mapping[str, Any]]:
"""Execute a QueryPlan."""
# TODO(selene): figure out if we want some sort of class to contain provider_client_makers
if isinstance(query_plan.plan_root_node, SimpleExecute):
return _execute_simple_execute_node(provider_client_makers, query_plan.plan_root_node)
else:
raise NotImplementedError(
f"Received plan_root_node with unsupported type "
f"{type(query_plan.plan_root_node)}: {query_plan.plan_root_node}"
)
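
For context on the provider_client_makers argument: each value is a zero-argument callable returning a context manager that yields a query-execution function with the shape used above, (query, input_metadata, arguments) -> rows. A minimal hypothetical wiring for a SQLAlchemy-backed provider (the engine URL, provider id, and row handling are assumptions, not part of this PR):

```python
from contextlib import contextmanager
from typing import Any, Iterable, Iterator, Mapping

import sqlalchemy


def make_sqlalchemy_client_maker(engine: sqlalchemy.engine.Engine):
    """Build one provider client maker: a zero-arg callable returning a context manager."""

    @contextmanager
    def client_maker() -> Iterator[Any]:
        with engine.connect() as connection:

            def run_query(
                query: str, input_metadata: Mapping[str, Any], arguments: Mapping[str, Any]
            ) -> Iterable[Mapping[str, Any]]:
                # input_metadata is unused here; a real client might use it for type coercion.
                return [dict(row) for row in connection.execute(sqlalchemy.text(query), arguments)]

            yield run_query

    return client_maker


# Placeholder engine URL and provider id; query_plan would come from generate_query_plan(...).
engine = sqlalchemy.create_engine("postgresql://localhost/example")
provider_client_makers = {"example_provider": make_sqlalchemy_client_maker(engine)}
# results = execute_query_plan(provider_client_makers, query_plan)
```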