-
Notifications
You must be signed in to change notification settings - Fork 320
Expand file tree
/
Copy pathblock_service_manager.cpp
More file actions
218 lines (194 loc) · 8.15 KB
/
block_service_manager.cpp
File metadata and controls
218 lines (194 loc) · 8.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "block_service_manager.h"
#include <algorithm>
#include <utility>
#include <vector>
#include "block_service/block_service.h"
#include "block_service/hdfs/hdfs_service.h"
#include "block_service/local/local_service.h"
#include "fmt/core.h"
#include "task/task_code.h"
#include "task/task_tracker.h"
#include "utils/config_api.h"
#include "utils/factory_store.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/strings.h"
namespace dsn {
namespace dist {
namespace block_service {
// Factory name under which the JuiceFS provider is registered by
// block_service_registry. get_or_create_block_filesystem() uses it as the
// provider type whenever is_juicefs_provider() returns true for a provider.
const char *BLOCK_SERVICE_JUICEFS = "juicefs_service";
// Registers the built-in block filesystem factories ("hdfs_service",
// BLOCK_SERVICE_JUICEFS and "local_service") in
// utils::factory_store<block_filesystem> with PROVIDER_TYPE_MAIN. Every
// registration is guarded by CHECK, so a failed registration is reported with
// the name of the service that could not be registered.
block_service_registry::block_service_registry()
{
    CHECK(utils::factory_store<block_filesystem>::register_factory(
              "hdfs_service", block_filesystem::create<hdfs_service>, PROVIDER_TYPE_MAIN),
          "register hdfs_service failed");

    // juicefs_service uses hdfs_service as its default provider implementation.
    CHECK(utils::factory_store<block_filesystem>::register_factory(
              BLOCK_SERVICE_JUICEFS, block_filesystem::create<hdfs_service>, PROVIDER_TYPE_MAIN),
          "register juicefs_service failed");

    CHECK(utils::factory_store<block_filesystem>::register_factory(
              "local_service", block_filesystem::create<local_service>, PROVIDER_TYPE_MAIN),
          "register local_service failed");
}
// Constructs the manager, initializing _registry_holder from
// block_service_registry::instance().
block_service_manager::block_service_manager()
: // We fetch the block_service_registry instance each time a block_service_manager is
// created, to make sure that the filesystem providers are registered.
_registry_holder(block_service_registry::instance())
{
}
// Logs the shutdown, then empties the filesystem cache while holding the
// write lock that guards it.
block_service_manager::~block_service_manager()
{
    LOG_INFO("close block service manager.");

    zauto_write_lock write_guard(_fs_lock);
    _fs_map.clear();
}
// Returns the block filesystem cached for `provider`, creating, initializing
// and caching it first when it is not in the cache yet.
//
// For a JuiceFS provider (as decided by is_juicefs_provider()) the provider
// type is BLOCK_SERVICE_JUICEFS and the initialization arguments are
// {provider, "/"}. For any other provider, both the type and the arguments are
// read from the "block_service.<provider>" config section.
//
// Returns nullptr when no factory produces a filesystem for the provider type
// or when initialize() does not return ERR_OK. The returned pointer is
// non-owning: the instance is owned by _fs_map.
//
// The whole lookup-or-create sequence runs under the write lock on _fs_lock.
block_filesystem *block_service_manager::get_or_create_block_filesystem(const std::string &provider)
{
    zauto_write_lock l(_fs_lock);

    const auto iter = _fs_map.find(provider);
    if (iter != _fs_map.end()) {
        return iter->second.get();
    }

    const bool is_juicefs = is_juicefs_provider(provider);
    const std::string config_section = "block_service." + provider;

    const char *provider_type = nullptr;
    if (is_juicefs) {
        provider_type = BLOCK_SERVICE_JUICEFS;
    } else {
        provider_type = dsn_config_get_value_string(
            config_section.c_str(), "type", "", "block service type");
    }

    // Own the new instance from the moment it is created, so that it is released on
    // every exit path (including an exception thrown while it is being set up).
    std::unique_ptr<block_filesystem> fs(
        utils::factory_store<block_filesystem>::create(provider_type, PROVIDER_TYPE_MAIN));
    if (fs == nullptr) {
        LOG_ERROR("acquire block filesystem failed, provider = {}, provider_type = {}",
                  provider,
                  provider_type);
        return nullptr;
    }

    std::vector<std::string> args;
    std::string args_for_log;
    if (is_juicefs) {
        // juicefs provider example: jfs://pegasus@ak-bigdata
        args = {provider, "/"};
        args_for_log = provider + " /";
    } else {
        const char *arguments = dsn_config_get_value_string(
            config_section.c_str(), "args", "", "args for block_service");
        utils::split_args(arguments, args);
        args_for_log = arguments;
    }

    const dsn::error_code err = fs->initialize(args);
    const auto provider_desc = fmt::format(
        "provider = {}, provider_type = {}, args = {}", provider, provider_type, args_for_log);
    if (dsn::ERR_OK != err) {
        LOG_ERROR("create block filesystem failed for {}, error = {}", provider_desc, err);
        // `fs` is destroyed here by its unique_ptr.
        return nullptr;
    }

    LOG_INFO("create block filesystem ok for {}", provider_desc);

    // Take the raw pointer before ownership moves into the cache.
    block_filesystem *created = fs.get();
    _fs_map.emplace(provider, std::move(fs));
    return created;
}
// Issues fs->create_file() for `remote_file_path` and blocks on `tracker`
// until its outstanding tasks have finished, then returns the response that
// the callback captured.
static create_file_response create_block_file_sync(const std::string &remote_file_path,
                                                   bool ignore_meta,
                                                   block_filesystem *fs,
                                                   task_tracker *tracker)
{
    create_file_response captured;

    // The callback stores the response into the local above; that is safe because
    // this function waits on the tracker before `captured` goes out of scope.
    auto on_created = [&captured](const create_file_response &resp) { captured = resp; };

    fs->create_file(create_file_request{remote_file_path, ignore_meta},
                    TASK_CODE_EXEC_INLINED,
                    on_created,
                    tracker);
    tracker->wait_outstanding_tasks();

    return captured;
}
// Issues bf->download() into `local_file_path` with request fields
// {local_file_path, 0, -1} and blocks on `tracker` until its outstanding tasks
// have finished, then returns the response that the callback captured.
static download_response
download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
{
    download_response captured;

    // The callback stores the response into the local above; that is safe because
    // this function waits on the tracker before `captured` goes out of scope.
    auto on_downloaded = [&captured](const download_response &resp) { captured = resp; };

    bf->download(
        download_request{local_file_path, 0, -1}, TASK_CODE_EXEC_INLINED, on_downloaded, tracker);
    tracker->wait_outstanding_tasks();

    return captured;
}
// Convenience overload for callers that do not need the md5 of the downloaded
// file: forwards to the overload with an md5 out-parameter and discards it.
error_code block_service_manager::download_file(const std::string &remote_dir,
                                                const std::string &local_dir,
                                                const std::string &file_name,
                                                block_filesystem *fs,
                                                /*out*/ uint64_t &download_file_size)
{
    // Receives the md5 reported by the full overload; intentionally unused.
    std::string discarded_md5;
    return download_file(
        remote_dir, local_dir, file_name, fs, download_file_size, discarded_md5);
}
// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_DEFAULT
//
// Downloads `file_name` from `remote_dir` on `fs` into `local_dir`, waiting for
// both the create-file and the download steps to complete before returning.
//
// Parameters:
//   remote_dir / local_dir / file_name - combined with path_combine() into the
//       remote source path and the local destination path.
//   fs - filesystem used to create the block_file handle.
//   download_file_size (out), download_file_md5 (out) - set from the download
//       response; written only when ERR_OK is returned.
//
// Returns:
//   ERR_OK                 - the file was downloaded.
//   ERR_PATH_ALREADY_EXIST - the local destination file already exists; nothing
//                            is downloaded.
//   ERR_CORRUPTION         - the download reported ERR_OBJECT_NOT_FOUND.
//   any other error        - passed through from the create-file or download step.
error_code block_service_manager::download_file(const std::string &remote_dir,
                                                const std::string &local_dir,
                                                const std::string &file_name,
                                                block_filesystem *fs,
                                                /*out*/ uint64_t &download_file_size,
                                                /*out*/ std::string &download_file_md5)
{
    // Never overwrite a file that is already present locally.
    const std::string local_file_name = utils::filesystem::path_combine(local_dir, file_name);
    if (utils::filesystem::file_exists(local_file_name)) {
        LOG_INFO("local file({}) exists", local_file_name);
        return ERR_PATH_ALREADY_EXIST;
    }

    task_tracker tracker;

    // Create a block_file object.
    const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
    auto create_resp =
        create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker);
    const error_code err = create_resp.err;
    if (err != ERR_OK) {
        LOG_ERROR("create file({}) failed with error({})", remote_file_name, err);
        return err;
    }
    block_file_ptr bf = create_resp.file_handle;

    const download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker);
    if (resp.err != ERR_OK) {
        // during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
        // error, however, if file damaged on remote file provider, bulk load should stop,
        // return ERR_CORRUPTION instead
        if (resp.err == ERR_OBJECT_NOT_FOUND) {
            LOG_ERROR("download file({}) failed, file on remote file provider is damaged",
                      local_file_name);
            return ERR_CORRUPTION;
        }

        // Every other failure used to be returned silently; log it so that the failing
        // remote file and the error are visible.
        LOG_ERROR("download file({}) failed with error({})", remote_file_name, resp.err);
        return resp.err;
    }

    LOG_INFO("download file({}) succeed, file_size = {}, md5 = {}",
             local_file_name,
             resp.downloaded_size,
             resp.file_md5);
    download_file_size = resp.downloaded_size;
    download_file_md5 = resp.file_md5;
    return ERR_OK;
}
} // namespace block_service
} // namespace dist
} // namespace dsn