Skip to content

Commit 406bc86

Browse files
authored
Merge pull request #734 from kirilg/1.5
1.5
2 parents c1ec435 + 7f7f172 commit 406bc86

File tree

9 files changed: 21 additions (+21) and 143 deletions (-143).

tensorflow

Submodule tensorflow updated 1435 files

tensorflow_serving/core/aspired_versions_manager.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,6 @@ Status AspiredVersionsManager::Create(
156156
basic_manager_options.max_num_load_retries = options.max_num_load_retries;
157157
basic_manager_options.load_retry_interval_micros =
158158
options.load_retry_interval_micros;
159-
basic_manager_options.flush_filesystem_caches =
160-
options.flush_filesystem_caches;
161159
basic_manager_options.servable_event_bus = options.servable_event_bus;
162160
basic_manager_options.pre_load_hook = std::move(options.pre_load_hook);
163161
std::unique_ptr<BasicManager> basic_manager;

tensorflow_serving/core/aspired_versions_manager.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,6 @@ class AspiredVersionsManager : public Manager,
123123
/// Default: 1 minute.
124124
int64 load_retry_interval_micros = 1LL * 60 * 1000 * 1000;
125125

126-
// If true, and there are not multiple load threads, filesystem caches will
127-
// be flushed after each servable is loaded. (Cache flush is skipped when
128-
// multiple load threads are active, in order to avoid setting back a
129-
// concurrent load on another thread.)
130-
bool flush_filesystem_caches = false;
131-
132126
/// The environment to use for starting threads in the thread-pool or for
133127
/// sleeping.
134128
Env* env = Env::Default();

tensorflow_serving/core/basic_manager.cc

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -205,23 +205,21 @@ Status BasicManager::Create(Options options,
205205
manager->reset(new BasicManager(
206206
options.env, options.num_load_threads, options.num_unload_threads,
207207
options.max_num_load_retries, options.load_retry_interval_micros,
208-
options.flush_filesystem_caches, std::move(options.resource_tracker),
209-
options.servable_event_bus, std::move(options.pre_load_hook)));
208+
std::move(options.resource_tracker), options.servable_event_bus,
209+
std::move(options.pre_load_hook)));
210210
return Status::OK();
211211
}
212212

