Skip to content

Commit 2ed271a

Browse files
committed
New Architecture
1 parent 46a398d commit 2ed271a

16 files changed

+859
-1161
lines changed

include/pika_command_collector.h

Lines changed: 4 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* 3. Send commands in batches to the consensus coordinator with batch-level synchronization
3737
* 4. Support asynchronous callback notification of command processing results
3838
* 5. Track performance metrics for batch processing
39-
* 6. Provide intelligent retry mechanisms for failed batches
39+
4040
*/
4141
class PikaCommandCollector {
4242
public:
@@ -46,14 +46,13 @@ class PikaCommandCollector {
4646
/**
4747
* @brief constructor
4848
* @param coordinator consensus coordinator reference
49-
* @param batch_size batch size (number of commands)
50-
* @param batch_max_wait_time forced flush interval (milliseconds)
49+
* @param batch_max_wait_time maximum wait time in milliseconds
5150
*/
5251
// Constructor with raw pointer (original)
53-
PikaCommandCollector(ConsensusCoordinator* coordinator, size_t batch_size = 100, int batch_max_wait_time = 5);
52+
PikaCommandCollector(ConsensusCoordinator* coordinator, int batch_max_wait_time = 5);
5453

5554
// Constructor with shared_ptr (for compatibility with make_shared calls)
56-
PikaCommandCollector(std::shared_ptr<ConsensusCoordinator> coordinator, size_t batch_size = 100, int batch_max_wait_time = 5);
55+
PikaCommandCollector(std::shared_ptr<ConsensusCoordinator> coordinator, int batch_max_wait_time = 5);
5756

5857
~PikaCommandCollector();
5958

@@ -65,31 +64,6 @@ class PikaCommandCollector {
6564
*/
6665
bool AddCommand(const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback);
6766

68-
/**
69-
* @brief Called periodically by external systems to process batches
70-
* @param force Force processing even if batch is not full or timeout not reached
71-
* @return Number of commands processed
72-
*/
73-
74-
/**
75-
* @brief Immediately process all currently collected commands
76-
* @return The number of commands processed
77-
*/
78-
size_t FlushCommands(bool force = false);
79-
80-
81-
/**
82-
* @brief Get the current number of pending commands
83-
* @return number of commands
84-
*/
85-
size_t PendingCommands() const;
86-
87-
/**
88-
* @brief Set the batch size
89-
* @param batch_size batch size
90-
*/
91-
void SetBatchSize(size_t batch_size);
92-
9367
/**
9468
* @brief Set the batch max wait time
9569
* @param batch_max_wait_time maximum wait time in milliseconds
@@ -101,91 +75,17 @@ class PikaCommandCollector {
10175
* @return Pair of (total_processed_commands, total_batches)
10276
*/
10377
std::pair<uint64_t, uint64_t> GetBatchStats() const;
104-
105-
/**
106-
* @brief Get average batch processing time in milliseconds
107-
* @return Average processing time or nullopt if no batches processed
108-
*/
109-
std::optional<double> GetAverageBatchTime() const;
110-
111-
private:
112-
113-
/**
114-
* @brief batch processing command
115-
* @param batch command batch
116-
* @return Whether the processing is successful
117-
*/
118-
pstd::Status ProcessBatch(const std::vector<std::shared_ptr<Cmd>>& commands,
119-
const std::vector<CommandCallback>& callbacks);
120-
121-
/**
122-
* @brief Check for conflicts based on command type and key name
123-
* @param cmd_ptr command pointer
124-
* @return true if there is a conflict (should be replaced), false if there is no conflict
125-
*/
126-
bool CheckConflict(const std::shared_ptr<Cmd>& cmd_ptr) const;
127-
128-
/**
129-
* @brief Handle key conflicts and remove conflicting commands
130-
* @param cmd_ptr new command
131-
*/
132-
void HandleConflict(const std::shared_ptr<Cmd>& cmd_ptr);
133-
134-
/**
135-
* @brief Retry batch processing commands
136-
* @param commands List of commands to retry
137-
* @param callbacks Corresponding callback function list
138-
* @param priority Priority level for the retry (higher means more urgent)
139-
* @return Whether the commands were successfully requeued
140-
*/
141-
bool RetryBatch(const std::vector<std::shared_ptr<Cmd>>& commands,
142-
const std::vector<CommandCallback>& callbacks,
143-
int priority = 100);
14478

14579
private:
14680
//Consensus coordinator reference
14781
ConsensusCoordinator* coordinator_;
14882

14983
// Batch processing configuration
150-
std::atomic<size_t> batch_size_;
15184
std::atomic<int> batch_max_wait_time_;
15285

153-
// Retry configuration
154-
std::atomic<int> max_retry_attempts_{3};
155-
std::atomic<int> retry_backoff_ms_{50};
156-
157-
// Command collection and processing
158-
mutable std::mutex mutex_;
159-
160-
// Pending command queue and callbacks
161-
std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>> pending_commands_;
162-
163-
// Priority queue for retries
164-
std::deque<std::tuple<int, std::vector<std::shared_ptr<Cmd>>, std::vector<CommandCallback>>> retry_queue_;
165-
166-
// Command key mapping, used to handle same-key conflicts
167-
std::unordered_map<std::string, std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>>::iterator> key_map_;
168-
16986
// Batch statistics
17087
std::atomic<uint64_t> total_processed_{0};
17188
std::atomic<uint64_t> total_batches_{0};
172-
std::atomic<uint64_t> total_retries_{0};
173-
std::atomic<uint64_t> total_conflicts_{0};
174-
std::atomic<uint64_t> total_batch_time_ms_{0};
175-
std::chrono::time_point<std::chrono::steady_clock> batch_start_time_;
176-
177-
// Performance tracking
178-
struct BatchMetrics {
179-
uint64_t batch_size;
180-
uint64_t processing_time_ms;
181-
uint64_t wait_time_ms;
182-
bool successful;
183-
};
184-
185-
// Circular buffer for recent batch metrics
186-
static constexpr size_t kMetricsBufferSize = 100;
187-
std::vector<BatchMetrics> recent_metrics_;
188-
std::mutex metrics_mutex_;
18989
};
19090

19191
#endif // PIKA_COMMAND_COLLECTOR_H_

include/pika_command_queue.h

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
2+
// This source code is licensed under the BSD-style license found in the
3+
// LICENSE file in the root directory of this source tree. An additional grant
4+
// of patent rights can be found in the PATENTS file in the same directory.
5+
6+
#ifndef PIKA_COMMAND_QUEUE_H_
7+
#define PIKA_COMMAND_QUEUE_H_
8+
9+
#include <memory>
10+
#include <queue>
11+
#include <vector>
12+
#include <string>
13+
#include <functional>
14+
#include <atomic>
15+
#include <condition_variable>
16+
#include <mutex>
17+
18+
#include "pstd/include/pstd_mutex.h"
19+
#include "pstd/include/env.h"
20+
#include "include/pika_command.h"
21+
#include "include/pika_define.h"
22+
#include "pstd/include/env.h"
23+
#include "include/pika_define.h"
24+
25+
// Callback function type for command completion notification
26+
using CommandCallback = std::function<void(const LogOffset&, pstd::Status)>;
27+
28+
// Structure representing a batch of commands
29+
struct CommandBatch {
30+
std::vector<std::shared_ptr<Cmd>> commands;
31+
std::vector<CommandCallback> callbacks;
32+
uint64_t batch_id;
33+
uint64_t create_time;
34+
std::string db_name;
35+
std::vector<LogOffset> binlog_offsets; // Binlog offsets for each command
36+
37+
CommandBatch(const std::vector<std::shared_ptr<Cmd>>& cmds,
38+
const std::vector<CommandCallback>& cbs,
39+
const std::string& db)
40+
: commands(cmds), callbacks(cbs), db_name(db) {
41+
static std::atomic<uint64_t> next_id{1};
42+
batch_id = next_id.fetch_add(1);
43+
create_time = pstd::NowMicros();
44+
}
45+
46+
bool Empty() const {
47+
return commands.empty();
48+
}
49+
50+
size_t Size() const {
51+
return commands.size();
52+
}
53+
};
54+
55+
// New structure to group multiple CommandBatches for RocksDB processing
56+
struct BatchGroup {
57+
std::vector<std::shared_ptr<CommandBatch>> batches;
58+
LogOffset end_offset; // Only store the final offset of the last batch
59+
BatchGroup() = default;
60+
BatchGroup(const std::vector<std::shared_ptr<CommandBatch>>& batches,
61+
const LogOffset& final_offset)
62+
: batches(batches), end_offset(final_offset) {}
63+
bool Empty() const { return batches.empty(); }
64+
size_t BatchCount() const { return batches.size(); }
65+
};
66+
67+
// Thread-safe command queue for batched command processing
68+
class CommandQueue {
69+
public:
70+
explicit CommandQueue(size_t max_size);
71+
~CommandQueue();
72+
73+
// Enqueue a command batch (blocking if queue is full)
74+
bool EnqueueBatch(std::shared_ptr<CommandBatch> batch);
75+
76+
// Dequeue a command batch (blocking if queue is empty)
77+
std::shared_ptr<CommandBatch> DequeueBatch();
78+
79+
// Dequeue all available batches (non-blocking)
80+
std::vector<std::shared_ptr<CommandBatch>> DequeueAllBatches();
81+
82+
// Get current queue size
83+
size_t Size() const;
84+
85+
// Check if queue is empty
86+
bool Empty() const;
87+
88+
// Shutdown the queue
89+
void Shutdown();
90+
91+
private:
92+
std::queue<std::shared_ptr<CommandBatch>> cmd_queue_;
93+
mutable std::mutex queue_mutex_;
94+
std::condition_variable queue_cv_;
95+
size_t max_size_;
96+
std::atomic<bool> shutdown_{false};
97+
};
98+
99+
#endif // PIKA_COMMAND_QUEUE_H_

include/pika_consensus.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,6 @@ class ConsensusCoordinator {
155155
pstd::Status Reset(const LogOffset& offset);
156156

157157
pstd::Status ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
158-
// Batch processing of commands
159-
pstd::Status BatchProposeLog(const std::vector<std::shared_ptr<Cmd>>& cmd_ptrs, std::vector<LogOffset>* offsets);
160158
pstd::Status UpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end);
161159
pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id);
162160
pstd::Status RemoveSlaveNode(const std::string& ip, int port);
@@ -268,8 +266,6 @@ class ConsensusCoordinator {
268266
void BatchInternalApplyFollower(const std::vector<std::shared_ptr<Cmd>>& cmd_ptrs);
269267
pstd::Status ProcessCoordination();
270268

271-
// Batch operations for slave entries
272-
pstd::Status BatchPersistAppendBinlog(const std::vector<std::shared_ptr<Cmd>>& cmd_ptrs,const std::vector<BinlogItem>& attributes,std::vector<LogOffset>* offsets);
273269
LogOffset GetCommittedId() {
274270
std::lock_guard l(committed_id_rwlock_);
275271
return committed_id_;
@@ -300,7 +296,7 @@ class ConsensusCoordinator {
300296

301297
private:
302298
std::shared_mutex is_consistency_rwlock_;
303-
bool is_consistency_ = true;
299+
bool is_consistency_ = false;
304300
std::shared_mutex committed_id_rwlock_;
305301
LogOffset committed_id_ = LogOffset();
306302
std::atomic<uint64_t> notification_counter_{0};

include/pika_define.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -364,14 +364,6 @@ const int64_t kPoolSize = 1073741824;
364364
const std::string kBinlogPrefix = "write2file";
365365
const size_t kBinlogPrefixLen = 10;
366366

367-
/*
368-
* PIKA_BATCH_MAGIC: Core identifier for binlog batch processing.
369-
* - Master: Prefixes batched binlogs with this magic in SendBinlog
370-
* - Slave: Detects this magic in HandleBGWorkerWriteBinlog
371-
* to switch between batch and single-binlog parsing modes.
372-
*/
373-
const uint32_t PIKA_BATCH_MAGIC = 0x42544348; // "BTCH" in ASCII
374-
375367
const std::string kPikaMeta = "meta";
376368
const std::string kManifest = "manifest";
377369
const std::string kContext = "context";

0 commit comments

Comments
 (0)