Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/common/time_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,24 @@ int64_t Timer::lap_s() {

// Timeout implementation

Timeout::Timeout(std::chrono::nanoseconds timeout) :
timeout_(std::chrono::duration_cast<SteadyDuration>(timeout)) {
Timeout::Timeout(std::chrono::nanoseconds timeout) {
if (timeout.count() < 0) {
infinite_ = true;
timeout_ = SteadyDuration(0);
} else {
infinite_ = false;
timeout_ = std::chrono::duration_cast<SteadyDuration>(timeout);
}
}

SteadyTimePoint Timeout::deadline() const {
if (infinite_) { return SteadyTimePoint::max(); }
return startTime() + timeout_;
}

SteadyDuration Timeout::remainingTime() const {
if (infinite_) { return SteadyDuration::max(); }

SteadyDuration elapsed = elapsedTime();
if (elapsed >= timeout_) {
return SteadyDuration(0);
Expand All @@ -128,21 +137,26 @@ SteadyDuration Timeout::remainingTime() const {
}

int64_t Timeout::remaining_ns() const {
if (infinite_) { return -1; }
return duration_int64_cast<std::nano>(remainingTime());
}

int64_t Timeout::remaining_us() const {
if (infinite_) { return -1; }
return duration_int64_cast<std::micro>(remainingTime());
}

int64_t Timeout::remaining_ms() const {
if (infinite_) { return -1; }
return duration_int64_cast<std::milli>(remainingTime());
}

int64_t Timeout::remaining_s() const {
if (infinite_) { return -1; }
return duration_int64_cast<std::ratio<1>>(remainingTime());
}

bool Timeout::expired() const {
if (infinite_) { return false; }
return remainingTime() == SteadyDuration(0);
}
1 change: 1 addition & 0 deletions src/common/time_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@ class Timeout : public Timer {
bool expired() const;
private:
SteadyDuration timeout_;
bool infinite_ = false;
};
18 changes: 18 additions & 0 deletions src/common/time_utils_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,21 @@ TEST(TimeUtilsTests, TimerAndTimeout) {
EXPECT_EQ(0, timeout.remaining_ns());
EXPECT_TRUE(timeout.expired());
}

TEST(TimeUtilsTests, InfiniteTimeout) {
Timeout timeout(std::chrono::milliseconds(-1));

EXPECT_FALSE(timeout.expired());
EXPECT_EQ(timeout.remaining_ms(), -1);
EXPECT_EQ(timeout.remaining_us(), -1);
EXPECT_EQ(timeout.remaining_ns(), -1);
EXPECT_EQ(timeout.remaining_s(), -1);

std::this_thread::sleep_for(std::chrono::milliseconds(50));

EXPECT_FALSE(timeout.expired());
EXPECT_EQ(timeout.remaining_ms(), -1);
EXPECT_EQ(timeout.remaining_us(), -1);
EXPECT_EQ(timeout.remaining_ns(), -1);
EXPECT_EQ(timeout.remaining_s(), -1);
}
87 changes: 21 additions & 66 deletions src/tools/append_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <cstdint>

#include "common/datapack.h"
#include "common/server_connection.h"
#include "errors/saunafs_error_codes.h"
#include "errors/sfserr.h"
#include "tools/tools_commands.h"
Expand All @@ -41,15 +42,12 @@ static void append_file_usage() {
}

static int append_file(const char *fname, const char *afname) {
uint32_t cmd, leng, uid, gid;
uint32_t uid, gid;
uint32_t msgid{0};
inode_t inode, ainode;
uint8_t status;

constexpr uint32_t kAppendFilePayload =
sizeof(msgid) + sizeof(inode) + sizeof(ainode) + sizeof(uid) + sizeof(gid);
constexpr uint32_t kReqBuffSize = sizeof(cmd) + sizeof(kAppendFilePayload) + kAppendFilePayload;
uint8_t reqbuff[kReqBuffSize], *wptr, *buff;
const uint8_t *rptr;
MessageBuffer request, response;
mode_t dmode, smode;

int fd;
Expand All @@ -74,73 +72,30 @@ static int append_file(const char *fname, const char *afname) {
uid = getUId();
gid = getGId();

wptr = reqbuff;
put32bit(&wptr, CLTOMA_FUSE_APPEND);
put32bit(&wptr, kAppendFilePayload);
put32bit(&wptr, msgid);
putINode(&wptr, inode);
putINode(&wptr, ainode);
put32bit(&wptr, uid);
put32bit(&wptr, gid);

// send the request
if (tcpwrite(fd, reqbuff, kReqBuffSize) != kReqBuffSize) {
printf("%s: master query: send error\n", fname);
close_master_conn(1);
return -1;
}

// read the first part of the answer
if (tcpread(fd, reqbuff, sizeof(cmd) + sizeof(leng)) != sizeof(cmd) + sizeof(leng)) {
printf("%s: master query: receive error\n", fname);
close_master_conn(1);
return -1;
}

rptr = reqbuff;
get32bit(&rptr, cmd);
get32bit(&rptr, leng);
try {
serializeLegacyPacket(request, CLTOMA_FUSE_APPEND, msgid, inode, ainode, uid, gid);
response = ServerConnection::sendAndReceive(
fd, request, MATOCL_FUSE_APPEND,
ServerConnection::ReceiveMode::kReceiveFirstNonNopMessage, kDefaultTimeoutMs);
deserializeAllLegacyPacketDataNoHeader(response, msgid, status);

if (cmd != MATOCL_FUSE_APPEND) {
printf("%s: master query: wrong answer (type)\n", fname);
close_master_conn(1);
return -1;
}
close_master_conn(0);

buff = (uint8_t *)malloc(leng);
if (msgid != 0) {
printf("%s: master query: wrong answer (msgid)\n", fname);
return -1;
}

if (tcpread(fd, buff, leng) != (int32_t)leng) {
printf("%s: master query: receive error\n", fname);
free(buff);
if (status != SAUNAFS_STATUS_OK) {
printf("%s: %s\n", fname, saunafs_error_string(status));
return -1;
}
} catch (const Exception &e) {
fprintf(stderr, "%s\n", e.what());
close_master_conn(1);
return -1;
}

close_master_conn(0); // not needed anymore

// check the msgid
rptr = buff;
get32bit(&rptr, msgid); // queryid
if (msgid != 0) {
printf("%s: master query: wrong answer (queryid)\n", fname);
free(buff);
return -1;
}

if (leng - sizeof(msgid) != 1) {
printf("%s: master query: wrong answer (leng)\n", fname);
free(buff);
return -1;
}

if (*rptr != SAUNAFS_STATUS_OK) {
printf("%s: %s\n", fname, saunafs_error_string(*rptr));
free(buff);
return -1;
}

free(buff);

return 0;
}

Expand Down
149 changes: 58 additions & 91 deletions src/tools/check_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <cstdint>

#include "common/datapack.h"
#include "common/server_connection.h"
#include "errors/saunafs_error_codes.h"
#include "tools/tools_commands.h"
#include "tools/tools_common_functions.h"
Expand All @@ -34,122 +35,88 @@ static void check_file_usage() {
}

static int check_file(const char *fname) {
uint32_t cmd, leng;
uint32_t msgid{0};
inode_t inode;
uint8_t copies;
uint32_t chunks;
uint8_t status;

constexpr uint32_t kCheckFilePayload = sizeof(msgid) + sizeof(inode);
constexpr uint32_t kReqBuffSize = sizeof(cmd) + sizeof(kCheckFilePayload) + kCheckFilePayload;

uint8_t reqbuff[kReqBuffSize], *wptr, *buff;
const uint8_t *rptr;
MessageBuffer request, response;

int fd;
fd = open_master_conn(fname, &inode, nullptr, false);
if (fd < 0) {
return -1;
}

wptr = reqbuff;
put32bit(&wptr, CLTOMA_FUSE_CHECK);
put32bit(&wptr, kCheckFilePayload);
put32bit(&wptr, msgid);
putINode(&wptr, inode);

// send request to master
if (tcpwrite(fd, reqbuff, kReqBuffSize) != kReqBuffSize) {
printf("%s: master query: send error\n", fname);
close_master_conn(1);
return -1;
}

// read the first part of the answer
if (tcpread(fd, reqbuff, sizeof(cmd) + sizeof(leng)) != sizeof(cmd) + sizeof(leng)) {
printf("%s: master query: receive error\n", fname);
close_master_conn(1);
return -1;
}

rptr = reqbuff;
get32bit(&rptr, cmd);
get32bit(&rptr, leng);

if (cmd != MATOCL_FUSE_CHECK) {
printf("%s: master query: wrong answer (type)\n", fname);
close_master_conn(1);
return -1;
}
try {
serializeLegacyPacket(request, CLTOMA_FUSE_CHECK, msgid, inode);
response = ServerConnection::sendAndReceive(
fd, request, MATOCL_FUSE_CHECK,
ServerConnection::ReceiveMode::kReceiveFirstNonNopMessage, kDefaultTimeoutMs);

buff = (uint8_t *)malloc(leng);

// read the rest of the answer into the buffer
if (tcpread(fd, buff, leng) != (int32_t)leng) {
printf("%s: master query: receive error\n", fname);
free(buff);
close_master_conn(1);
return -1;
}
close_master_conn(0);

close_master_conn(0); // not needed anymore
// Parse response buffer
const uint8_t *rptr = response.data();
get32bit(&rptr, msgid);

rptr = buff;
get32bit(&rptr, msgid); // queryid
if (msgid != 0) {
printf("%s: master query: wrong answer (msgid)\n", fname);
return -1;
}

if (msgid != 0) {
printf("%s: master query: wrong answer (queryid)\n", fname);
free(buff);
return -1;
}
uint32_t remaining = static_cast<uint32_t>(response.size() - sizeof(msgid));

if (leng - sizeof(msgid) == 1) { // an error code was returned
printf("%s: %s\n", fname, saunafs_error_string(*rptr));
free(buff);
return -1;
}

leng -= 4;
if (remaining == sizeof(status)) {
status = *rptr;
printf("%s: %s\n", fname, saunafs_error_string(status));
return -1;
}

constexpr uint32_t kExpectedSize = + CHUNK_MATRIX_SIZE * sizeof(uint32_t);
constexpr uint32_t kExpectedSize = CHUNK_MATRIX_SIZE * sizeof(uint32_t);

if (leng % 3 != 0 && leng != kExpectedSize) {
printf("%s: master query: wrong answer (leng)\n", fname);
free(buff);
return -1;
}
if (remaining % 3 != 0 && remaining != kExpectedSize) {
printf("%s: master query: wrong answer (leng)\n", fname);
return -1;
}

printf("%s:\n", fname);
printf("%s:\n", fname);

if (leng % 3 == 0) {
for (cmd = 0; cmd < leng; cmd += 3) {
copies = get8bit(&rptr);
chunks = get16bit(&rptr);
if (copies == 1) {
printf("1 copy:");
} else {
printf("%" PRIu8 " copies:", copies);
}
print_number(" ", "\n", chunks, 1, 0, 1);
}
} else {
for (cmd = 0; cmd < CHUNK_MATRIX_SIZE; cmd++) {
get32bit(&rptr, chunks);
if (chunks > 0) {
if (cmd == 1) {
printf(" chunks with 1 copy: ");
} else if (cmd >= 10) {
printf(" chunks with 10+ copies:");
// Legacy format: N * [copies:8, chunks:16]
if (remaining % 3 == 0) {
for (uint32_t offset = 0; offset < remaining; offset += 3) {
uint8_t copies = get8bit(&rptr);
uint16_t chunkCount16 = get16bit(&rptr);
uint32_t chunkCount = chunkCount16;
if (copies == 1) {
printf("1 copy:");
} else {
printf(" chunks with %u copies: ", cmd);
printf("%" PRIu8 " copies:", copies);
}
print_number(" ", "\n", chunkCount, 1, 0, 1);
}
} else {
// Modern format: CHUNK_MATRIX_SIZE * [chunks:32]
for (uint32_t copyIndex = 0; copyIndex < CHUNK_MATRIX_SIZE; ++copyIndex) {
uint32_t chunkCount = 0;
get32bit(&rptr, chunkCount);
if (chunkCount > 0) {
if (copyIndex == 1) {
printf(" chunks with 1 copy: ");
} else if (copyIndex >= 10) {
printf(" chunks with 10+ copies:");
} else {
printf(" chunks with %u copies: ", copyIndex);
}
print_number(" ", "\n", chunkCount, 1, 0, 1);
}
print_number(" ", "\n", chunks, 1, 0, 1);
}
}
} catch (const Exception &e) {
fprintf(stderr, "%s\n", e.what());
close_master_conn(1);
return -1;
}

free(buff);

return 0;
}

Expand Down
Loading