213213
BasicManager::BasicManager(Env* const env, const uint32 num_load_threads,
214214
const uint32 num_unload_threads,
215215
uint32 max_num_load_retries,
216216
int64 load_retry_interval_micros,
217-
bool flush_filesystem_caches,
218217
std::unique_ptr<ResourceTracker> resource_tracker,
219218
EventBus<ServableState>* servable_event_bus,
220219
std::function<void(const ServableId&)> pre_load_hook)
221220
: servable_event_bus_(servable_event_bus),
222221
env_(env),
223222
num_load_threads_(num_load_threads),
224-
flush_filesystem_caches_(flush_filesystem_caches),
225223
pre_load_hook_(std::move(pre_load_hook)) {
226224
harness_options_.max_num_load_retries = max_num_load_retries;
227225
harness_options_.load_retry_interval_micros = load_retry_interval_micros;
@@ -231,7 +229,7 @@ BasicManager::BasicManager(Env* const env, const uint32 num_load_threads,
231229
};
232230

233231
{
234-
mutex_lock l(load_executor_mu_);
232+
mutex_lock l(num_load_threads_mu_);
235233
load_executor_ =
236234
CreateExecutor(env_, num_load_threads, "BasicManager_Load_ThreadPool");
237235
}
@@ -243,7 +241,7 @@ BasicManager::BasicManager(Env* const env, const uint32 num_load_threads,
243241
BasicManager::~BasicManager() {
244242
// Reset the executors first to finish all pending loads/unloads.
245243
{
246-
mutex_lock l(load_executor_mu_);
244+
mutex_lock l(num_load_threads_mu_);
247245
load_executor_.reset();
248246
}
249247
unload_executor_.reset();
@@ -464,18 +462,7 @@ Status BasicManager::ExecuteLoad(LoaderHarness* harness) {
464462
}
465463

466464
// We don't hold the lock while calling Load() as it may block.
467-
const Status status = harness->Load();
468-
469-
// Whether the load succeeded or failed, flush filesystem caches if there is
470-
// only one load thread.
471-
if (flush_filesystem_caches_ && num_load_threads() <= 1) {
472-
const Status flush_status = Env::Default()->FlushFileSystemCaches();
473-
if (!flush_status.ok()) {
474-
LOG(WARNING) << "flushing filesystem caches failed: " << flush_status;
475-
}
476-
}
477-
478-
TF_RETURN_IF_ERROR(status);
465+
TF_RETURN_IF_ERROR(harness->Load());
479466

480467
{
481468
mutex_lock l(mu_);
@@ -559,16 +546,18 @@ Status BasicManager::ExecuteLoadOrUnload(const LoadOrUnloadRequest& request,
559546
}
560547

561548
void BasicManager::SetNumLoadThreads(const uint32 num_load_threads) {
562-
mutex_lock l(load_executor_mu_);
549+
mutex_lock l(num_load_threads_mu_);
563550

564551
load_executor_.reset();
565-
num_load_threads_.store(num_load_threads);
552+
num_load_threads_ = num_load_threads;
566553
load_executor_ =
567-
CreateExecutor(env_, num_load_threads, "BasicManager_Load_ThreadPool");
554+
CreateExecutor(env_, num_load_threads_, "BasicManager_Load_ThreadPool");
568555
}
569556

570557
uint32 BasicManager::num_load_threads() const {
571-
return num_load_threads_.load();
558+
mutex_lock l(num_load_threads_mu_);
559+
560+
return num_load_threads_;
572561
}
573562

574563
void BasicManager::LoadOrUnloadServable(const LoadOrUnloadRequest& request,
@@ -596,7 +585,7 @@ void BasicManager::LoadOrUnloadServable(const LoadOrUnloadRequest& request,
596585

597586
switch (request.kind) {
598587
case LoadOrUnloadRequest::Kind::kLoad: {
599-
mutex_lock l(load_executor_mu_);
588+
mutex_lock l(num_load_threads_mu_);
600589
load_executor_->Schedule([this, request, done_callback]() {
601590
HandleLoadOrUnloadRequest(request, done_callback);
602591
});

tensorflow_serving/core/basic_manager.h

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ limitations under the License.
1616
#ifndef TENSORFLOW_SERVING_CORE_BASIC_MANAGER_H_
1717
#define TENSORFLOW_SERVING_CORE_BASIC_MANAGER_H_
1818

19-
#include <atomic>
2019
#include <memory>
2120
#include <string>
2221
#include <unordered_map>
@@ -140,12 +139,6 @@ class BasicManager : public Manager {
140139
// Default: 1 minute.
141140
int64 load_retry_interval_micros = 1LL * 60 * 1000 * 1000;
142141

143-
// If true, and there are not multiple load threads, filesystem caches will
144-
// be flushed after each servable is loaded. (Cache flush is skipped when
145-
// multiple load threads are active, in order to avoid setting back a
146-
// concurrent load on another thread.)
147-
bool flush_filesystem_caches = false;
148-
149142
// The environment to use for starting threads in the thread-pool.
150143
Env* env = Env::Default();
151144

@@ -270,7 +263,6 @@ class BasicManager : public Manager {
270263

271264
BasicManager(Env* env, uint32 num_load_threads, uint32 num_unload_threads,
272265
uint32 max_num_load_retries, int64 load_retry_interval_micros,
273-
bool flush_filesystem_caches,
274266
std::unique_ptr<ResourceTracker> resource_tracker,
275267
EventBus<ServableState>* servable_event_bus,
276268
PreLoadHook pre_load_hook);
@@ -388,8 +380,8 @@ class BasicManager : public Manager {
388380
// the old thread pool blocks until all threads are done, so it could block
389381
// for a long time.
390382
void SetNumLoadThreads(uint32 num_load_threads)
391-
LOCKS_EXCLUDED(load_executor_mu_);
392-
uint32 num_load_threads() const;
383+
LOCKS_EXCLUDED(num_load_threads_mu_);
384+
uint32 num_load_threads() const LOCKS_EXCLUDED(num_load_threads_mu_);
393385

394386
// Keys are the servable names.
395387
// Values are the harnesses for each servable version. The values when
@@ -482,14 +474,12 @@ class BasicManager : public Manager {
482474

483475
Env* const env_;
484476

485-
// The number of load threads. Can be changed after instantiation of the
486-
// manager via SetNumLoadThreads().
487-
std::atomic<uint32> num_load_threads_;
488-
// Whether to flush filesystem caches (if num_load_threads_ == 1)
489-
const bool flush_filesystem_caches_ = false;
490-
// The executor (and associated mutex) used for executing loads of servables.
491-
mutable mutex load_executor_mu_;
492-
std::unique_ptr<Executor> load_executor_ GUARDED_BY(load_executor_mu_);
477+
// The number of load threads and the associated executor. They can be changed
478+
// after instantiation of the manager via SetNumLoadThreads().
479+
mutable mutex num_load_threads_mu_;
480+
uint32 num_load_threads_ GUARDED_BY(num_load_threads_mu_);
481+
// The executor used for executing loads of servables.
482+
std::unique_ptr<Executor> load_executor_ GUARDED_BY(num_load_threads_mu_);
493483

494484
// The executor used for executing unloads of servables. (Unlike for loads,
495485
// the unload executor is fixed for the lifetime of the manager.)

tensorflow_serving/core/basic_manager_test.cc

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -938,73 +938,6 @@ TEST_F(SetNumLoadThreadsBasicManagerTest, FastLoad) {
938938
}
939939
}
940940

941-
// This filesystem detects a call to FlushCaches(), which is triggered by the
942-
// BasicManager's call to Env::Default()->FlushFileSystemCaches() after loading
943-
// a servable.
944-
class FlushDetectingFileSystem : public NullFileSystem {
945-
public:
946-
void FlushCaches() override { flushed = true; }
947-
static std::atomic<bool> flushed;
948-
};
949-
950-
std::atomic<bool> FlushDetectingFileSystem::flushed;
951-
952-
REGISTER_FILE_SYSTEM("flush", FlushDetectingFileSystem);
953-
954-
// This test loads servables with BasicManager::Options::flush_filesystem_caches
955-
// true or false, and verifies that filesystem caches were flushed (or not
956-
// flushed) as expected.
957-
class FlushFileSystemCachesTest : public ::testing::TestWithParam<bool> {
958-
protected:
959-
FlushFileSystemCachesTest() : flush_filesystem_caches_(GetParam()) {
960-
BasicManager::Options options;
961-
options.flush_filesystem_caches = flush_filesystem_caches_;
962-
TF_CHECK_OK(BasicManager::Create(std::move(options), &basic_manager_));
963-
}
964-
965-
std::unique_ptr<BasicManager> basic_manager_;
966-
bool flush_filesystem_caches_;
967-
};
968-
969-
TEST_P(FlushFileSystemCachesTest, Load) {
970-
test_util::BasicManagerTestAccess manager_test_access(basic_manager_.get());
971-
// The number of load threads is initially zero, so filesystems should be
972-
// flushed if flush_filesystem_caches_ is true.
973-
FlushDetectingFileSystem::flushed.store(false);
974-
const ServableId id0 = {kServableName3, 0};
975-
TF_CHECK_OK(basic_manager_->ManageServable(CreateServable(id0)));
976-
basic_manager_->LoadServable(id0, [&](const Status& status) {
977-
TF_ASSERT_OK(status);
978-
EXPECT_EQ(flush_filesystem_caches_,
979-
FlushDetectingFileSystem::flushed.load());
980-
});
981-
// Load another servable with two load threads. Filesystem caches should not
982-
// be flushed.
983-
manager_test_access.SetNumLoadThreads(2);
984-
FlushDetectingFileSystem::flushed.store(false);
985-
const ServableId id1 = {kServableName3, 1};
986-
TF_CHECK_OK(basic_manager_->ManageServable(CreateServable(id1)));
987-
basic_manager_->LoadServable(id1, [&](const Status& status) {
988-
TF_ASSERT_OK(status);
989-
EXPECT_FALSE(FlushDetectingFileSystem::flushed.load());
990-
});
991-
// Now move to a single load thread and load a third servable. Filesystem
992-
// caches should once again be flushed if flush_filesystem_caches_ is true.
993-
manager_test_access.SetNumLoadThreads(1);
994-
FlushDetectingFileSystem::flushed.store(false);
995-
const ServableId id2 = {kServableName3, 2};
996-
TF_CHECK_OK(basic_manager_->ManageServable(CreateServable(id2)));
997-
basic_manager_->LoadServable(id2, [&](const Status& status) {
998-
TF_ASSERT_OK(status);
999-
EXPECT_EQ(flush_filesystem_caches_,
1000-
FlushDetectingFileSystem::flushed.load());
1001-
});
1002-
basic_manager_.reset();
1003-
}
1004-
1005-
INSTANTIATE_TEST_CASE_P(WithOrWithoutFlush, FlushFileSystemCachesTest,
1006-
::testing::Bool());
1007-
1008941
TEST_P(BasicManagerTest, ConcurrentLoadsOnlyOneSucceeds) {
1009942
const ServableId id = {kServableName3, 0};
1010943
mutex status_mu;

tensorflow_serving/model_servers/main.cc

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ int main(int argc, char** argv) {
308308
tensorflow::string batching_parameters_file;
309309
tensorflow::string model_name = "default";
310310
tensorflow::int32 file_system_poll_wait_seconds = 1;
311-
bool flush_filesystem_caches = true;
312311
tensorflow::string model_base_path;
313312
const bool use_saved_model = true;
314313
// Tensorflow session parallelism of zero means that both inter and intra op
@@ -340,14 +339,6 @@ int main(int argc, char** argv) {
340339
&file_system_poll_wait_seconds,
341340
"interval in seconds between each poll of the file "
342341
"system for new model version"),
343-
tensorflow::Flag("flush_filesystem_caches", &flush_filesystem_caches,
344-
"If true (the default), filesystem caches will be "
345-
"flushed after the initial load of all servables, and "
346-
"after each subsequent individual servable reload (if "
347-
"the number of load threads is 1). This reduces memory "
348-
"consumption of the model server, at the potential cost "
349-
"of cache misses if model files are accessed after "
350-
"servables are loaded."),
351342
tensorflow::Flag("tensorflow_session_parallelism",
352343
&tensorflow_session_parallelism,
353344
"Number of threads to use for running a "
@@ -427,7 +418,6 @@ int main(int argc, char** argv) {
427418
options.aspired_version_policy =
428419
std::unique_ptr<AspiredVersionPolicy>(new AvailabilityPreservingPolicy);
429420
options.file_system_poll_wait_seconds = file_system_poll_wait_seconds;
430-
options.flush_filesystem_caches = flush_filesystem_caches;
431421

432422
std::unique_ptr<ServerCore> core;
433423
TF_CHECK_OK(ServerCore::Create(std::move(options), &core));

tensorflow_serving/model_servers/server_core.cc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,6 @@ Status ServerCore::ReloadConfig(const ModelServerConfig& new_config) {
459459
}
460460
TF_RETURN_IF_ERROR(MaybeUpdateServerRequestLogger());
461461

462-
if (options_.flush_filesystem_caches) {
463-
return Env::Default()->FlushFileSystemCaches();
464-
}
465-
466462
return Status::OK();
467463
}
468464

@@ -636,7 +632,6 @@ Status ServerCore::CreateAspiredVersionsManager(
636632
manager_options.num_unload_threads = options_.num_unload_threads;
637633
manager_options.max_num_load_retries = options_.max_num_load_retries;
638634
manager_options.pre_load_hook = std::move(options_.pre_load_hook);
639-
manager_options.flush_filesystem_caches = options_.flush_filesystem_caches;
640635
const tensorflow::Status status =
641636
AspiredVersionsManager::Create(std::move(manager_options), manager);
642637
if (!status.ok()) {

tensorflow_serving/model_servers/server_core.h

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -117,17 +117,6 @@ class ServerCore : public Manager {
117117
// Time interval between file-system polls, in seconds.
118118
int32 file_system_poll_wait_seconds = 30;
119119

120-
// If true, filesystem caches are flushed in the following cases:
121-
//
122-
// 1) After the initial models are loaded.
123-
// 2) After a new config is supplied and a changed set of models are loaded.
124-
// 3) After each new model version is loaded, if num_load_threads == 1.
125-
//
126-
// In the common scenario where the number of load threads is set to 1 after
127-
// the initial load, this will take care of flushing the cache once after
128-
// the initial load, and after every subsequent load of every model version.
129-
bool flush_filesystem_caches = false;
130-
131120
// Configuration for the supported platforms.
132121
PlatformConfigMap platform_config_map;
133122

0 commit comments

Comments
 (0)