Skip to content
This repository was archived by the owner on Feb 12, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kinesis_video_streamer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ catkin_package()
## Build ##
###########
## Declare a C++ executable
set(KINESIS_VIDEO_STREAMER_SRC src/ros_stream_subscription_installer.cpp src/subscriber_callbacks.cpp src/streamer.cpp)
set(KINESIS_VIDEO_STREAMER_SRC src/ros_stream_subscription_installer.cpp src/subscriber_callbacks.cpp src/streamer.cpp src/credentials.cpp)
add_executable(${PROJECT_NAME} src/main.cpp ${KINESIS_VIDEO_STREAMER_SRC})
add_library(${PROJECT_NAME}_lib ${KINESIS_VIDEO_STREAMER_SRC})
## Specify include directories
Expand Down
20 changes: 20 additions & 0 deletions kinesis_video_streamer/config/node_sample_configuration.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
# This is the AWS IoT Credentials Provider configuration, used as 1st priority authentication method if valid.
iot:
# Path to the Root CA for the endpoint
#cafile: ""

# Path to the certificate which identifies the device
#certfile: ""

# Path to the related private key for the certificate
#keyfile: ""

# Thing name for the device
#thing_name: ""

# Host name of the iot:CredentialProvider endpoint
#endpoint: ""

# Name of the AWS IoT Role Alias for the device
#role: ""

# This is the AWS Client Configuration used by the AWS service client in the Node. If given the node will load the
# provided configuration when initializing the client.
aws_client_configuration:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws_common/sdk_utils/auth/service_credentials_provider.h>
#include <kinesis-video-producer/Auth.h>

namespace Aws
{
namespace Auth
{
/**
* Creates an AWSCredentialsProviderChain which uses in order IotRoleCredentialsProvider and EnvironmentAWSCredentialsProvider.
*/
class CustomAWSCredentialsProviderChain : public AWSCredentialsProviderChain
{
public:
CustomAWSCredentialsProviderChain() = default;

/**
* Initializes the provider chain with IotRoleCredentialsProvider and EnvironmentAWSCredentialsProvider in that order.
*
* @param config Configuration for available credential providers
*/
CustomAWSCredentialsProviderChain(const ServiceAuthConfig &config);
};

} // namespace Auth
} // namespace Aws


namespace Aws {
namespace Kinesis {
/**
* Credentials provider which uses the AWS SDK's default credential provider chain.
* @note You need to have called Aws::InitAPI before using this provider.
*/
class CustomProducerSdkAWSCredentialsProvider : public com::amazonaws::kinesis::video::CredentialProvider
{
public:
CustomProducerSdkAWSCredentialsProvider(std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
aws_credentials_provider = nullptr);
private:
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> aws_credentials_provider_;

void updateCredentials(com::amazonaws::kinesis::video::Credentials & producer_sdk_credentials) override;
};

} // namespace Kinesis
} // namespace Aws
121 changes: 121 additions & 0 deletions kinesis_video_streamer/src/credentials.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include <aws/core/platform/Environment.h>
#include <aws/core/utils/logging/LogMacros.h>

#include <kinesis_video_streamer/credentials.h>

using namespace Aws::Auth;

/// Logging tag used for all messages emitting from this module
static const char AWS_LOG_TAG[] = "CustomAWSCredentialsProviderChain";
static const char AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI[] = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
static const char AWS_ECS_CONTAINER_CREDENTIALS_FULL_URI[] = "AWS_CONTAINER_CREDENTIALS_FULL_URI";
static const char AWS_ECS_CONTAINER_AUTHORIZATION_TOKEN[] = "AWS_CONTAINER_AUTHORIZATION_TOKEN";
static const char AWS_EC2_METADATA_DISABLED[] = "AWS_EC2_METADATA_DISABLED";

