Skip to content

Commit 868d13a

Browse files
committed
Separate out the configuration from initialization and construction
Make the construction of the Cloud objects (LogController, CloudEnv, and StorageProvider) be a separate step from the configuration of the objects and the initialization. This will later allow the objects to be configured and initialized via properties files.
1 parent 368e0bb commit 868d13a

File tree

11 files changed

+293
-292
lines changed

11 files changed

+293
-292
lines changed

cloud/aws/aws_env.cc

Lines changed: 30 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ Status AwsCloudAccessCredentials::TEST_Initialize() {
7676
Status AwsCloudAccessCredentials::CheckCredentials(
7777
const AwsAccessType& aws_type) const {
7878
#ifndef USE_AWS
79+
(void) aws_type;
7980
return Status::NotSupported("AWS not supported");
8081
#else
8182
if (aws_type == AwsAccessType::kSimple) {
@@ -287,139 +288,7 @@ AwsEnv::AwsEnv(Env* underlying_env, const CloudEnvOptions& _cloud_env_options,
287288
cloud_env_options.dest_bucket.SetRegion(region);
288289
}
289290
}
290-
291-
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> creds;
292-
create_bucket_status_ =
293-
cloud_env_options.credentials.GetCredentialsProvider(&creds);
294-
if (!create_bucket_status_.ok()) {
295-
Log(InfoLogLevel::INFO_LEVEL, info_log,
296-
"[aws] NewAwsEnv - Bad AWS credentials");
297-
}
298-
299-
Header(info_log_, " AwsEnv.src_bucket_name: %s",
300-
cloud_env_options.src_bucket.GetBucketName().c_str());
301-
Header(info_log_, " AwsEnv.src_object_path: %s",
302-
cloud_env_options.src_bucket.GetObjectPath().c_str());
303-
Header(info_log_, " AwsEnv.src_bucket_region: %s",
304-
cloud_env_options.src_bucket.GetRegion().c_str());
305-
Header(info_log_, " AwsEnv.dest_bucket_name: %s",
306-
cloud_env_options.dest_bucket.GetBucketName().c_str());
307-
Header(info_log_, " AwsEnv.dest_object_path: %s",
308-
cloud_env_options.dest_bucket.GetObjectPath().c_str());
309-
Header(info_log_, " AwsEnv.dest_bucket_region: %s",
310-
cloud_env_options.dest_bucket.GetRegion().c_str());
311-
Header(info_log_, " AwsEnv.credentials: %s",
312-
creds ? "[given]" : "[not given]");
313-
314291
base_env_ = underlying_env;
315-
316-
// TODO: support buckets being in different regions
317-
if (!SrcMatchesDest() && HasSrcBucket() && HasDestBucket()) {
318-
if (cloud_env_options.src_bucket.GetRegion() == cloud_env_options.dest_bucket.GetRegion()) {
319-
// alls good
320-
} else {
321-
create_bucket_status_ =
322-
Status::InvalidArgument("Two different regions not supported");
323-
Log(InfoLogLevel::ERROR_LEVEL, info_log,
324-
"[aws] NewAwsEnv Buckets %s, %s in two different regions %s, %s "
325-
"is not supported",
326-
cloud_env_options.src_bucket.GetBucketName().c_str(),
327-
cloud_env_options.dest_bucket.GetBucketName().c_str(),
328-
cloud_env_options.src_bucket.GetRegion().c_str(),
329-
cloud_env_options.dest_bucket.GetRegion().c_str());
330-
return;
331-
}
332-
}
333-
// create AWS S3 client with appropriate timeouts
334-
Aws::Client::ClientConfiguration config;
335-
create_bucket_status_ =
336-
AwsCloudOptions::GetClientConfiguration(this,
337-
cloud_env_options.src_bucket.GetRegion(),
338-
&config);
339-
if (create_bucket_status_.ok()) {
340-
create_bucket_status_ = CloudStorageProviderImpl::CreateS3Provider(
341-
&cloud_env_options.storage_provider);
342-
if (create_bucket_status_.ok()) {
343-
create_bucket_status_ = cloud_env_options.storage_provider->Prepare(this);
344-
}
345-
}
346-
if (!create_bucket_status_.ok()) {
347-
return;
348-
}
349-
350-
Header(info_log_, "AwsEnv connection to endpoint in region: %s",
351-
config.region.c_str());
352-
353-
// create dest bucket if specified
354-
if (HasDestBucket()) {
355-
if (cloud_env_options.storage_provider->ExistsBucket(GetDestBucketName())
356-
.ok()) {
357-
Log(InfoLogLevel::INFO_LEVEL, info_log,
358-
"[aws] NewAwsEnv Bucket %s already exists",
359-
GetDestBucketName().c_str());
360-
} else if (cloud_env_options.create_bucket_if_missing) {
361-
Log(InfoLogLevel::INFO_LEVEL, info_log,
362-
"[aws] NewAwsEnv Going to create bucket %s",
363-
GetDestBucketName().c_str());
364-
create_bucket_status_ =
365-
cloud_env_options.storage_provider->CreateBucket(GetDestBucketName());
366-
} else {
367-
create_bucket_status_ = Status::NotFound(
368-
"[aws] Bucket not found and create_bucket_if_missing is false");
369-
}
370-
}
371-
if (!create_bucket_status_.ok()) {
372-
Log(InfoLogLevel::ERROR_LEVEL, info_log,
373-
"[aws] NewAwsEnv Unable to create bucket %s %s",
374-
GetDestBucketName().c_str(),
375-
create_bucket_status_.ToString().c_str());
376-
}
377-
378-
// create cloud log client for storing/reading logs
379-
if (create_bucket_status_.ok() && !cloud_env_options.keep_local_log_files) {
380-
if (cloud_env_options.log_type == kLogKinesis) {
381-
create_bucket_status_ = CloudLogControllerImpl::CreateKinesisController(
382-
this, &cloud_env_options.cloud_log_controller);
383-
} else if (cloud_env_options.log_type == kLogKafka) {
384-
#ifdef USE_KAFKA
385-
create_bucket_status_ = CloudLogControllerImpl::CreateKafkaController(
386-
this, &cloud_env_options.cloud_log_controller);
387-
#else
388-
create_bucket_status_ = Status::NotSupported(
389-
"In order to use Kafka, make sure you're compiling with "
390-
"USE_KAFKA=1");
391-
392-
Log(InfoLogLevel::ERROR_LEVEL, info_log,
393-
"[aws] NewAwsEnv Unknown log type %d. %s",
394-
cloud_env_options.log_type,
395-
create_bucket_status_.ToString().c_str());
396-
#endif /* USE_KAFKA */
397-
} else {
398-
create_bucket_status_ =
399-
Status::NotSupported("We currently only support Kinesis and Kafka");
400-
401-
Log(InfoLogLevel::ERROR_LEVEL, info_log,
402-
"[aws] NewAwsEnv Unknown log type %d. %s", cloud_env_options.log_type,
403-
create_bucket_status_.ToString().c_str());
404-
}
405-
406-
// Create Kinesis stream and wait for it to be ready
407-
if (create_bucket_status_.ok()) {
408-
create_bucket_status_ =
409-
cloud_env_options.cloud_log_controller->StartTailingStream(
410-
GetSrcBucketName());
411-
if (!create_bucket_status_.ok()) {
412-
Log(InfoLogLevel::ERROR_LEVEL, info_log,
413-
"[aws] NewAwsEnv Unable to create stream %s",
414-
create_bucket_status_.ToString().c_str());
415-
}
416-
}
417-
}
418-
if (!create_bucket_status_.ok()) {
419-
Log(InfoLogLevel::ERROR_LEVEL, info_log,
420-
"[aws] NewAwsEnv Unable to create environment %s",
421-
create_bucket_status_.ToString().c_str());
422-
}
423292
}
424293

425294
AwsEnv::~AwsEnv() {
@@ -868,11 +737,6 @@ void AwsEnv::RemoveFileFromDeletionQueue(const std::string& filename) {
868737
}
869738
}
870739

871-
Status AwsEnv::TEST_DeletePathInS3(const std::string& bucket,
872-
const std::string& fname) {
873-
return cloud_env_options.storage_provider->DeleteObject(bucket, fname);
874-
}
875-
876740
Status AwsEnv::DeleteFile(const std::string& logical_fname) {
877741
assert(status().ok());
878742

@@ -1358,10 +1222,35 @@ Status AwsEnv::NewAwsEnv(Env* base_env,
13581222
if (!base_env) {
13591223
base_env = Env::Default();
13601224
}
1361-
std::unique_ptr<AwsEnv> aenv(new AwsEnv(base_env, cloud_options, info_log));
1362-
if (!aenv->status().ok()) {
1363-
status = aenv->status();
1364-
} else {
1225+
// These lines of code are likely temporary until the new configuration stuff
1226+
// comes into play.
1227+
CloudEnvOptions options = cloud_options; // Make a copy
1228+
status =
1229+
CloudStorageProviderImpl::CreateS3Provider(&options.storage_provider);
1230+
if (status.ok() && !cloud_options.keep_local_log_files) {
1231+
if (cloud_options.log_type == kLogKinesis) {
1232+
status = CloudLogControllerImpl::CreateKinesisController(
1233+
&options.cloud_log_controller);
1234+
} else if (cloud_options.log_type == kLogKafka) {
1235+
status = CloudLogControllerImpl::CreateKafkaController(
1236+
&options.cloud_log_controller);
1237+
} else {
1238+
status =
1239+
Status::NotSupported("We currently only support Kinesis and Kafka");
1240+
Log(InfoLogLevel::ERROR_LEVEL, info_log,
1241+
"[aws] NewAwsEnv Unknown log type %d. %s", cloud_options.log_type,
1242+
status.ToString().c_str());
1243+
}
1244+
}
1245+
if (!status.ok()) {
1246+
Log(InfoLogLevel::ERROR_LEVEL, info_log,
1247+
"[aws] NewAwsEnv Unable to create environment %s",
1248+
status.ToString().c_str());
1249+
return status;
1250+
}
1251+
std::unique_ptr<AwsEnv> aenv(new AwsEnv(base_env, options, info_log));
1252+
status = aenv->Prepare();
1253+
if (status.ok()) {
13651254
*cenv = aenv.release();
13661255
}
13671256
return status;

cloud/aws/aws_env.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,6 @@ class AwsEnv : public CloudEnvImpl {
215215
file_deletion_delay_ = delay;
216216
}
217217

218-
Status TEST_DeletePathInS3(const std::string& bucket,
219-
const std::string& fname);
220-
221218
private:
222219
//
223220
// The AWS credentials are specified to the constructor via

0 commit comments

Comments
 (0)