Skip to content
This repository was archived by the owner on Feb 8, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions query_execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ if (ENABLE_DISTRIBUTED)
endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
glog
quickstep_catalog_CatalogDatabaseLite
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_PolicyEnforcerBase
quickstep_queryexecution_QueryExecutionMessages_proto
Expand All @@ -199,6 +200,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
quickstep_queryoptimizer_QueryHandle
quickstep_transaction_DatabaseLockAdmissionControl
quickstep_utility_Macros
tmb
${GFLAGS_LIB_NAME})
Expand Down
6 changes: 3 additions & 3 deletions query_execution/PolicyEnforcerBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PolicyEnforcerBase {
* @return True if all the queries were admitted, false if at least one query
* was not admitted.
**/
bool admitQueries(const std::vector<QueryHandle*> &query_handles);
virtual bool admitQueries(const std::vector<QueryHandle*> &query_handles);

/**
* @brief Remove a given query that is under execution.
Expand All @@ -92,7 +92,7 @@ class PolicyEnforcerBase {
*
* @param query_id The ID of the query to be removed.
**/
void removeQuery(const std::size_t query_id);
virtual void removeQuery(const std::size_t query_id);

/**
* @brief Process a message sent to the Foreman, which gets passed on to the
Expand All @@ -108,7 +108,7 @@ class PolicyEnforcerBase {
* @return True if there is at least one active or waiting query, false if
* the policy enforcer doesn't have any query.
**/
inline bool hasQueries() const {
virtual bool hasQueries() const {
return !(admitted_queries_.empty() && waiting_queries_.empty());
}

Expand Down
29 changes: 27 additions & 2 deletions query_execution/PolicyEnforcerSingleNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <unordered_map>
#include <vector>

#include "catalog/CatalogDatabaseLite.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/QueryManagerBase.hpp"
Expand All @@ -43,6 +44,11 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" can be allocated in a single round of dispatch of messages to"
" the workers.");

void PolicyEnforcerSingleNode::removeQuery(const std::size_t query_id) {
admission_control_.signalTransactionCompletion(query_id);
admitted_queries_.erase(query_id);
}

void PolicyEnforcerSingleNode::getWorkerMessages(
std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
// Iterate over admitted queries until either there are no more
Expand Down Expand Up @@ -86,13 +92,21 @@ void PolicyEnforcerSingleNode::getWorkerMessages(
removeQuery(finished_qid);
}
}
bool PolicyEnforcerSingleNode::hasQueries() const {
return admission_control_.getTotalTransactionsCount() > 0u;
}

bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
std::vector<std::pair<transaction::ResourceId, transaction::AccessMode>> resource_requests;
resource_requests.push_back(std::make_pair(transaction::ResourceId(catalog_database_->getID()),
transaction::AccessMode::XLockMode()));
if (admission_control_.admitTransaction(query_handle->query_id(), resource_requests)) {
// if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
// Query with the same ID not present, ok to admit.
// TODO(harshad) - Remove the redundancy in storing the admitted queries.
admitted_queries_[query_id].reset(
new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle,
catalog_database_, storage_manager_, bus_));
Expand All @@ -103,9 +117,20 @@ bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) {
}
} else {
// This query will have to wait.
waiting_queries_.push(query_handle);
return false;
}
}

bool PolicyEnforcerSingleNode::admitQueries(const std::vector<QueryHandle *> &query_handles) {
DCHECK(!query_handles.empty());

bool all_queries_admitted = true;
for (QueryHandle *curr_query : query_handles) {
if (all_queries_admitted) {
all_queries_admitted = admitQuery(curr_query);
}
}
return all_queries_admitted;
}

} // namespace quickstep
6 changes: 6 additions & 0 deletions query_execution/PolicyEnforcerSingleNode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/WorkerDirectory.hpp"
#include "transaction/DatabaseLockAdmissionControl.hpp"
#include "utility/Macros.hpp"

