Skip to content

Commit 967ae4f

Browse files
authored
fix: resolve multi thread mkdir error (#8)
1 parent 619fb19 commit 967ae4f

File tree

6 files changed

+127
-106
lines changed

6 files changed

+127
-106
lines changed

src/paimon/common/fs/file_system_test.cpp

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -979,16 +979,17 @@ TEST_P(FileSystemTest, TestMkdirsFailsWithExistingParentFile) {
979979
}
980980

981981
TEST_P(FileSystemTest, TestMkdir) {
982-
std::string path = PathUtil::JoinPath(test_root_, "/tmp.txt/tmpB");
983-
ASSERT_OK(fs_->Mkdirs(path));
982+
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp.txt/tmpB"));
983+
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmpA/tmpB/"));
984+
985+
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp/local/f/1"));
986+
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1"));
987+
ASSERT_OK(fs_->Mkdirs(test_root_ + "/tmp1/f2/"));
988+
ASSERT_OK(fs_->Mkdirs("/"));
989+
ASSERT_NOK_WITH_MSG(fs_->Mkdirs(""), "path is an empty string.");
984990
}
985991

986992
TEST_P(FileSystemTest, TestMkdir2) {
987-
std::string path = PathUtil::JoinPath(test_root_, "/tmpA/tmpB/");
988-
ASSERT_OK(fs_->Mkdirs(path));
989-
}
990-
991-
TEST_P(FileSystemTest, TestMkdir3) {
992993
{
993994
std::string dir_path = test_root_ + "/file_dir/";
994995
ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
@@ -1012,7 +1013,8 @@ TEST_P(FileSystemTest, TestMkdir3) {
10121013
}
10131014
}
10141015

1015-
TEST_P(FileSystemTest, TestMkdirMultiThread) {
1016+
// test for create multi dir such as "/table/partition1/bucket1" and "/table/partition1/bucket2"
1017+
TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNonExistParentDir) {
10161018
uint32_t runs_count = 10;
10171019
uint32_t thread_count = 10;
10181020
auto executor = CreateDefaultExecutor(thread_count);
@@ -1036,6 +1038,50 @@ TEST_P(FileSystemTest, TestMkdirMultiThread) {
10361038
}
10371039
}
10381040

1041+
// test for create multi dir such as "/table/partition1" and "/table/partition1"
1042+
TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameName) {
1043+
uint32_t runs_count = 10;
1044+
uint32_t thread_count = 10;
1045+
auto executor = CreateDefaultExecutor(thread_count);
1046+
1047+
for (uint32_t i = 0; i < runs_count; i++) {
1048+
std::string uuid;
1049+
ASSERT_TRUE(UUID::Generate(&uuid));
1050+
std::vector<std::future<void>> futures;
1051+
for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) {
1052+
futures.push_back(Via(executor.get(), [this, &uuid]() -> void {
1053+
std::string dir_path = PathUtil::JoinPath(test_root_, uuid);
1054+
ASSERT_OK(fs_->Mkdirs(dir_path));
1055+
ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
1056+
ASSERT_TRUE(is_exist);
1057+
}));
1058+
}
1059+
Wait(futures);
1060+
}
1061+
}
1062+
1063+
// test for create multi dir such as "partition1" and "partition1" (relative path)
1064+
TEST_P(FileSystemTest, TestMkdirMultiThreadWithSameNameWithRelativePath) {
1065+
uint32_t runs_count = 10;
1066+
uint32_t thread_count = 10;
1067+
auto executor = CreateDefaultExecutor(thread_count);
1068+
1069+
for (uint32_t i = 0; i < runs_count; i++) {
1070+
std::string uuid;
1071+
ASSERT_TRUE(UUID::Generate(&uuid));
1072+
std::vector<std::future<void>> futures;
1073+
for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) {
1074+
futures.push_back(Via(executor.get(), [this, &uuid]() -> void {
1075+
std::string dir_path = uuid;
1076+
ASSERT_OK(fs_->Mkdirs(dir_path));
1077+
ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
1078+
ASSERT_TRUE(is_exist);
1079+
}));
1080+
}
1081+
Wait(futures);
1082+
}
1083+
}
1084+
10391085
TEST_P(FileSystemTest, TestInvalidMkdir) {
10401086
{
10411087
// test mkdir with one exist dir

src/paimon/common/utils/path_util.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class PAIMON_EXPORT PathUtil {
3939
~PathUtil() = delete;
4040

4141
static std::string JoinPath(const std::string& path, const std::string& name) noexcept;
42+
// TODO(jinli.zjw): should pass `Path.path` and normalize; otherwise if path is
43+
// "oss://bucket1/", GetParentDirPath will return "oss:"
4244
static std::string GetParentDirPath(const std::string& path) noexcept;
4345
static std::string GetName(const std::string& path) noexcept;
4446
static void TrimLastDelim(std::string* dir_path) noexcept;

src/paimon/fs/local/local_file.cpp

Lines changed: 2 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -123,68 +123,9 @@ Status LocalFile::Delete() const {
123123
return Status::OK();
124124
}
125125

126-
Status LocalFile::MkNestDir(const std::string& dir_name) const {
126+
Result<bool> LocalFile::Mkdir() const {
127127
CHECK_HOOK();
128-
size_t pos = dir_name.rfind('/');
129-
if (pos == std::string::npos) {
130-
if (mkdir(dir_name.c_str(), 0755) < 0) {
131-
if (errno != EEXIST) {
132-
int32_t cur_errno = errno;
133-
return Status::IOError(fmt::format("MkNestDir path '{}' fail, ec: {}", path_,
134-
std::strerror(cur_errno)));
135-
}
136-
}
137-
return Status::OK();
138-
}
139-
140-
std::string parent_dir = dir_name.substr(0, pos);
141-
if (!parent_dir.empty() && access(parent_dir.c_str(), F_OK) != 0) {
142-
PAIMON_RETURN_NOT_OK(MkNestDir(parent_dir));
143-
}
144-
145-
if (mkdir(dir_name.c_str(), 0755) < 0) {
146-
if (errno != EEXIST) {
147-
int32_t cur_errno = errno;
148-
return Status::IOError(
149-
fmt::format("MkNestDir path '{}' fail, ec: {}", path_, std::strerror(cur_errno)));
150-
}
151-
}
152-
return Status::OK();
153-
}
154-
155-
Status LocalFile::Mkdir() const {
156-
CHECK_HOOK();
157-
std::string dir = path_;
158-
size_t len = dir.size();
159-
if (dir[len - 1] == '/') {
160-
if (len == 1) {
161-
return Status::Exist(fmt::format("directory '{}' already exist", dir));
162-
} else {
163-
dir.resize(len - 1);
164-
}
165-
}
166-
if (access(dir.c_str(), F_OK) == 0) {
167-
return Status::Exist(fmt::format("directory '{}' already exist", dir));
168-
}
169-
size_t pos = dir.rfind('/');
170-
if (pos == std::string::npos) {
171-
if (mkdir(dir.c_str(), 0755) < 0) {
172-
int32_t cur_errno = errno;
173-
return Status::IOError(
174-
fmt::format("Mkdir path '{}' fail, ec: {}", dir, std::strerror(cur_errno)));
175-
}
176-
return Status::OK();
177-
}
178-
std::string parent_dir = dir.substr(0, pos);
179-
if (!parent_dir.empty() && access(parent_dir.c_str(), F_OK) != 0) {
180-
PAIMON_RETURN_NOT_OK(MkNestDir(parent_dir));
181-
}
182-
if (mkdir(dir.c_str(), 0755) < 0) {
183-
int32_t cur_errno = errno;
184-
return Status::IOError(
185-
fmt::format("create directory '{}' failed, ec: {}", dir, std::strerror(cur_errno)));
186-
}
187-
return Status::OK();
128+
return mkdir(path_.c_str(), 0755) == 0;
188129
}
189130

190131
Result<std::unique_ptr<LocalFileStatus>> LocalFile::GetFileStatus() const {
@@ -349,13 +290,6 @@ Status LocalFile::OpenFile(bool is_read_file) {
349290
CHECK_HOOK();
350291
file_ = fopen(path_.c_str(), "r");
351292
} else {
352-
LocalFile parent_dir = GetParentFile();
353-
if (!parent_dir.GetAbsolutePath().empty()) {
354-
PAIMON_ASSIGN_OR_RAISE(bool is_exist, parent_dir.Exists());
355-
if (!is_exist) {
356-
PAIMON_RETURN_NOT_OK(parent_dir.Mkdir());
357-
}
358-
}
359293
CHECK_HOOK();
360294
file_ = fopen(path_.c_str(), "w");
361295
}

src/paimon/fs/local/local_file.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class LocalFile {
4747
Status Delete() const;
4848
const std::string& GetAbsolutePath() const;
4949
LocalFile GetParentFile() const;
50-
Status Mkdir() const;
50+
Result<bool> Mkdir() const;
5151
Result<std::unique_ptr<LocalFileStatus>> GetFileStatus() const;
5252
Result<uint64_t> Length() const;
5353
Result<int64_t> LastModifiedTimeMs() const;
@@ -68,8 +68,6 @@ class LocalFile {
6868
}
6969

7070
private:
71-
Status MkNestDir(const std::string& dir_name) const;
72-
7371
const std::string path_;
7472
FILE* file_ = nullptr;
7573
IOHook* hook_;

src/paimon/fs/local/local_file_system.cpp

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ Status LocalFileSystem::Mkdirs(const std::string& path) const {
7676
}
7777

7878
Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const {
79-
// Important: The 'Exists()' check above must come before the 'IsDirectory()'
79+
// Important: The 'Exists()' check above must come before the 'IsDir()'
8080
// check to be safe when multiple parallel instances try to create the directory
8181
PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists());
8282
if (is_exist) {
@@ -90,7 +90,20 @@ Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const {
9090
}
9191
}
9292

93-
PAIMON_RETURN_NOT_OK(file.Mkdir());
93+
auto parent = file.GetParentFile();
94+
if (!parent.IsEmpty()) {
95+
PAIMON_RETURN_NOT_OK(MkdirsInternal(parent));
96+
}
97+
PAIMON_ASSIGN_OR_RAISE(bool success, file.Mkdir());
98+
if (!success) {
99+
PAIMON_ASSIGN_OR_RAISE(bool is_dir, file.IsDir());
100+
if (is_dir) {
101+
return Status::OK();
102+
} else {
103+
return Status::IOError(
104+
fmt::format("create directory '{}' failed", file.GetAbsolutePath()));
105+
}
106+
}
94107
return Status::OK();
95108
}
96109

@@ -210,17 +223,7 @@ Status LocalFileSystem::Rename(const std::string& src, const std::string& dst) c
210223
}
211224
PAIMON_ASSIGN_OR_RAISE(LocalFile dst_file, ToFile(dst));
212225
auto parent = dst_file.GetParentFile();
213-
if (!parent.GetAbsolutePath().empty()) {
214-
PAIMON_ASSIGN_OR_RAISE(bool is_exist, parent.Exists());
215-
if (is_exist) {
216-
// pass
217-
} else {
218-
Status status = parent.Mkdir();
219-
if (!status.ok() && !status.IsExist()) {
220-
return status;
221-
}
222-
}
223-
}
226+
PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath()));
224227
if (::rename(src.c_str(), dst.c_str()) != 0) {
225228
int32_t cur_errno = errno;
226229
return Status::IOError(err_msg, std::strerror(cur_errno));

src/paimon/fs/local/local_file_test.cpp

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ TEST(LocalFileTest, TestReadWriteEmptyContent) {
3333
if (dir.Exists().ok()) {
3434
ASSERT_TRUE(dir.Delete().ok());
3535
}
36-
ASSERT_TRUE(dir.Mkdir().ok());
36+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
37+
ASSERT_TRUE(success);
3738
std::string path = test_root + "/test.txt";
3839
LocalFile file = LocalFile(path);
3940
if (file.Exists().ok()) {
@@ -69,7 +70,8 @@ TEST(LocalFileTest, TestSimple) {
6970
if (dir.Exists().ok()) {
7071
ASSERT_OK(dir.Delete());
7172
}
72-
ASSERT_OK(dir.Mkdir());
73+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
74+
ASSERT_TRUE(success);
7375
std::string path = test_root + "/test.txt";
7476
LocalFile file = LocalFile(path);
7577
if (file.Exists().ok()) {
@@ -127,21 +129,27 @@ TEST(LocalFileTest, TestSimple) {
127129
ASSERT_EQ(strcmp(str_read, "test_data"), 0);
128130
}
129131

132+
// dir already exists
133+
ASSERT_OK_AND_ASSIGN(success, dir.Mkdir());
134+
ASSERT_FALSE(success);
135+
130136
ASSERT_OK(file2.Delete());
131137
ASSERT_FALSE(file2.Exists().value());
132138
}
133139

134140
TEST(LocalFileTest, TestUsage) {
135-
std::string test_root = "tmp/local_file_test_usage";
141+
std::string test_root = "local_file_test_usage";
136142
LocalFile dir = LocalFile(test_root);
137-
ASSERT_OK(dir.Mkdir());
143+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
144+
ASSERT_TRUE(success);
138145
std::vector<std::string> file_list;
139146
ASSERT_OK(dir.List(&file_list));
140-
std::string path_deep_dir = test_root + "/tmp2/tmp3";
147+
std::string path_deep_dir = test_root + "/tmp2";
141148
LocalFile deep_dir = LocalFile(path_deep_dir);
142-
ASSERT_OK(deep_dir.Mkdir());
149+
ASSERT_OK_AND_ASSIGN(success, deep_dir.Mkdir());
150+
ASSERT_TRUE(success);
143151
LocalFile parent_deep_dir = deep_dir.GetParentFile();
144-
ASSERT_EQ(parent_deep_dir.GetAbsolutePath(), test_root + "/tmp2");
152+
ASSERT_EQ(parent_deep_dir.GetAbsolutePath(), test_root);
145153
ASSERT_OK(deep_dir.Delete());
146154
ASSERT_OK(parent_deep_dir.Delete());
147155
ASSERT_OK(dir.Delete());
@@ -155,7 +163,8 @@ TEST(LocalFileTest, TestOpenFile) {
155163
if (dir.Exists().ok()) {
156164
ASSERT_OK(dir.Delete());
157165
}
158-
ASSERT_OK(dir.Mkdir());
166+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
167+
ASSERT_TRUE(success);
159168
std::string path = test_root + "/test.txt";
160169
LocalFile file = LocalFile(path);
161170
if (file.Exists().ok()) {
@@ -167,20 +176,49 @@ TEST(LocalFileTest, TestOpenFile) {
167176
ASSERT_NOK_WITH_MSG(file.OpenFile(/*is_read_file=*/true), "file not exist");
168177
ASSERT_NOK_WITH_MSG(dir.OpenFile(/*is_read_file=*/true), "cannot open a directory");
169178

170-
std::string path2 = test_root + "/foo/test.txt";
171-
LocalFile file2 = LocalFile(path2);
172-
ASSERT_OK(file2.OpenFile(/*is_read_file=*/false));
173-
174179
std::string path3 = "test.txt";
175180
LocalFile file3 = LocalFile(path3);
176181
ASSERT_OK(file3.OpenFile(/*is_read_file=*/false));
177182
ASSERT_OK_AND_ASSIGN(int64_t modify_time, file3.LastModifiedTimeMs());
178183
ASSERT_GE(modify_time, -1);
179184

180185
LocalFile dir2 = LocalFile("/");
181-
ASSERT_NOK_WITH_MSG(dir2.Mkdir(), "directory '/' already exist");
186+
ASSERT_OK_AND_ASSIGN(success, dir2.Mkdir());
187+
ASSERT_FALSE(success);
182188
LocalFile dir3 = LocalFile(test_root + "/");
183-
ASSERT_NOK_WITH_MSG(dir3.Mkdir(), "already exist");
189+
ASSERT_OK_AND_ASSIGN(success, dir3.Mkdir());
190+
ASSERT_FALSE(success);
191+
}
192+
193+
TEST(LocalFileTest, TestMkdir) {
194+
auto test_root_dir = UniqueTestDirectory::Create();
195+
ASSERT_TRUE(test_root_dir);
196+
std::string test_root = test_root_dir->Str();
197+
{
198+
LocalFile dir = LocalFile(test_root + "tmp/local/f/1");
199+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
200+
ASSERT_FALSE(success);
201+
}
202+
{
203+
LocalFile dir = LocalFile(test_root + "tmp1");
204+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
205+
ASSERT_TRUE(success);
206+
}
207+
{
208+
LocalFile dir = LocalFile(test_root + "tmp1/f2/");
209+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
210+
ASSERT_TRUE(success);
211+
}
212+
{
213+
LocalFile dir = LocalFile("/");
214+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
215+
ASSERT_FALSE(success);
216+
}
217+
{
218+
LocalFile dir = LocalFile("");
219+
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
220+
ASSERT_FALSE(success);
221+
}
184222
}
185223

186224
} // namespace paimon::test

0 commit comments

Comments
 (0)