namespace Aws
{
namespace Auth
{

/**
* \brief Validates an instance of an IotRoleConfig struct
* @param config The struct to validate
* @return True if the struct is valid, meaning all the config needed to connect is there
*/
static bool IsIotConfigValid(const IotRoleConfig & config)
{
return config.cafile.length() > 0 && config.certfile.length() > 0 &&
config.keyfile.length() > 0 && config.host.length() > 0 && config.role.length() > 0 &&
config.name.length() > 0 && config.connect_timeout_ms > 0 && config.total_timeout_ms > 0;
}

CustomAWSCredentialsProviderChain::CustomAWSCredentialsProviderChain(const ServiceAuthConfig &config):
AWSCredentialsProviderChain()
{
// Add IoT credentials provider if valid
if (IsIotConfigValid(config.iot)) {
AWS_LOG_INFO(AWS_LOG_TAG, "Found valid IoT auth config, adding IotRoleCredentialsProvider");
auto provider = Aws::MakeShared<IotRoleCredentialsProvider>(__func__, config.iot);
AddProvider(provider);
} else {
AWS_LOG_INFO(AWS_LOG_TAG, "No valid IoT auth config, skipping IotRoleCredentialsProvider");
}

// Add environment credentials provider
AddProvider(Aws::MakeShared<EnvironmentAWSCredentialsProvider>(AWS_LOG_TAG));
AddProvider(Aws::MakeShared<ProfileConfigFileAWSCredentialsProvider>(AWS_LOG_TAG));

//ECS TaskRole Credentials only available when ENVIRONMENT VARIABLE is set
const auto relativeUri = Aws::Environment::GetEnv(AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI);
AWS_LOGSTREAM_DEBUG(AWS_LOG_TAG, "The environment variable value " << AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI
<< " is " << relativeUri);

const auto absoluteUri = Aws::Environment::GetEnv(AWS_ECS_CONTAINER_CREDENTIALS_FULL_URI);
AWS_LOGSTREAM_DEBUG(AWS_LOG_TAG, "The environment variable value " << AWS_ECS_CONTAINER_CREDENTIALS_FULL_URI
<< " is " << absoluteUri);

const auto ec2MetadataDisabled = Aws::Environment::GetEnv(AWS_EC2_METADATA_DISABLED);
AWS_LOGSTREAM_DEBUG(AWS_LOG_TAG, "The environment variable value " << AWS_EC2_METADATA_DISABLED
<< " is " << ec2MetadataDisabled);

if (!relativeUri.empty())
{
AddProvider(Aws::MakeShared<TaskRoleCredentialsProvider>(AWS_LOG_TAG, relativeUri.c_str()));
AWS_LOGSTREAM_INFO(AWS_LOG_TAG, "Added ECS metadata service credentials provider with relative path: ["
<< relativeUri << "] to the provider chain.");
}
else if (!absoluteUri.empty())
{
const auto token = Aws::Environment::GetEnv(AWS_ECS_CONTAINER_AUTHORIZATION_TOKEN);
AddProvider(Aws::MakeShared<TaskRoleCredentialsProvider>(AWS_LOG_TAG,
absoluteUri.c_str(), token.c_str()));

//DO NOT log the value of the authorization token for security purposes.
AWS_LOGSTREAM_INFO(AWS_LOG_TAG, "Added ECS credentials provider with URI: ["
<< absoluteUri << "] to the provider chain with a" << (token.empty() ? "n empty " : " non-empty ")
<< "authorization token.");
}
else if (Aws::Utils::StringUtils::ToLower(ec2MetadataDisabled.c_str()) != "true")
{
AddProvider(Aws::MakeShared<InstanceProfileCredentialsProvider>(AWS_LOG_TAG));
AWS_LOGSTREAM_INFO(AWS_LOG_TAG, "Added EC2 metadata service credentials provider to the provider chain.");
}
}

} // namespace Auth
} // namespace Aws

