1+ // Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
2+ // This source code is licensed under the BSD-style license found in the
3+ // LICENSE file in the root directory of this source tree. An additional grant
4+ // of patent rights can be found in the PATENTS file in the same directory.
5+
6+ #ifndef PIKA_COMMAND_COLLECTOR_H_
7+ #define PIKA_COMMAND_COLLECTOR_H_
8+
9+ #include < atomic>
10+ #include < condition_variable>
11+ #include < deque>
12+ #include < functional>
13+ #include < list>
14+ #include < map>
15+ #include < memory>
16+ #include < mutex>
17+ #include < string>
18+ #include < thread>
19+ #include < unordered_map>
20+ #include < vector>
21+ #include < chrono>
22+ #include < optional>
23+
24+ #include " include/pika_command.h"
25+ #include " include/pika_define.h"
26+ #include " pstd/include/pstd_status.h"
27+
28+ #include " include/pika_consensus.h"
29+
30+ /* *
31+ * @brief PikaCommandCollector is used to collect write commands and process them in batches
32+ *
33+ * Main functions:
34+ * 1. Collect write commands and process them in optimized batches after reaching the threshold
35+ * 2. Handle the conflict of the same key (the later command will overwrite the earlier command)
36+ * 3. Send commands in batches to the consensus coordinator with batch-level synchronization
37+ * 4. Support asynchronous callback notification of command processing results
38+ * 5. Track performance metrics for batch processing
39+ * 6. Provide intelligent retry mechanisms for failed batches
40+ */
41+ class PikaCommandCollector {
42+ public:
43+ // Callback function type after command processing is completed
44+ using CommandCallback = std::function<void (const LogOffset& offset, pstd::Status status)>;
45+
46+ /* *
47+ * @brief constructor
48+ * @param coordinator consensus coordinator reference
49+ * @param batch_size batch size (number of commands)
50+ * @param batch_max_wait_time forced flush interval (milliseconds)
51+ */
52+ // Constructor with raw pointer (original)
53+ PikaCommandCollector (ConsensusCoordinator* coordinator, size_t batch_size = 100 , int batch_max_wait_time = 5 );
54+
55+ // Constructor with shared_ptr (for compatibility with make_shared calls)
56+ PikaCommandCollector (std::shared_ptr<ConsensusCoordinator> coordinator, size_t batch_size = 100 , int batch_max_wait_time = 5 );
57+
58+ ~PikaCommandCollector ();
59+
60+ /* *
61+ * @brief Add command to collector
62+ * @param cmd_ptr command pointer
63+ * @param callback callback function after processing is completed
64+ * @return whether the addition was successful
65+ */
66+ bool AddCommand (const std::shared_ptr<Cmd>& cmd_ptr, CommandCallback callback);
67+
68+ /* *
69+ * @brief Called periodically by external systems to process batches
70+ * @param force Force processing even if batch is not full or timeout not reached
71+ * @return Number of commands processed
72+ */
73+
74+ /* *
75+ * @brief Immediately process all currently collected commands
76+ * @return The number of commands processed
77+ */
78+ size_t FlushCommands (bool force = false );
79+
80+
81+ /* *
82+ * @brief Get the current number of pending commands
83+ * @return number of commands
84+ */
85+ size_t PendingCommands () const ;
86+
87+ /* *
88+ * @brief Set the batch size
89+ * @param batch_size batch size
90+ */
91+ void SetBatchSize (size_t batch_size);
92+
93+ /* *
94+ * @brief Set the batch max wait time
95+ * @param batch_max_wait_time maximum wait time in milliseconds
96+ */
97+ void SetBatchMaxWaitTime (int batch_max_wait_time);
98+
99+ /* *
100+ * @brief Get batch processing statistics
101+ * @return Pair of (total_processed_commands, total_batches)
102+ */
103+ std::pair<uint64_t , uint64_t > GetBatchStats () const ;
104+
105+ /* *
106+ * @brief Get average batch processing time in milliseconds
107+ * @return Average processing time or nullopt if no batches processed
108+ */
109+ std::optional<double > GetAverageBatchTime () const ;
110+
111+ private:
112+
113+ /* *
114+ * @brief batch processing command
115+ * @param batch command batch
116+ * @return Whether the processing is successful
117+ */
118+ pstd::Status ProcessBatch (const std::vector<std::shared_ptr<Cmd>>& commands,
119+ const std::vector<CommandCallback>& callbacks);
120+
121+ /* *
122+ * @brief Check for conflicts based on command type and key name
123+ * @param cmd_ptr command pointer
124+ * @return true if there is a conflict (should be replaced), false if there is no conflict
125+ */
126+ bool CheckConflict (const std::shared_ptr<Cmd>& cmd_ptr) const ;
127+
128+ /* *
129+ * @brief Handle key conflicts and remove conflicting commands
130+ * @param cmd_ptr new command
131+ */
132+ void HandleConflict (const std::shared_ptr<Cmd>& cmd_ptr);
133+
134+ /* *
135+ * @brief Retry batch processing commands
136+ * @param commands List of commands to retry
137+ * @param callbacks Corresponding callback function list
138+ * @param priority Priority level for the retry (higher means more urgent)
139+ * @return Whether the commands were successfully requeued
140+ */
141+ bool RetryBatch (const std::vector<std::shared_ptr<Cmd>>& commands,
142+ const std::vector<CommandCallback>& callbacks,
143+ int priority = 100 );
144+
145+ private:
146+ // Consensus coordinator reference
147+ ConsensusCoordinator* coordinator_;
148+
149+ // Batch processing configuration
150+ std::atomic<size_t > batch_size_;
151+ std::atomic<int > batch_max_wait_time_;
152+
153+ // Retry configuration
154+ std::atomic<int > max_retry_attempts_{3 };
155+ std::atomic<int > retry_backoff_ms_{50 };
156+
157+ // Command collection and processing
158+ mutable std::mutex mutex_;
159+
160+ // Pending command queue and callbacks
161+ std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>> pending_commands_;
162+
163+ // Priority queue for retries
164+ std::deque<std::tuple<int , std::vector<std::shared_ptr<Cmd>>, std::vector<CommandCallback>>> retry_queue_;
165+
166+ // Command key mapping, used to handle same-key conflicts
167+ std::unordered_map<std::string, std::list<std::pair<std::shared_ptr<Cmd>, CommandCallback>>::iterator> key_map_;
168+
169+ // Batch statistics
170+ std::atomic<uint64_t > total_processed_{0 };
171+ std::atomic<uint64_t > total_batches_{0 };
172+ std::atomic<uint64_t > total_retries_{0 };
173+ std::atomic<uint64_t > total_conflicts_{0 };
174+ std::atomic<uint64_t > total_batch_time_ms_{0 };
175+ std::chrono::time_point<std::chrono::steady_clock> batch_start_time_;
176+
177+ // Performance tracking
178+ struct BatchMetrics {
179+ uint64_t batch_size;
180+ uint64_t processing_time_ms;
181+ uint64_t wait_time_ms;
182+ bool successful;
183+ };
184+
185+ // Circular buffer for recent batch metrics
186+ static constexpr size_t kMetricsBufferSize = 100 ;
187+ std::vector<BatchMetrics> recent_metrics_;
188+ std::mutex metrics_mutex_;
189+ };
190+
191+ #endif // PIKA_COMMAND_COLLECTOR_H_
0 commit comments