Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit f4f2c97

Browse files
committed
Make buffering consuemr thread-safe. It's ugly, but it works for now. We'll switch to something else.
1 parent 3838caa commit f4f2c97

File tree

2 files changed

+41
-36
lines changed

2 files changed

+41
-36
lines changed

src/codegen/buffering_consumer.cpp

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,27 +57,30 @@ BufferingConsumer::BufferingConsumer(const std::vector<oid_t> &cols,
5757
for (oid_t col_id : cols) {
5858
output_ais_.push_back(context.Find(col_id));
5959
}
60-
state.output = &tuples_;
6160
}
6261

63-
// Append the array of values (i.e., a tuple) into the consumer's buffer of
64-
// output tuples.
65-
void BufferingConsumer::BufferTuple(char *state, char *tuple,
62+
// Append the array of row attributes to the buffer of tuples
63+
//
64+
// Note: buffering consumers rely on an ugly mutex to protect access to the
65+
// output buffer. This is ugly AF. We don't actually use it for primary
66+
// query processing, so it's okay.
67+
void BufferingConsumer::BufferTuple(char *opaque_state, char *tuple,
6668
uint32_t num_cols) {
67-
BufferingState *buffer_state = reinterpret_cast<BufferingState *>(state);
68-
buffer_state->output->emplace_back(
69-
reinterpret_cast<peloton::type::Value *>(tuple), num_cols);
69+
auto *buffer = reinterpret_cast<Buffer *>(opaque_state);
70+
std::lock_guard<std::mutex> lock{buffer->mutex};
71+
buffer->output.emplace_back(reinterpret_cast<peloton::type::Value *>(tuple),
72+
num_cols);
7073
}
7174

7275
// Create two pieces of state: a pointer to the output tuple vector and an
7376
// on-stack value array representing a single tuple.
74-
void BufferingConsumer::Prepare(CompilationContext &ctx) {
77+
void BufferingConsumer::Prepare(CompilationContext &compilation_ctx) {
7578
// Be sure to call our parent
76-
ExecutionConsumer::Prepare(ctx);
79+
ExecutionConsumer::Prepare(compilation_ctx);
7780

7881
// Install a little char* for the state we need
79-
CodeGen &codegen = ctx.GetCodeGen();
80-
QueryState &query_state = ctx.GetQueryState();
82+
CodeGen &codegen = compilation_ctx.GetCodeGen();
83+
QueryState &query_state = compilation_ctx.GetQueryState();
8184
consumer_state_id_ =
8285
query_state.RegisterState("consumerState", codegen.CharPtrType());
8386
}
@@ -137,12 +140,23 @@ void BufferingConsumer::ConsumeResult(ConsumerContext &ctx,
137140
codegen.CallFunc(output_func, args);
138141
}
139142

143+
auto &query_state = ctx.GetQueryState();
144+
llvm::Value *buffer_ptr =
145+
query_state.LoadStateValue(codegen, consumer_state_id_);
146+
140147
// Append the tuple to the output buffer (by calling BufferTuple(...))
141-
auto *consumer_state = GetStateValue(ctx, consumer_state_id_);
142-
std::vector<llvm::Value *> args = {consumer_state, tuple_buffer_,
148+
std::vector<llvm::Value *> args = {buffer_ptr, tuple_buffer_,
143149
codegen.Const32(output_ais_.size())};
144150
codegen.Call(BufferingConsumerProxy::BufferTuple, args);
145151
}
146152

153+
char *BufferingConsumer::GetConsumerState() {
154+
return reinterpret_cast<char *>(&buffer_);
155+
}
156+
157+
const std::vector<WrappedTuple> &BufferingConsumer::GetOutputTuples() const {
158+
return buffer_.output;
159+
}
160+
147161
} // namespace codegen
148162
} // namespace peloton

src/include/codegen/buffering_consumer.h

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
//
77
// Identification: src/include/codegen/buffering_consumer.h
88
//
9-
// Copyright (c) 2015-2017, Carnegie Mellon University Database Group
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
1010
//
1111
//===----------------------------------------------------------------------===//
1212

1313
#pragma once
1414

1515
#include <vector>
16+
#include <mutex>
1617

1718
#include "codegen/compilation_context.h"
1819
#include "codegen/execution_consumer.h"
@@ -50,51 +51,41 @@ class WrappedTuple : public ContainerTuple<std::vector<peloton::type::Value>> {
5051
//===----------------------------------------------------------------------===//
5152
class BufferingConsumer : public ExecutionConsumer {
5253
public:
53-
struct BufferingState {
54-
std::vector<WrappedTuple> *output;
55-
};
56-
57-
// Constructor
54+
/// Constructor
5855
BufferingConsumer(const std::vector<oid_t> &cols,
5956
const planner::BindingContext &context);
6057

61-
void Prepare(CompilationContext &compilation_context) override;
58+
void Prepare(CompilationContext &compilation_ctx) override;
6259

6360
// Query state
6461
void InitializeQueryState(CompilationContext &) override {}
6562
void TearDownQueryState(CompilationContext &) override {}
6663

67-
// TODO(pmenon): Implement me
68-
bool SupportsParallelExec() const override { return false; }
64+
bool SupportsParallelExec() const override { return true; }
6965

7066
void ConsumeResult(ConsumerContext &ctx, RowBatch::Row &row) const override;
7167

72-
llvm::Value *GetStateValue(ConsumerContext &ctx,
73-
const QueryState::Id &id) const {
74-
auto &query_state = ctx.GetQueryState();
75-
return query_state.LoadStateValue(ctx.GetCodeGen(), id);
76-
}
77-
7868
// Called from compiled query code to buffer the tuple
79-
static void BufferTuple(char *state, char *tuple, uint32_t num_cols);
69+
static void BufferTuple(char *buffer, char *tuple, uint32_t num_cols);
8070

8171
//===--------------------------------------------------------------------===//
8272
// ACCESSORS
8373
//===--------------------------------------------------------------------===//
8474

85-
char *GetConsumerState() override { return reinterpret_cast<char *>(&state); }
75+
char *GetConsumerState() override;
8676

87-
const std::vector<WrappedTuple> &GetOutputTuples() const { return tuples_; }
77+
const std::vector<WrappedTuple> &GetOutputTuples() const;
8878

8979
private:
9080
// The attributes we want to output
9181
std::vector<const planner::AttributeInfo *> output_ais_;
9282

93-
// Buffered output tuples
94-
std::vector<WrappedTuple> tuples_;
95-
96-
// Running buffering state
97-
BufferingState state;
83+
// The thread-safe buffer of output tuples
84+
struct Buffer {
85+
std::mutex mutex;
86+
std::vector<WrappedTuple> output;
87+
};
88+
Buffer buffer_;
9889

9990
// The slot in the runtime state to find our state context
10091
QueryState::Id consumer_state_id_;

0 commit comments

Comments
 (0)