#include "tmb/id_typedefs.h"
Expand Down Expand Up @@ -87,6 +88,9 @@ class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {
**/
void getWorkerMessages(
std::vector<std::unique_ptr<WorkerMessage>> *worker_messages);
bool admitQueries(const std::vector<QueryHandle *> &query_handles) override;
void removeQuery(const std::size_t query_id) override;
bool hasQueries() const override;

private:
void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override {
Expand All @@ -101,6 +105,8 @@ class PolicyEnforcerSingleNode final : public PolicyEnforcerBase {

tmb::MessageBus *bus_;

transaction::DatabaseLockAdmissionControl admission_control_;

DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerSingleNode);
};

Expand Down
1 change: 1 addition & 0 deletions query_optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ target_link_libraries(quickstep_queryoptimizer_LIPFilterGenerator
quickstep_utility_lipfilter_LIPFilter_proto)
target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
glog
quickstep_catalog_CatalogRelation
quickstep_parser_ParseStatement
quickstep_queryoptimizer_OptimizerContext
quickstep_queryoptimizer_logical_Logical
Expand Down
3 changes: 2 additions & 1 deletion query_optimizer/LogicalGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <vector>

#include "parser/ParseStatement.hpp"

#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/Validator.hpp"
#include "query_optimizer/logical/Logical.hpp"
Expand Down Expand Up @@ -58,6 +57,8 @@ L::LogicalPtr LogicalGenerator::generatePlan(
optimizePlan();
DVLOG(4) << "Optimized logical plan:\n" << logical_plan_->toString();

referenced_base_relations_ = resolver.getReferencedBaseRelations();

return logical_plan_;
}

Expand Down
12 changes: 12 additions & 0 deletions query_optimizer/LogicalGenerator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_GENERATOR_HPP_
#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_GENERATOR_HPP_

#include <vector>

#include "query_optimizer/logical/Logical.hpp"
#include "utility/Macros.hpp"

namespace quickstep {

class CatalogDatabase;
class CatalogRelation;
class ParseStatement;

namespace optimizer {
Expand Down Expand Up @@ -69,6 +72,13 @@ class LogicalGenerator {
logical::LogicalPtr generatePlan(const CatalogDatabase &catalog_database,
const ParseStatement &parse_statement);

/**
* @brief Return the referenced base relations in this query.
*/
const std::vector<const CatalogRelation *> getReferencedBaseRelations() const {
return referenced_base_relations_;
}

private:
/**
* @brief Applies rules to the logical plan.
Expand All @@ -78,6 +88,8 @@ class LogicalGenerator {
OptimizerContext *optimizer_context_;
logical::LogicalPtr logical_plan_;

std::vector<const CatalogRelation*> referenced_base_relations_;

DISALLOW_COPY_AND_ASSIGN(LogicalGenerator);
};

Expand Down
2 changes: 2 additions & 0 deletions query_optimizer/Optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ void Optimizer::generateQueryHandle(const ParseStatement &parse_statement,
execution_generator.generatePlan(
physical_generator.generatePlan(
logical_generator.generatePlan(*catalog_database, parse_statement)));

query_handle->setReferencedBaseRelations(logical_generator.getReferencedBaseRelations());
}

} // namespace optimizer
Expand Down
1 change: 1 addition & 0 deletions query_optimizer/Optimizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
namespace quickstep {

class CatalogDatabase;
class CatalogRelation;
class ParseStatement;
class QueryHandle;

Expand Down
20 changes: 20 additions & 0 deletions query_optimizer/QueryHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

#include "catalog/Catalog.pb.h"
#include "catalog/CatalogTypedefs.hpp"
Expand Down Expand Up @@ -178,6 +179,21 @@ class QueryHandle {
query_result_relation_ = relation;
}

/**
* @brief Get the referenced base relations in this query.
**/
const std::vector<const CatalogRelation *>& getReferencedBaseRelations() const {
return referenced_base_relations_;
}

/**
* @brief Set the referenced base relations in this query.
* @param referenced_base_relations_ The list of referenced base relations in this query.
**/
void setReferencedBaseRelations(const std::vector<const CatalogRelation *> &referenced_base_relations) {
referenced_base_relations_ = referenced_base_relations;
}

#ifdef QUICKSTEP_DISTRIBUTED
/**
* @brief Whether the query will be executed in the single node.
Expand Down Expand Up @@ -214,6 +230,10 @@ class QueryHandle {
// and deleted by the Cli shell.
const CatalogRelation *query_result_relation_;

std::vector<const CatalogRelation*> referenced_base_relations_;

private:

#ifdef QUICKSTEP_DISTRIBUTED
// Indicate whether the query should be executed on the default Shiftboss for
// correctness purpose.
Expand Down
2 changes: 2 additions & 0 deletions query_optimizer/resolver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
quickstep_queryoptimizer_logical_InsertTuple
quickstep_queryoptimizer_logical_Logical
quickstep_queryoptimizer_logical_MultiwayCartesianJoin
quickstep_queryoptimizer_logical_PatternMatcher
quickstep_queryoptimizer_logical_Project
quickstep_queryoptimizer_logical_Sample
quickstep_queryoptimizer_logical_SetOperation
Expand All @@ -119,6 +120,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
quickstep_queryoptimizer_logical_UpdateTable
quickstep_queryoptimizer_logical_WindowAggregate
quickstep_queryoptimizer_resolver_NameResolver
quickstep_queryoptimizer_rules_ReferencedBaseRelations
quickstep_storage_StorageBlockLayout_proto
quickstep_storage_StorageConstants
quickstep_types_IntType
Expand Down
Loading