diff --git a/README.md b/README.md index 4e09353..79ff2e0 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,7 @@ An example launch file called `sample_application.launch` is included in this pr - `roslaunch `[`raspicam_node`]`camerav2_410x308_30fps.launch` - `roslaunch h264_video_encoder sample_application.launch` - `roslaunch kinesis_video_streamer sample_application.launch` + - Start/stop streaming with `rosservice call /kinesis_video_streamer/command "enable: true/false"` - Log into your AWS Console to see the availabe Kinesis Video stream. - For other platforms, replace step 1 with an equivalent command to launch your camera node. Reconfigure the topic names accordingly. @@ -142,6 +143,7 @@ The parameters below apply to the node as a whole and are not specific to any on | Parameter Name | Description | Type | | -------------- | -----------------------------------------------------------| ------------- | | aws_client_configuration/region | The AWS region which the video should be streamed to. | *string* | +| kinesis_video/enabled | Enable/disable on start-up. | *bool* | | kinesis_video/stream_count | The number of streams you wish to load and transmit. Each stream should have its corresponding parameter set as described below. | *int* | | kinesis_video/log4cplus_config | (optional) Config file path for the log4cplus logger, which is used by the Kinesis Video Producer SDK. | *string* | diff --git a/kinesis_video_streamer/CMakeLists.txt b/kinesis_video_streamer/CMakeLists.txt index bb57b7d..43f1a14 100755 --- a/kinesis_video_streamer/CMakeLists.txt +++ b/kinesis_video_streamer/CMakeLists.txt @@ -21,8 +21,21 @@ find_package(catkin REQUIRED COMPONENTS kinesis_video_msgs roscpp image_transport + message_generation std_msgs) -catkin_package() + + +add_service_files( + DIRECTORY srv + FILES + Command.srv +) +generate_messages() + +catkin_package( + CATKIN_DEPENDS + message_runtime +) ########### ## Build ## @@ -37,6 +50,9 @@ target_include_directories(${PROJECT_NAME}_lib PUBLIC include ${catkin_INCLUDE_D ## Specify libraries to link a library or executable target against target_link_libraries(${PROJECT_NAME} ${catkin_LIBRARIES} ${kinesis_manager_LIBRARIES} ${PRODUCER_LIBRARY} ${LOG_LIBRARY} ${CURL_LIBRARIES}) target_link_libraries(${PROJECT_NAME}_lib ${catkin_LIBRARIES} ${kinesis_manager_LIBRARIES} ${PRODUCER_LIBRARY} ${CURL_LIBRARIES}) +## Specify dependencies +add_dependencies(${PROJECT_NAME} ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS}) +add_dependencies(${PROJECT_NAME}_lib ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS}) ############# ## Install ## diff --git a/kinesis_video_streamer/include/kinesis_video_streamer/streamer.h b/kinesis_video_streamer/include/kinesis_video_streamer/streamer.h index a593e39..c81c82f 100644 --- a/kinesis_video_streamer/include/kinesis_video_streamer/streamer.h +++ b/kinesis_video_streamer/include/kinesis_video_streamer/streamer.h @@ -16,6 +16,7 @@ #include #include +#include namespace Aws { namespace Kinesis { @@ -40,6 +41,14 @@ class StreamerNode : public ros::NodeHandle KinesisManagerStatus InitializeStreamSubscriptions(); + KinesisManagerStatus UninitializeStreamSubscriptions(); + + bool CommandCb(kinesis_video_streamer::Command::Request &req, kinesis_video_streamer::Command::Response &res); + + bool Command(bool enable); + + void PublishStatus(bool enable); + void Spin(); void set_subscription_installer(std::shared_ptr subscription_installer); @@ -49,6 +58,9 @@ class StreamerNode : public ros::NodeHandle std::shared_ptr subscription_installer_; std::shared_ptr stream_manager_; StreamDefinitionProvider stream_definition_provider_; + ros::ServiceServer srv_command_; + ros::Publisher pub_enabled_; + bool enabled_; }; } // namespace Kinesis diff --git a/kinesis_video_streamer/launch/kinesis_video_streamer.launch b/kinesis_video_streamer/launch/kinesis_video_streamer.launch index d3da73b..df13d4b 100755 --- a/kinesis_video_streamer/launch/kinesis_video_streamer.launch +++ b/kinesis_video_streamer/launch/kinesis_video_streamer.launch @@ -6,6 +6,7 @@ @param stream_config Configuration for the (first) stream. If provided, rosparam will attempt to load the file into the private namespace of the node. Otherwise, the example configuration file 'sample_configuration.yaml' will be loaded. @param aws_client_configuration/region Defaults to us-west-2. + @param kinesis_video/enabled Enable/disable on start-up. @param kinesis_video/stream_count Number of streams to load & transmit. Stream definition should be provided for each stream. @param kinesis_video/log4cplus_config Optional path for a log4cplus config which will be used by the Kinesis Video Producer SDK. @param kinesis_video/stream/ as described in https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-data.html#how-data-header-streamdefinition @@ -26,5 +27,6 @@ + diff --git a/kinesis_video_streamer/package.xml b/kinesis_video_streamer/package.xml index 110ff0e..86719eb 100755 --- a/kinesis_video_streamer/package.xml +++ b/kinesis_video_streamer/package.xml @@ -12,6 +12,10 @@ catkin + message_generation + + message_runtime + aws_common aws_ros1_common kinesis_manager diff --git a/kinesis_video_streamer/src/main.cpp b/kinesis_video_streamer/src/main.cpp index b5f10e3..d882c15 100644 --- a/kinesis_video_streamer/src/main.cpp +++ b/kinesis_video_streamer/src/main.cpp @@ -61,11 +61,6 @@ int main(int argc, char * argv[]) return shutdown(options, status); } - status = streamer.InitializeStreamSubscriptions(); - if (!KINESIS_MANAGER_STATUS_SUCCEEDED(status)) { - return shutdown(options, status); - } - AWS_LOG_INFO(__func__, "Starting Kinesis Video Node..."); streamer.Spin(); diff --git a/kinesis_video_streamer/src/streamer.cpp b/kinesis_video_streamer/src/streamer.cpp index 5c9f600..c7aed12 100644 --- a/kinesis_video_streamer/src/streamer.cpp +++ b/kinesis_video_streamer/src/streamer.cpp @@ -25,6 +25,7 @@ #include #include #include +#include using namespace Aws::Client; using namespace Aws::Kinesis; @@ -86,7 +87,20 @@ KinesisManagerStatus StreamerNode::Initialize() << initialize_video_producer_result); return initialize_video_producer_result; } - + // Set up command service server + srv_command_ = advertiseService("command", &StreamerNode::CommandCb, this); + AWS_LOGSTREAM_INFO(__func__, "Service server created: " << srv_command_.getService()); + // Enable on start if needed + parameter_reader_->ReadParam(GetKinesisVideoParameter("enabled"), enabled_); + pub_enabled_ = advertise("enabled", 1, true); // Latched + if (enabled_) { + KinesisManagerStatus status = InitializeStreamSubscriptions(); + if (KINESIS_MANAGER_STATUS_FAILED(status)) { + return status; + } + } + PublishStatus(enabled_); + return KINESIS_MANAGER_STATUS_SUCCESS; } @@ -106,6 +120,65 @@ KinesisManagerStatus StreamerNode::InitializeStreamSubscriptions() return KINESIS_MANAGER_STATUS_SUCCESS; } +KinesisManagerStatus StreamerNode::UninitializeStreamSubscriptions() +{ + // Unsubscribe and free all Kinesis streams to stop streaming + int video_stream_count = 0; + parameter_reader_->ReadParam(GetKinesisVideoParameter(kStreamParameters.stream_count), video_stream_count); + for (int stream_idx = 0; stream_idx < video_stream_count; stream_idx++) { + std::string stream_name, topic_name; + parameter_reader_->ReadParam(GetStreamParameterPath(stream_idx, kStreamParameters.stream_name), stream_name); + parameter_reader_->ReadParam(GetStreamParameterPath(stream_idx, kStreamParameters.topic_name), topic_name); + subscription_installer_->Uninstall(topic_name); + stream_manager_->FreeStream(stream_name); + } + return KINESIS_MANAGER_STATUS_SUCCESS; +} + +bool StreamerNode::CommandCb(kinesis_video_streamer::Command::Request &req, kinesis_video_streamer::Command::Response &res) +{ + // Request node shutdown if start/stop streaming failed + if (!Command(req.enable)) { + ros::requestShutdown(); + return false; + } + else + return true; +} + +bool StreamerNode::Command(bool enable) +{ + // Act only on mode switch + if (enable == enabled_) + return true; + + KinesisManagerStatus status; + // Enable + if (enable) { + status = InitializeStreamSubscriptions(); // Can partially succeed, but returned status will be failed + } + // Disable + else { + status = UninitializeStreamSubscriptions(); + } + + // Set enabled if succeeded only + if (KINESIS_MANAGER_STATUS_SUCCEEDED(status)) { + enabled_ = enable; + PublishStatus(enabled_); + return true; + } + else + return false; +} + +void StreamerNode::PublishStatus(bool enable) +{ + std_msgs::Bool msg; + msg.data = enable; + pub_enabled_.publish(msg); +} + void StreamerNode::Spin() { uint32_t spinner_thread_count = kDefaultNumberOfSpinnerThreads; diff --git a/kinesis_video_streamer/srv/Command.srv b/kinesis_video_streamer/srv/Command.srv new file mode 100644 index 0000000..1ce10cb --- /dev/null +++ b/kinesis_video_streamer/srv/Command.srv @@ -0,0 +1,4 @@ +# Command Kinesis video stream node + +bool enable +---