namespace Aws {
namespace Kinesis {
/**
* Credentials provider which uses the AWS SDK's default credential provider chain.
* @note You need to have called Aws::InitAPI before using this provider.
*/
CustomProducerSdkAWSCredentialsProvider::CustomProducerSdkAWSCredentialsProvider(
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> aws_credentials_provider)
{
if (aws_credentials_provider) {
aws_credentials_provider_ = aws_credentials_provider;
} else {
aws_credentials_provider_ =
Aws::MakeShared<Aws::Auth::DefaultAWSCredentialsProviderChain>(__func__);
}
}

void CustomProducerSdkAWSCredentialsProvider::updateCredentials(
com::amazonaws::kinesis::video::Credentials & producer_sdk_credentials)
{
Aws::Auth::AWSCredentials aws_sdk_credentials =
aws_credentials_provider_->GetAWSCredentials();
producer_sdk_credentials.setAccessKey(aws_sdk_credentials.GetAWSAccessKeyId().c_str());
producer_sdk_credentials.setSecretKey(aws_sdk_credentials.GetAWSSecretKey().c_str());
producer_sdk_credentials.setSessionToken(aws_sdk_credentials.GetSessionToken().c_str());
auto now = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch());
auto refresh_interval = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::milliseconds(Aws::Auth::REFRESH_THRESHOLD));
producer_sdk_credentials.setExpiration(now + refresh_interval);
}

} // namespace Kinesis
} // namespace Aws
28 changes: 24 additions & 4 deletions kinesis_video_streamer/src/streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
#include <aws_common/sdk_utils/client_configuration_provider.h>
#include <aws_ros1_common/sdk_utils/logging/aws_ros_logger.h>
#include <aws_ros1_common/sdk_utils/ros1_node_parameter_reader.h>
#include <kinesis_manager/default_callbacks.h>
#include <kinesis_video_streamer/credentials.h>
#include <kinesis_video_streamer/ros_stream_subscription_installer.h>
#include <kinesis_video_streamer/streamer.h>
#include <kinesis_video_streamer/subscriber_callbacks.h>
#include <ros/ros.h>

using namespace Aws::Client;
using namespace Aws::Kinesis;
using namespace com::amazonaws::kinesis::video;


#ifndef RETURN_CODE_MASK
Expand Down Expand Up @@ -71,15 +74,32 @@ KinesisManagerStatus StreamerNode::Initialize()
AWS_LOG_FATAL(__func__, "Failed to set up subscription callbacks.");
return KINESIS_MANAGER_STATUS_ERROR_BASE;
}
auto kinesis_client = std::unique_ptr<KinesisClient>(
Aws::New<Aws::Kinesis::KinesisClientFacade>(__func__, aws_sdk_config));
Aws::Auth::ServiceAuthConfig aws_ros_config;
Aws::Auth::GetServiceAuthConfig(aws_ros_config, parameter_reader_);
auto aws_credentials_provider = Aws::MakeShared<Aws::Auth::CustomAWSCredentialsProviderChain>(__func__, aws_ros_config);
auto kinesis_client = std::make_unique<KinesisClient>(aws_credentials_provider, aws_sdk_config);
stream_manager_ = std::make_shared<KinesisStreamManager>(
parameter_reader_.get(), &stream_definition_provider_, subscription_installer_.get(),
std::move(kinesis_client));
subscription_installer_->set_stream_manager(stream_manager_.get());
/* Initialization of video producer */
KinesisManagerStatus initialize_video_producer_result =
stream_manager_->InitializeVideoProducer(aws_sdk_config.region.c_str());
auto credentials_provider = std::make_unique<CustomProducerSdkAWSCredentialsProvider>(aws_credentials_provider);
KinesisManagerStatus initialize_video_producer_result;
if (!credentials_provider) {
AWS_LOG_ERROR(__func__,
"Credential provider is invalid, have you set the environment variables required "
"for AWS access?");
initialize_video_producer_result = KINESIS_MANAGER_STATUS_DEFAULT_CREDENTIAL_PROVIDER_CREATION_FAILED;
}
else {
auto device_provider = std::make_unique<DefaultDeviceInfoProvider>();
auto client_callback_provider = std::make_unique<DefaultClientCallbackProvider>();
auto stream_callback_provider = std::make_unique<DefaultStreamCallbackProvider>();
initialize_video_producer_result =
stream_manager_->InitializeVideoProducer(aws_sdk_config.region.c_str(),
std::move(device_provider), std::move(client_callback_provider),
std::move(stream_callback_provider), std::move(credentials_provider));
}
if (KINESIS_MANAGER_STATUS_FAILED(initialize_video_producer_result)) {
fprintf(stderr, "Failed to initialize video producer");
AWS_LOGSTREAM_FATAL(__func__, "Failed to initialize video producer. Error code: "
Expand Down