diff --git a/native_with_state/internal/BUILD b/native_with_state/internal/BUILD index a04e2b2..4807cc5 100644 --- a/native_with_state/internal/BUILD +++ b/native_with_state/internal/BUILD @@ -14,6 +14,32 @@ package(default_visibility = ["//visibility:private"]) +cc_library( + name = "conference_data_channel_interface", + hdrs = ["conference_data_channel_interface.h"], + deps = [ + "@com_google_absl//absl/functional:any_invocable", + "@com_google_absl//absl/status", + "@media_api_samples//native_with_state/api:media_api_client_interface", + ], +) + +cc_library( + name = "conference_data_channel", + srcs = ["conference_data_channel.cc"], + hdrs = ["conference_data_channel.h"], + deps = [ + ":conference_data_channel_interface", + ":resource_handler_interface", + "@com_google_absl//absl/log", + "@com_google_absl//absl/status", + "@com_google_absl//absl/status:statusor", + "@com_google_absl//absl/strings:string_view", + "@media_api_samples//native_with_state/api:media_api_client_interface", + "@webrtc", + ], +) + cc_library( name = "resource_handler_interface", hdrs = [ diff --git a/native_with_state/internal/conference_data_channel.cc b/native_with_state/internal/conference_data_channel.cc new file mode 100644 index 0000000..1160150 --- /dev/null +++ b/native_with_state/internal/conference_data_channel.cc @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "native_with_state/internal/conference_data_channel.h" + +#include +#include + +#include "absl/log/log.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" +#include "native_with_state/api/media_api_client_interface.h" +#include "webrtc/api/data_channel_interface.h" +#include "webrtc/api/rtc_error.h" + +namespace meet { + +void ConferenceDataChannel::OnMessage(const webrtc::DataBuffer& buffer) { + // Short-circuit if there is no callback for the message. + if (!callback_) { + LOG(WARNING) << label() + << " data channel received message but has no callback."; + return; + } + + // Meet servers should always send JSON updates. + if (buffer.binary) { + LOG(ERROR) << label() << " data channel received unexpected binary update."; + return; + } + + absl::string_view message(buffer.data.cdata(), buffer.size()); + absl::StatusOr update_parse_status = + resource_handler_->ParseUpdate(message); + if (!update_parse_status.ok()) { + LOG(ERROR) << "Received " << label() + << " resource update but it failed to parse: " + << update_parse_status.status(); + return; + } + + VLOG(1) << label() << " data channel received update: " << message; + + callback_(*std::move(update_parse_status)); +} + +absl::Status ConferenceDataChannel::SendRequest(ResourceRequest request) { + absl::StatusOr stringify_status = + resource_handler_->StringifyRequest(request); + if (!stringify_status.ok()) { + return stringify_status.status(); + } + + VLOG(1) << "Sending " << label() << " request: " << *stringify_status; + + data_channel_->SendAsync( + // Closing the peer connection cancels any pending blocks until any + // pending tasks complete. Therefore, the conference data channel will + // exist if this is called and null references should not be possible. + webrtc::DataBuffer(*std::move(stringify_status)), + [this](webrtc::RTCError error) { + if (!error.ok()) { + LOG(ERROR) << "Error sending request via data channel: " << label() + << " " << error.message(); + } + }); + return absl::OkStatus(); +} + +} // namespace meet diff --git a/native_with_state/internal/conference_data_channel.h b/native_with_state/internal/conference_data_channel.h new file mode 100644 index 0000000..d9a676f --- /dev/null +++ b/native_with_state/internal/conference_data_channel.h @@ -0,0 +1,93 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NATIVE_WITH_STATE_INTERNAL_CONFERENCE_DATA_CHANNEL_H_ +#define NATIVE_WITH_STATE_INTERNAL_CONFERENCE_DATA_CHANNEL_H_ + +#include +#include +#include + +#include "absl/log/log.h" +#include "absl/status/status.h" +#include "native_with_state/api/media_api_client_interface.h" +#include "native_with_state/internal/conference_data_channel_interface.h" +#include "native_with_state/internal/resource_handler_interface.h" +#include "webrtc/api/data_channel_interface.h" +#include "webrtc/api/scoped_refptr.h" + +namespace meet { + +// A wrapper around a `webrtc::DataChannelInterface` that provides a simplified +// interface for sending resource requests and receiving resource updates. +// +// This class closes the underlying data channel when it is destroyed. +class ConferenceDataChannel : public ConferenceDataChannelInterface, + public webrtc::DataChannelObserver { + public: + ConferenceDataChannel( + std::unique_ptr resource_handler, + rtc::scoped_refptr data_channel) + : resource_handler_(std::move(resource_handler)), + data_channel_(std::move(data_channel)) { + data_channel_->RegisterObserver(this); + }; + + ~ConferenceDataChannel() override { data_channel_->Close(); } + + // ConferenceDataChannel is neither copyable nor movable. + ConferenceDataChannel(const ConferenceDataChannel&) = delete; + ConferenceDataChannel& operator=(const ConferenceDataChannel&) = delete; + + void OnStateChange() override { + LOG(INFO) << "ConferenceDataChannel::OnStateChange: " + << data_channel_->state(); + }; + + void OnMessage(const webrtc::DataBuffer& buffer) override; + + // Future WebRTC updates will force this to always be true. Ensure that + // current behavior reflects desired future behavior. + bool IsOkToCallOnTheNetworkThread() override { return true; } + + // Sets the callback for receiving resource updates from the resource data + // channel. + // + // The callback is called on the associated peer connection's network thread. + // + // Resource data channels can only have one callback at a time, and the + // callback must outlive the resource data channel if one is set. + // + // Setting the callback is not thread-safe, so it should only be called before + // the resource data channel is used (i.e. before the peer connection is + // started). + void SetCallback(ResourceUpdateCallback callback) override { + callback_ = std::move(callback); + } + + absl::Status SendRequest(ResourceRequest request) override; + + private: + std::string label() const { return data_channel_->label(); } + + ResourceUpdateCallback callback_; + std::unique_ptr resource_handler_; + rtc::scoped_refptr data_channel_; +}; + +} // namespace meet + +#endif // NATIVE_WITH_STATE_INTERNAL_CONFERENCE_DATA_CHANNEL_H_ diff --git a/native_with_state/internal/conference_data_channel_interface.h b/native_with_state/internal/conference_data_channel_interface.h new file mode 100644 index 0000000..231a61b --- /dev/null +++ b/native_with_state/internal/conference_data_channel_interface.h @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NATIVE_WITH_STATE_INTERNAL_CONFERENCE_DATA_CHANNEL_INTERFACE_H_ +#define NATIVE_WITH_STATE_INTERNAL_CONFERENCE_DATA_CHANNEL_INTERFACE_H_ + +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" +#include "native_with_state/api/media_api_client_interface.h" + +namespace meet { + +// Interface for sending resource requests and receiving resource updates to and +// from Meet servers. +class ConferenceDataChannelInterface { + public: + using ResourceUpdateCallback = absl::AnyInvocable; + + virtual ~ConferenceDataChannelInterface() = default; + + // Sets the callback for receiving resource updates from Meet servers. + virtual void SetCallback(ResourceUpdateCallback callback) = 0; + + // Sends a resource request to Meet servers. + virtual absl::Status SendRequest(ResourceRequest request) = 0; +}; + +} // namespace meet + +#endif // NATIVE_WITH_STATE_INTERNAL_CONFERENCE_DATA_CHANNEL_INTERFACE_H_ diff --git a/native_with_state/internal/conference_data_channel_test.cc b/native_with_state/internal/conference_data_channel_test.cc new file mode 100644 index 0000000..3b88882 --- /dev/null +++ b/native_with_state/internal/conference_data_channel_test.cc @@ -0,0 +1,245 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "native_with_state/internal/conference_data_channel.h" + +#include +#include +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "testing/base/public/mock-log.h" +#include "absl/base/log_severity.h" +#include "absl/functional/any_invocable.h" +#include "absl/log/log.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/string_view.h" +#include "native_with_state/api/media_api_client_interface.h" +#include "native_with_state/api/session_control_resource.h" +#include "native_with_state/internal/resource_handler_interface.h" +#include "webrtc/api/data_channel_interface.h" +#include "webrtc/api/rtc_error.h" +#include "webrtc/api/scoped_refptr.h" +#include "webrtc/api/test/mock_data_channel.h" + +namespace meet { +namespace { + +using ::base_logging::ERROR; +using ::base_logging::WARNING; +using ::testing::_; +using ::testing::HasSubstr; +using ::testing::kDoNotCaptureLogsYet; +using ::testing::MockFunction; +using ::testing::NotNull; +using ::testing::Return; +using ::testing::ScopedMockLog; +using ::testing::StrEq; +using ::testing::status::StatusIs; + +class MockResourceHandler : public ResourceHandlerInterface { + public: + MOCK_METHOD(absl::StatusOr, ParseUpdate, + (absl::string_view update), (override)); + MOCK_METHOD(absl::StatusOr, StringifyRequest, + (const ResourceRequest &request), (override)); +}; + +TEST(ConferenceDataChannelTest, SendRequestSucceeds) { + auto resource_handler = std::make_unique(); + EXPECT_CALL(*resource_handler, StringifyRequest(_)) + .WillOnce(Return("test-request")); + auto data_channel = webrtc::MockDataChannelInterface::Create(); + std::string sent_message; + EXPECT_CALL(*data_channel, SendAsync) + .WillOnce([&sent_message]( + webrtc::DataBuffer buffer, + absl::AnyInvocable on_complete) { + sent_message = std::string(buffer.data.cdata(), buffer.size()); + std::move(on_complete)(webrtc::RTCError::OK()); + }); + ConferenceDataChannel conference_data_channel(std::move(resource_handler), + std::move(data_channel)); + + absl::Status status = + conference_data_channel.SendRequest(SessionControlChannelFromClient()); + + EXPECT_OK(status); + EXPECT_THAT(sent_message, StrEq("test-request")); +} + +TEST(ConferenceDataChannelTest, SendRequestFailsWhenParsingFails) { + auto resource_handler = std::make_unique(); + EXPECT_CALL(*resource_handler, StringifyRequest(_)) + .WillOnce(Return(absl::InternalError("test-error"))); + auto data_channel = webrtc::MockDataChannelInterface::Create(); + EXPECT_CALL(*data_channel, SendAsync).Times(0); + ConferenceDataChannel conference_data_channel(std::move(resource_handler), + std::move(data_channel)); + + absl::Status status = + conference_data_channel.SendRequest(SessionControlChannelFromClient()); + + EXPECT_THAT(status, StatusIs(absl::StatusCode::kInternal, "test-error")); +} + +TEST(ConferenceDataChannelTest, SendRequestLogsErrorWhenTransmissionFails) { + auto resource_handler = std::make_unique(); + EXPECT_CALL(*resource_handler, StringifyRequest(_)) + .WillOnce(Return("test-request")); + auto data_channel = webrtc::MockDataChannelInterface::Create(); + EXPECT_CALL(*data_channel, SendAsync) + .WillOnce([](webrtc::DataBuffer /*buffer*/, + absl::AnyInvocable on_complete) { + std::move(on_complete)(webrtc::RTCError( + webrtc::RTCErrorType::INTERNAL_ERROR, "test-error")); + }); + ScopedMockLog log(kDoNotCaptureLogsYet); + std::string message; + EXPECT_CALL(log, Log(ERROR, _, _)) + .WillOnce([&message](int, const std::string &, const std::string &msg) { + message = msg; + }); + log.StartCapturingLogs(); + ConferenceDataChannel conference_data_channel(std::move(resource_handler), + std::move(data_channel)); + + absl::Status status = + conference_data_channel.SendRequest(SessionControlChannelFromClient()); + + EXPECT_OK(status); + EXPECT_THAT(message, HasSubstr("test-error")); +} + +TEST(ConferenceDataChannelTest, + CreatingConferenceDataChannelRegistersObserver) { + auto data_channel = webrtc::MockDataChannelInterface::Create(); + webrtc::DataChannelObserver *observer; + EXPECT_CALL(*data_channel, RegisterObserver(_)) + .WillOnce([&observer](webrtc::DataChannelObserver *inner_observer) { + observer = inner_observer; + }); + + ConferenceDataChannel conference_data_channel( + std::make_unique(), std::move(data_channel)); + + EXPECT_THAT(observer, NotNull()); +} + +TEST(ConferenceDataChannelTest, ReceivingUpdateSucceeds) { + auto resource_handler = std::make_unique(); + EXPECT_CALL(*resource_handler, ParseUpdate(_)) + .WillOnce(Return(SessionControlChannelToClient())); + auto data_channel = webrtc::MockDataChannelInterface::Create(); + webrtc::DataChannelObserver *observer; + EXPECT_CALL(*data_channel, RegisterObserver(_)) + .WillOnce([&observer](webrtc::DataChannelObserver *inner_observer) { + observer = inner_observer; + }); + ConferenceDataChannel conference_data_channel(std::move(resource_handler), + std::move(data_channel)); + ResourceUpdate received_update; + MockFunction mock_function; + EXPECT_CALL(mock_function, Call) + .WillOnce([&received_update](ResourceUpdate update) { + received_update = std::move(update); + }); + conference_data_channel.SetCallback(mock_function.AsStdFunction()); + + observer->OnMessage(webrtc::DataBuffer("test-update")); + + EXPECT_TRUE( + std::holds_alternative(received_update)); +} + +TEST(ConferenceDataChannelTest, ReceivingUpdateWithoutCallbackDoesNothing) { + auto data_channel = webrtc::MockDataChannelInterface::Create(); + webrtc::DataChannelObserver *observer; + EXPECT_CALL(*data_channel, RegisterObserver(_)) + .WillOnce([&observer](webrtc::DataChannelObserver *inner_observer) { + observer = inner_observer; + }); + ConferenceDataChannel conference_data_channel( + std::make_unique(), std::move(data_channel)); + ScopedMockLog log(kDoNotCaptureLogsYet); + std::string message; + EXPECT_CALL(log, Log(WARNING, _, _)) + .WillOnce([&message](int, const std::string &, const std::string &msg) { + message = msg; + }); + log.StartCapturingLogs(); + + observer->OnMessage(webrtc::DataBuffer("test-update")); + + EXPECT_THAT(message, + HasSubstr("data channel received message but has no callback.")); +} + +TEST(ConferenceDataChannelTest, ReceivingUpdateFailsWhenReceivingBinaryData) { + auto data_channel = webrtc::MockDataChannelInterface::Create(); + webrtc::DataChannelObserver *observer; + EXPECT_CALL(*data_channel, RegisterObserver(_)) + .WillOnce([&observer](webrtc::DataChannelObserver *inner_observer) { + observer = inner_observer; + }); + ConferenceDataChannel conference_data_channel( + std::make_unique(), std::move(data_channel)); + conference_data_channel.SetCallback([](ResourceUpdate /*update*/) {}); + ScopedMockLog log(kDoNotCaptureLogsYet); + std::string message; + EXPECT_CALL(log, Log(ERROR, _, _)) + .WillOnce([&message](int, const std::string &, const std::string &msg) { + message = msg; + }); + log.StartCapturingLogs(); + + observer->OnMessage(webrtc::DataBuffer("test-update", true)); + + EXPECT_THAT(message, + HasSubstr("data channel received unexpected binary update.")); +} + +TEST(ConferenceDataChannelTest, ReceivingUpdateFailsWhenParsingFails) { + auto resource_handler = std::make_unique(); + EXPECT_CALL(*resource_handler, ParseUpdate(_)) + .WillOnce(Return(absl::InternalError("parsing-error"))); + auto data_channel = webrtc::MockDataChannelInterface::Create(); + webrtc::DataChannelObserver *observer; + EXPECT_CALL(*data_channel, RegisterObserver(_)) + .WillOnce([&observer](webrtc::DataChannelObserver *inner_observer) { + observer = inner_observer; + }); + ConferenceDataChannel conference_data_channel(std::move(resource_handler), + std::move(data_channel)); + conference_data_channel.SetCallback([](ResourceUpdate /*update*/) {}); + ScopedMockLog log(kDoNotCaptureLogsYet); + std::string message; + EXPECT_CALL(log, Log(ERROR, _, _)) + .WillOnce([&message](int, const std::string &, const std::string &msg) { + message = msg; + }); + log.StartCapturingLogs(); + + observer->OnMessage(webrtc::DataBuffer("test-update")); + + EXPECT_THAT(message, HasSubstr("parsing-error")); +} + +} // namespace +} // namespace meet