-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathchunkserver_entry.h
More file actions
374 lines (307 loc) · 14.8 KB
/
chunkserver_entry.h
File metadata and controls
374 lines (307 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
/*
Copyright 2013-2015 Skytechnology sp. z o.o.
Copyright 2023 Leil Storage OÜ
This file is part of SaunaFS.
SaunaFS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3.
SaunaFS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SaunaFS. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "common/platform.h"
#include <cstdint>
#include <list>
#include <memory>
#include <vector>
#include "chunkserver-common/disk_utils.h"
#include "chunkserver/bgjobs.h"
#include "chunkserver/io_buffers.h"
#include "common/aligned_allocator.h"
#include "common/chunk_part_type.h"
#include "common/network_address.h"
#include "common/slice_traits.h"
#include "protocol/cltocs.h"
class GetBlocksHighLevelOp;
class ReadHighLevelOp;
class WriteHighLevelOp;

/// Byte vector allocated through `AlignedAllocator<uint8_t, disk::kIoBlockSize>`
/// (presumably aligned to the disk I/O block size, for direct/aligned I/O).
using AlignedVectorForIO =
    std::vector<uint8_t, AlignedAllocator<uint8_t, disk::kIoBlockSize>>;

/// Size of an I/O-aligned packet buffer:
///   [ disk::kIoBlockSize bytes ... HEADER ][ up to SFSBLOCKSIZE bytes of data ]
/// i.e. one I/O block reserved for the packet header (placed at its end),
/// followed by the block payload (4 KiB + 64 KiB with the usual values).
constexpr uint32_t kIOAlignedPacketSize = disk::kIoBlockSize + SFSBLOCKSIZE;

// The write-data prefix must fit inside the leading I/O block; otherwise the
// unsigned subtraction below would silently wrap around to a huge offset.
static_assert(cltocs::writeData::kPrefixSize <= disk::kIoBlockSize,
              "cltocs::writeData prefix must fit within one disk I/O block");

/// Offset at which the write-data prefix is stored so that the payload
/// following it starts exactly at `disk::kIoBlockSize` (i.e. block-aligned).
constexpr uint32_t kIOAlignedOffset =
    disk::kIoBlockSize - cltocs::writeData::kPrefixSize;

/// Sentinel meaning "no packet". A typed constant (of type std::nullptr_t)
/// rather than a macro, so it respects scoping and is visible to tooling;
/// it compares and assigns exactly like `nullptr`.
constexpr auto kInvalidPacket = nullptr;
/**
 * @brief Bundles a packet's bytes together with its processing progress.
 *
 * Besides the owned packet bytes, it keeps a pointer to the data being
 * processed, a count of the bytes still to be processed, and an optional
 * output buffer that is used when writing data.
 *
 * NOTE(review): `startPtr` presumably points into `packet`; if so, a copy of
 * this struct would still point at the original's storage — confirm before
 * copying instances.
 */
struct PacketStruct {
    uint8_t *startPtr{nullptr};   ///< pointer to the packet data being processed
    uint32_t bytesLeft{0};        ///< number of bytes still left to process
    std::vector<uint8_t> packet;  ///< owned packet bytes
    std::shared_ptr<OutputBuffer> outputBuffer;  ///< optional buffer for writing data
};
/**
 * @brief Represents a single connection to a chunkserver.
 *
 * This struct manages the state and data associated with a connection to a
 * chunkserver. It includes information about the connection's state, mode,
 * sockets, and various buffers used for reading and writing data. It also
 * maintains metadata for managing the connection's lifecycle and handling
 * connection retries and timeouts.
 *
 * @details
 * The `ChunkserverEntry` struct is used extensively within the
 * `NetworkWorkerThread` to manage connections. It supports both reading and
 * writing operations, including forwarding data to other chunkservers in a
 * write chain. It owns the high-level read, write and get-blocks operation
 * objects for the connection.
 *
 * Instances are neither copyable nor movable.
 */
struct ChunkserverEntry {
    /// The possible modes of a `ChunkserverEntry`.
    enum class Mode : uint8_t {
        Header,  // reading packet header
        Data     // reading packet data
    };

    /// The possible connection states of a `ChunkserverEntry`.
    enum class State : uint8_t {
        Idle,          // idle connection, new or used previously
        Read,          // after CLTOCS_READ, but didn't send all of the
                       // CSTOCL_READ_(DATA|STATUS)
        GetBlock,      // after CSTOCS_GET_CHUNK_BLOCKS, but didn't send response
        WriteLast,     // ready for writing data; data not forwarded to other CSs
        Connecting,    // connecting to other chunkserver to form a writing chain
        WriteInit,     // sending packet forming a chain to the next chunkserver
        WriteForward,  // ready for writing data; will be forwarded to other CSs
        IOFinish,      // closing a connection after finishing IO, but before
                       // sending the final status to the client
        Close,         // close request, will change to CloseWait or Closed
        CloseWait,     // waits for a worker to finish a job, then will be Closed
        Closed         // ready to be deleted
    };

    // Some constants to improve readability
    static constexpr int kInvalidSocket = -1;
    static constexpr int kInitConnectionOK = 0;
    static constexpr int kInitConnectionFailed = -1;
    static constexpr uint32_t kGenerateChartExpectedPacketSize =
        sizeof(uint32_t);

    // Pointer/socket members get safe defaults (like every other scalar member
    // here) so they are never read uninitialized; the constructor sets them.
    JobPool *workerJobPool = nullptr;  ///< Job pool assigned to a given network worker thread
    State state = State::Idle;
    Mode mode = Mode::Header;
    Mode fwdMode = Mode::Header;
    int sock = kInvalidSocket;          ///< socket of this connection
    int fwdSocket = kInvalidSocket;     ///< forwarding socket for writing
    uint64_t connectStartTimeUSec = 0;  ///< for timeout and retry (usec)
    uint8_t connectRetryCounter = 0;    ///< for timeout and retry
    NetworkAddress fwdServer;           ///< the next server in write chain
    int32_t pDescPos = -1;     ///< Position in the poll descriptors array
    int32_t fwdPDescPos = -1;  ///< Position in poll descriptors for fwdSocket
    uint32_t lastActivity = 0; ///< Last activity time
    uint8_t headerBuffer[PacketHeader::kSize]{};     ///< buffer for packet header
    uint8_t fwdHeaderBuffer[PacketHeader::kSize]{};  ///< fwd packet header buffer

    /// Stores the data of the incoming packet for processing
    PacketStruct inputPacket;
    PacketStruct fwdOutputPacket;  ///< used for forwarding inputPacket data
    PacketStruct fwdInputPacket;   ///< used for receiving status from fwdSocket
    std::vector<uint8_t> fwdInitPacket;  ///< used only for write initialization

    /// List of output packets waiting to be sent to the clients
    std::list<std::unique_ptr<PacketStruct>> outputPackets;

    uint64_t chunkId = 0;        // R+W
    uint32_t chunkVersion = 0;   // R+W
    ChunkPartType chunkType = slice_traits::standard::ChunkPartType();  // R

    ChunkserverEntry(int socket, JobPool *workerJobPool, uint16_t maxBlocksPerHddReadJob,
                     uint16_t maxParallelHddReadJobs, uint16_t maxBlocksPerHddWriteJob);

    // Disallow copying and moving to avoid misuse.
    ChunkserverEntry(const ChunkserverEntry &) = delete;
    ChunkserverEntry &operator=(const ChunkserverEntry &) = delete;
    ChunkserverEntry(ChunkserverEntry &&) = delete;
    ChunkserverEntry &operator=(ChunkserverEntry &&) = delete;

    /// Destructor: closes the sockets.
    ~ChunkserverEntry();

    /// Returns whether the last header type was SAU_CLTOCS_WRITE_DATA.
    ///
    /// NOTE(review): declared `inline` but not defined in this header, so it
    /// can only be called from the translation unit that defines it.
    inline bool isLastHeaderTypeWriteData();

    /// Attaches a packet to the output packet list (taking ownership).
    ///
    /// NOTE(review): declared `inline` but not defined in this header, so it
    /// can only be called from the translation unit that defines it.
    inline void attachPacket(std::unique_ptr<PacketStruct> &&packet);

    /// Attaches an output buffer to the output packet list (taking ownership).
    void attachBuffer(std::shared_ptr<OutputBuffer> &&buffer);

    /// Creates an attached packet from the given vector.
    /// The function takes ownership of the vector's contents.
    void createAttachedPacket(std::vector<uint8_t> &packet);

    /// Creates an attached packet with the given type and operation size.
    ///
    /// @param type The type of the packet.
    /// @param operationSize The size of the operation.
    /// @return Pointer to the created packet data.
    uint8_t *createAttachedPacket(uint32_t type, uint32_t operationSize);

    /// Processes read or write bytes from the socket.
    /// @param bytesRW The number of bytes read or written.
    /// @param packet The packet structure being processed.
    /// @param shouldForwardError Indicates if the error should be forwarded.
    /// @param callerName The name of the calling function for logging purposes.
    /// @param isRead Indicates if the operation is a read (true) or write (false).
    /// @return True if the operation was successful, false otherwise.
    bool processRWBytes(int bytesRW, PacketStruct &packet, bool shouldForwardError,
                        const char *callerName, bool isRead);

    /// Reads the packet header from the socket.
    /// @param socket The socket to read from.
    /// @param packet The packet structure to fill.
    /// @param headerBuf The buffer to store the header.
    /// @param targetMode The mode to set after reading the header.
    /// @return True if the header was read successfully, false otherwise.
    bool readHeader(int socket, PacketStruct &packet, uint8_t *headerBuf, Mode &targetMode);

    /// Reads data from the socket into the packet structure.
    /// @param socket The socket to read from.
    /// @param packet The packet structure to fill.
    /// @return True if the data was read successfully, false otherwise.
    bool readData(int socket, PacketStruct &packet);

    /// Writes the packet data to the socket.
    /// @param socket The socket to write to.
    /// @param packet The packet structure containing the data to write.
    /// @return True if the data was written successfully, false otherwise.
    bool writePacket(int socket, PacketStruct &packet);

    /// Processes a packet based on its type and the current mode.
    /// @param packet The packet structure to process.
    /// @param headerBuf The buffer containing the packet header.
    /// @param targetMode The mode to set after processing the packet.
    /// @param fromForward Indicates if the packet is being processed from a forward operation.
    void processPacket(PacketStruct &packet, uint8_t *headerBuf, Mode &targetMode,
                       bool fromForward);

    /// Handles forwarding errors by setting the appropriate error status and
    /// transitioning the connection state to `IOFinish`.
    ///
    /// This function is called when an error occurs during forwarding
    /// operations, such as read or write errors on the forwarding socket. It
    /// serializes an error status message and attaches it to the packet, then
    /// sets the state to `IOFinish` to indicate that the connection should
    /// be closed after sending the error status.
    void fwdError();

    /// Handles the event when a connection to another chunkserver is
    /// successfully established.
    ///
    /// This function is called when the connection to the next chunkserver in
    /// the write chain is successfully established.
    ///
    /// Typically invoked after a successful non-blocking connect operation.
    ///
    /// \see ChunkserverEntry::retryConnect
    void fwdConnected();

    /// Reads data from the forwarding socket and processes it.
    void fwdRead();

    /// Writes data to the forwarding socket.
    ///
    /// This function handles writing data to the forwarding socket
    /// (`fwdSocket`). It attempts to write the data still pending for the
    /// next chunkserver to the socket.
    /// NOTE(review): older docs named a `fwdStartPtr` buffer, which no longer
    /// exists; presumably this is `fwdOutputPacket` — confirm.
    ///
    /// This function is typically invoked when the forwarding socket is ready
    /// for writing, as indicated by the `POLLOUT` event in the poll descriptor.
    void fwdWrite();

    /// Initiates the forwarding process for the current packet.
    ///
    /// This function is responsible for initiating the forwarding process of
    /// the current packet to the next chunkserver in the chain.
    ///
    /// This function is typically called when a packet needs to be forwarded to
    /// another chunkserver for further processing.
    void forward();

    /// Initializes the connection to the next chunkserver in the chain.
    ///
    /// This function sets up the necessary parameters and state for
    /// establishing a connection to the next chunkserver.
    ///
    /// This function is typically called when a new connection needs to be made
    /// to forward data to another chunkserver.
    ///
    /// @return An integer status code indicating the success or failure of the
    ///         connection initialization. A return value of 0
    ///         (`kInitConnectionOK`) indicates success, while a non-zero value
    ///         indicates an error.
    int initConnection();

    /// Attempts to re-establish a connection to the next chunkserver.
    /// Implements a retry mechanism to ensure that the connection
    /// is eventually established.
    void retryConnect();

    /// Processes a received packet based on its type.
    ///
    /// @param type The type of the packet.
    /// @param data Pointer to the packet data.
    /// @param length The length of the packet data.
    void gotPacket(uint32_t type, const uint8_t *data, uint32_t length);

    /* IDLE Operations */

    /// Answers a ping message with the given data and length.
    void ping(const uint8_t *data, PacketHeader::Length length);

    /// Initializes a read operation.
    ///
    /// @param data Pointer to the packet data describing the read.
    /// @param type The type of the packet.
    /// @param length The length of the packet data.
    void readInit(const uint8_t *data, PacketHeader::Type type,
                  PacketHeader::Length length);

    /// Requests a data prefetch operation.
    /// Prefetch in this context means reading data from the disk and storing it
    /// in the page cache.
    void prefetch(const uint8_t *data, PacketHeader::Type type,
                  PacketHeader::Length length);

    /// Serializes and attaches a write status message to the output packets list.
    void createAttachedWriteStatus(uint64_t targetChunkId, uint8_t status, uint32_t writeId);

    /// Retrieves chunk blocks from the given information using the new way.
    void sauGetChunkBlocks(const uint8_t *data, uint32_t length);

    /// Retrieves the list with the HDDs information.
    void hddListV2([[maybe_unused]] const uint8_t *data, uint32_t length);

    /// Lists the disk groups (if the DiskManager supports it).
    void listDiskGroups([[maybe_unused]] const uint8_t *data,
                        [[maybe_unused]] uint32_t length);

    /// Generates a chart in PNG or CSV format.
    void generateChartPNGorCSV(const uint8_t *data, uint32_t length);

    /// Generates chart data.
    void generateChartData(const uint8_t *data, uint32_t length);

    /// Adds a chunk to the test queue for CRC checking.
    /// Usually the master server sends this command after a client reports an
    /// error in the CRC of a block.
    void testChunk(const uint8_t *data, uint32_t length);

    /// Initializes a write operation.
    void writeInit(const uint8_t *data, PacketHeader::Type type,
                   PacketHeader::Length length);

    /* WriteLast or WriteForward */

    /// Writes a block of data to the drives.
    void writeData(const uint8_t *data, PacketHeader::Type type,
                   PacketHeader::Length length);

    /// Finalizes a write operation and closes the chunk and connection.
    void writeEnd(const uint8_t *data, uint32_t length);

    /// Posts a write status message to be sent through the network.
    void writeStatus(const uint8_t *data, PacketHeader::Type type,
                     PacketHeader::Length length);

    /* servePoll related */

    /// Writes data from an output packet to the socket.
    void writeToSocket();

    /// Reads data from the socket into the input buffer.
    void readFromSocket();

    /// Checks if it is a read operation and tries to finish it.
    void outputCheckReadFinished();

    /// Checks if the chunk is open for reading or writing.
    bool isChunkOpen();

    /// Checks if the entry is ready to be closed and, if so, sets the state
    /// to `Closed`.
    void checkAndApplyClosed();

    /// Closes all active jobs and updates the state.
    ///
    /// This function disables and changes the callback for any active read,
    /// write, or get blocks jobs. If no jobs are active, it closes the chunk
    /// and sets the state to `Closed`.
    ///
    /// Called from the `NetworkWorkerThread` when a connection is closed.
    void closeJobs();

private:
    /// Write operation related data
    std::unique_ptr<WriteHighLevelOp> writeHLO_;
    /// Read operation related data
    std::unique_ptr<ReadHighLevelOp> readHLO_;
    /// Get blocks operation related data
    std::unique_ptr<GetBlocksHighLevelOp> getBlocksHLO_;
};