From fb2d78ed9700c3983d94d5771dae95ef211de704 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Mon, 20 Jun 2022 08:07:58 -0500 Subject: [PATCH 1/8] Adding combo node kind --- README.md | 2 +- src/Configuration.cpp | 4 ++++ src/Configuration.h | 1 + src/JanusFtl.cpp | 12 ++++++------ vendor/cpp-httplib | 1 + 5 files changed, 13 insertions(+), 7 deletions(-) create mode 160000 vendor/cpp-httplib diff --git a/README.md b/README.md index 8af600b..5506634 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,7 @@ Configuration is achieved through environment variables. | Environment Variable | Supported Values | Notes | | :--------------------- | :--------------- | :---------------- | | `FTL_HOSTNAME` | Valid hostname | The hostname of the machine running the FTL service. Defaults to system hostname. | -| `FTL_NODE_KIND` | `Standalone`: (default) This instance will listen for incoming FTL connections and transmit them to WebRTC clients.
`Ingest`: This instance will listen for incoming FTL connections and relay them to other nodes when instructed by an Orchestrator service.
`Edge`: This instance will receive stream relays from other nodes and transmit them to WebRTC clients. | This configuration value controls the behavior of the FTL plugin when used in conjunction with an [Orchestrator service](https://github.com/Glimesh/janus-ftl-orchestrator). | +| `FTL_NODE_KIND` | `Standalone`: (default) This instance will listen for incoming FTL connections and transmit them to WebRTC clients.
`Ingest`: This instance will listen for incoming FTL connections and relay them to other nodes when instructed by an Orchestrator service.
`Edge`: This instance will receive stream relays from other nodes and transmit them to WebRTC clients.
`Combo`: This instance will listen for incoming FTL connections, serve streams from itself and other relays. | This configuration value controls the behavior of the FTL plugin when used in conjunction with an [Orchestrator service](https://github.com/Glimesh/janus-ftl-orchestrator). | | `FTL_ORCHESTRATOR_HOSTNAME` | Valid hostname | The hostname of the Orchestrator service to connect to for stream relay information. | | `FTL_ORCHESTRATOR_PORT` | Port number, `1`-`65535`. | The port number to use when connecting to the Orchestrator service. | | `FTL_ORCHESTRATOR_PSK` | String of arbitrary hex values (ex. `001122334455ff`) | This is the pre-shared key used to establish a secure TLS1.3 connection to the Orchestrator service. | diff --git a/src/Configuration.cpp b/src/Configuration.cpp index 8568cca..f807d9d 100644 --- a/src/Configuration.cpp +++ b/src/Configuration.cpp @@ -70,6 +70,10 @@ void Configuration::Load() { nodeKind = NodeKind::Edge; } + else if (nodeKindStr.compare("combo") == 0) + { + nodeKind = NodeKind::Combo; + } } // FTL_ORCHESTRATOR_HOSTNAME -> OrchestratorHostname diff --git a/src/Configuration.h b/src/Configuration.h index 08ae3e5..00a07cd 100644 --- a/src/Configuration.h +++ b/src/Configuration.h @@ -22,6 +22,7 @@ enum class NodeKind Ingest = 1, // Relay = 2, Edge = 3, + Combo = 4, }; enum class ServiceConnectionKind diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index e63a8cb..d9ac077 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -271,7 +271,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) // If we're an Edge node and there are no more viewers for this channel, we can // unsubscribe. - if ((configuration->GetNodeKind() == NodeKind::Edge) && + if ((configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) && (watchingStream->GetViewerCount() == 0)) { orchestratorUnsubscribe = true; @@ -287,7 +287,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) // If this was the last pending viewer for this channel, unsubscribe. if (pendingViewerSessions[channelId].size() == 0) { - if (configuration->GetNodeKind() == NodeKind::Edge) + if (configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) { orchestratorUnsubscribe = true; } @@ -376,7 +376,7 @@ Result JanusFtl::ftlServerStreamStarted( // TODO: Notify viewer sessions // If we are configured as an Ingest node, notify the Orchestrator that a stream has started. - if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr)) + if ((configuration->GetNodeKind() == NodeKind::Ingest || configuration->GetNodeKind() == NodeKind::Combo) && (orchestrationClient != nullptr)) { spdlog::info("Publishing channel {} / stream {} to Orchestrator...", channelId, streamId); @@ -451,7 +451,7 @@ void JanusFtl::initOrchestratorConnection() void JanusFtl::initServiceConnection() { // If we are configured to be an edge node, we *must* use the EdgeNodeServiceConnection - if (configuration->GetNodeKind() == NodeKind::Edge) + if (configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) { serviceConnection = std::make_shared(); } @@ -677,7 +677,7 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, // TODO: Tell viewers stream is offline. // If we are configured as an Ingest node, notify the Orchestrator that a stream has ended. - if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr)) + if ((configuration->GetNodeKind() == NodeKind::Ingest || configuration->GetNodeKind() == NodeKind::Combo) && (orchestrationClient != nullptr)) { spdlog::info("Unpublishing channel {} / stream {} from Orchestrator", stream->GetChannelId(), stream->GetStreamId()); @@ -749,7 +749,7 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt // If we're an Edge node and this is a first viewer for a given channel, // request that this channel be relayed to us. - if ((configuration->GetNodeKind() == NodeKind::Edge) && (pendingViewers == 0)) + if ((configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) && (pendingViewers == 0)) { // Generate a new stream key for incoming relay of this channel const auto& edgeServiceConnection = diff --git a/vendor/cpp-httplib b/vendor/cpp-httplib new file mode 160000 index 0000000..cf475bc --- /dev/null +++ b/vendor/cpp-httplib @@ -0,0 +1 @@ +Subproject commit cf475bcb505678046d53f0e0575a9efaa5b227f9 From 88785170c300127f223d2d114fe3aba5ac483a57 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Mon, 20 Jun 2022 08:23:49 -0500 Subject: [PATCH 2/8] Removing unused vendor file --- vendor/cpp-httplib | 1 - 1 file changed, 1 deletion(-) delete mode 160000 vendor/cpp-httplib diff --git a/vendor/cpp-httplib b/vendor/cpp-httplib deleted file mode 160000 index cf475bc..0000000 --- a/vendor/cpp-httplib +++ /dev/null @@ -1 +0,0 @@ -Subproject commit cf475bcb505678046d53f0e0575a9efaa5b227f9 From 413f2119ab401f528c8e7a51399de2f4e7aaab11 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Tue, 21 Jun 2022 07:16:45 -0500 Subject: [PATCH 3/8] Removing the edge service connection handler for combo --- src/JanusFtl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index d9ac077..c43be6f 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -451,7 +451,7 @@ void JanusFtl::initOrchestratorConnection() void JanusFtl::initServiceConnection() { // If we are configured to be an edge node, we *must* use the EdgeNodeServiceConnection - if (configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) + if (configuration->GetNodeKind() == NodeKind::Edge) { serviceConnection = std::make_shared(); } From 0c31cd43910d002c41050b3f18fd10f1a3a8bb51 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Fri, 24 Jun 2022 14:31:26 -0400 Subject: [PATCH 4/8] Different approach, removing combo and adding orchestrationEnabled --- README.md | 2 +- src/Configuration.cpp | 4 ---- src/Configuration.h | 1 - src/JanusFtl.cpp | 44 +++++++++++++++++++++++-------------------- src/JanusFtl.h | 4 +++- 5 files changed, 28 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 5506634..8af600b 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,7 @@ Configuration is achieved through environment variables. | Environment Variable | Supported Values | Notes | | :--------------------- | :--------------- | :---------------- | | `FTL_HOSTNAME` | Valid hostname | The hostname of the machine running the FTL service. Defaults to system hostname. | -| `FTL_NODE_KIND` | `Standalone`: (default) This instance will listen for incoming FTL connections and transmit them to WebRTC clients.
`Ingest`: This instance will listen for incoming FTL connections and relay them to other nodes when instructed by an Orchestrator service.
`Edge`: This instance will receive stream relays from other nodes and transmit them to WebRTC clients.
`Combo`: This instance will listen for incoming FTL connections, serve streams from itself and other relays. | This configuration value controls the behavior of the FTL plugin when used in conjunction with an [Orchestrator service](https://github.com/Glimesh/janus-ftl-orchestrator). | +| `FTL_NODE_KIND` | `Standalone`: (default) This instance will listen for incoming FTL connections and transmit them to WebRTC clients.
`Ingest`: This instance will listen for incoming FTL connections and relay them to other nodes when instructed by an Orchestrator service.
`Edge`: This instance will receive stream relays from other nodes and transmit them to WebRTC clients. | This configuration value controls the behavior of the FTL plugin when used in conjunction with an [Orchestrator service](https://github.com/Glimesh/janus-ftl-orchestrator). | | `FTL_ORCHESTRATOR_HOSTNAME` | Valid hostname | The hostname of the Orchestrator service to connect to for stream relay information. | | `FTL_ORCHESTRATOR_PORT` | Port number, `1`-`65535`. | The port number to use when connecting to the Orchestrator service. | | `FTL_ORCHESTRATOR_PSK` | String of arbitrary hex values (ex. `001122334455ff`) | This is the pre-shared key used to establish a secure TLS1.3 connection to the Orchestrator service. | diff --git a/src/Configuration.cpp b/src/Configuration.cpp index f807d9d..8568cca 100644 --- a/src/Configuration.cpp +++ b/src/Configuration.cpp @@ -70,10 +70,6 @@ void Configuration::Load() { nodeKind = NodeKind::Edge; } - else if (nodeKindStr.compare("combo") == 0) - { - nodeKind = NodeKind::Combo; - } } // FTL_ORCHESTRATOR_HOSTNAME -> OrchestratorHostname diff --git a/src/Configuration.h b/src/Configuration.h index 00a07cd..08ae3e5 100644 --- a/src/Configuration.h +++ b/src/Configuration.h @@ -22,7 +22,6 @@ enum class NodeKind Ingest = 1, // Relay = 2, Edge = 3, - Combo = 4, }; enum class ServiceConnectionKind diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index c43be6f..70057a4 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -60,7 +60,7 @@ JanusFtl::JanusFtl( initOrchestratorConnection(); - initServiceConnection(); + initServiceConnections(); ftlServer = std::make_unique(std::move(ingestControlListener), std::move(mediaConnectionCreator), @@ -271,8 +271,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) // If we're an Edge node and there are no more viewers for this channel, we can // unsubscribe. - if ((configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) && - (watchingStream->GetViewerCount() == 0)) + if (orchestrationEnabled && (watchingStream->GetViewerCount() == 0)) { orchestratorUnsubscribe = true; } @@ -287,7 +286,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) // If this was the last pending viewer for this channel, unsubscribe. if (pendingViewerSessions[channelId].size() == 0) { - if (configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) + if (orchestrationEnabled) { orchestratorUnsubscribe = true; } @@ -299,14 +298,14 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) if (orchestratorUnsubscribe) { // Remove temporary stream key - const auto& edgeServiceConnection = - std::dynamic_pointer_cast(serviceConnection); - if (edgeServiceConnection == nullptr) + const auto& edgeService = + std::dynamic_pointer_cast(edgeServiceConnection); + if (edgeService == nullptr) { throw std::runtime_error( "Unexpected service connection type - expected EdgeNodeServiceConnection."); } - edgeServiceConnection->ClearStreamKey(channelId); + edgeService->ClearStreamKey(channelId); spdlog::info("Last viewer for channel {} has disconnected - unsubscribing...", channelId); @@ -376,7 +375,7 @@ Result JanusFtl::ftlServerStreamStarted( // TODO: Notify viewer sessions // If we are configured as an Ingest node, notify the Orchestrator that a stream has started. - if ((configuration->GetNodeKind() == NodeKind::Ingest || configuration->GetNodeKind() == NodeKind::Combo) && (orchestrationClient != nullptr)) + if (orchestrationEnabled && (orchestrationClient != nullptr)) { spdlog::info("Publishing channel {} / stream {} to Orchestrator...", channelId, streamId); @@ -410,7 +409,7 @@ void JanusFtl::initVideoDecoders() void JanusFtl::initOrchestratorConnection() { - if (configuration->GetNodeKind() != NodeKind::Standalone) + if (configuration->GetOrchestratorHostname() != nullptr) { spdlog::info( "Connecting to Orchestration service @ {}:{}...", @@ -445,17 +444,22 @@ void JanusFtl::initOrchestratorConnection() .RegionCode = configuration->GetOrchestratorRegionCode(), .Hostname = configuration->GetMyHostname(), }); + + orchestrationEnabled = true; } } -void JanusFtl::initServiceConnection() +void JanusFtl::initServiceConnections() { - // If we are configured to be an edge node, we *must* use the EdgeNodeServiceConnection - if (configuration->GetNodeKind() == NodeKind::Edge) + // If we are configured to be an standalone or edge node, we need to setup the EdgeNodeServiceConnection + if (configuration->GetNodeKind() == NodeKind::Standalone || configuration->GetNodeKind() == NodeKind::Edge) { - serviceConnection = std::make_shared(); + edgeServiceConnection = std::make_shared(); + edgeServiceConnection->Init(); } - else + + // If we're only an edge, don't setup any service connection + if (configuration->GetNodeKind() !== NodeKind::Edge) { switch (configuration->GetServiceConnectionKind()) { @@ -482,9 +486,9 @@ void JanusFtl::initServiceConnection() configuration->GetDummyPreviewImagePath()); break; } - } - serviceConnection->Init(); + serviceConnection->Init(); + } } void JanusFtl::initServiceReportThread() @@ -677,7 +681,7 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, // TODO: Tell viewers stream is offline. // If we are configured as an Ingest node, notify the Orchestrator that a stream has ended. - if ((configuration->GetNodeKind() == NodeKind::Ingest || configuration->GetNodeKind() == NodeKind::Combo) && (orchestrationClient != nullptr)) + if (orchestrationEnabled && (orchestrationClient != nullptr)) { spdlog::info("Unpublishing channel {} / stream {} from Orchestrator", stream->GetChannelId(), stream->GetStreamId()); @@ -749,11 +753,11 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt // If we're an Edge node and this is a first viewer for a given channel, // request that this channel be relayed to us. - if ((configuration->GetNodeKind() == NodeKind::Edge || configuration->GetNodeKind() == NodeKind::Combo) && (pendingViewers == 0)) + if (orchestrationEnabled && (pendingViewers == 0)) { // Generate a new stream key for incoming relay of this channel const auto& edgeServiceConnection = - std::dynamic_pointer_cast(serviceConnection); + std::dynamic_pointer_cast(edgeServiceConnection); if (edgeServiceConnection == nullptr) { throw std::runtime_error( diff --git a/src/JanusFtl.h b/src/JanusFtl.h index 7dd5103..56df3f9 100644 --- a/src/JanusFtl.h +++ b/src/JanusFtl.h @@ -96,8 +96,10 @@ class JanusFtl janus_callbacks* janusCore; std::unique_ptr ftlServer; std::unique_ptr configuration; + bool orchestrationEnabled = false; std::shared_ptr orchestrationClient; std::shared_ptr serviceConnection; + std::shared_ptr edgeServiceConnection; std::unordered_map> videoDecoders; uint32_t maxAllowedBitsPerSecond = 0; uint32_t rollingSizeAvgMs = 2000; @@ -124,7 +126,7 @@ class JanusFtl // Initialization void initVideoDecoders(); void initOrchestratorConnection(); - void initServiceConnection(); + void initServiceConnections(); void initServiceReportThread(); // Service report thread body void serviceReportThreadBody(std::promise&& threadEndedPromise); From d8ea8a1262f9fee30a6dc9e4b67a2e7a869a12ff Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Sat, 25 Jun 2022 12:03:47 -0400 Subject: [PATCH 5/8] Working version --- src/JanusFtl.cpp | 40 ++++++++++++++----- src/JanusFtl.h | 4 ++ .../EdgeNodeServiceConnection.cpp | 3 ++ .../GlimeshServiceConnection.cpp | 2 + 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index 70057a4..ab29f08 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -331,7 +331,18 @@ json_t* JanusFtl::QuerySession(janus_plugin_session* handle) #pragma region Private methods Result> JanusFtl::ftlServerRequestKey(ftl_channel_id_t channelId) { - return serviceConnection->GetHmacKey(channelId); + std::shared_ptr connection = getServiceConnection(channelId); + return connection->GetHmacKey(channelId); +} + +std::shared_ptr JanusFtl::getServiceConnection(ftl_channel_id_t channelId) +{ + if (pendingEdgeChannels.count(channelId) > 0) { + // This stream is coming from another ingest + return edgeServiceConnection; + } else { + return serviceConnection; + } } Result JanusFtl::ftlServerStreamStarted( @@ -341,7 +352,8 @@ Result JanusFtl::ftlServerStreamStarted( std::unique_lock lock(streamDataMutex); // Attempt to start the stream on the service connection - Result startResult = serviceConnection->StartStream(channelId); + std::shared_ptr connection = getServiceConnection(channelId); + Result startResult = connection->StartStream(channelId); if (startResult.IsError) { return Result::Error(startResult.ErrorMessage); @@ -375,7 +387,7 @@ Result JanusFtl::ftlServerStreamStarted( // TODO: Notify viewer sessions // If we are configured as an Ingest node, notify the Orchestrator that a stream has started. - if (orchestrationEnabled && (orchestrationClient != nullptr)) + if (orchestrationEnabled && orchestrationClient != nullptr && pendingEdgeChannels.count(channelId) == 0) { spdlog::info("Publishing channel {} / stream {} to Orchestrator...", channelId, streamId); @@ -409,7 +421,7 @@ void JanusFtl::initVideoDecoders() void JanusFtl::initOrchestratorConnection() { - if (configuration->GetOrchestratorHostname() != nullptr) + if (!empty(configuration->GetOrchestratorHostname())) { spdlog::info( "Connecting to Orchestration service @ {}:{}...", @@ -459,7 +471,7 @@ void JanusFtl::initServiceConnections() } // If we're only an edge, don't setup any service connection - if (configuration->GetNodeKind() !== NodeKind::Edge) + if (configuration->GetNodeKind() != NodeKind::Edge) { switch (configuration->GetServiceConnectionKind()) { @@ -607,8 +619,10 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) .videoWidth = videoWidth, .videoHeight = videoHeight, }; + + std::shared_ptr connection = getServiceConnection(channelId); Result updateResult = - serviceConnection->UpdateStreamMetadata(streamId, metadata); + connection->UpdateStreamMetadata(streamId, metadata); // Check if the request failed, or the service wants to end this stream if (updateResult.IsError || (updateResult.Value == ServiceConnection::ServiceResponse::EndStream)) @@ -636,7 +650,7 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) { std::vector jpegBytes = videoDecoders.at(keyframe.Codec)->GenerateJpegImage(keyframe.Packets); - serviceConnection->SendJpegPreviewImage(streamId, jpegBytes); + connection->SendJpegPreviewImage(streamId, jpegBytes); } catch (const PreviewGenerationFailedException& e) { @@ -698,7 +712,8 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, spdlog::info("Stream ended. Channel {} / stream {}", stream->GetChannelId(), stream->GetStreamId()); - serviceConnection->EndStream(streamId); + std::shared_ptr connection = getServiceConnection(channelId); + connection->EndStream(streamId); streams.erase(channelId); } @@ -756,14 +771,17 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt if (orchestrationEnabled && (pendingViewers == 0)) { // Generate a new stream key for incoming relay of this channel - const auto& edgeServiceConnection = + const auto& edgeService = std::dynamic_pointer_cast(edgeServiceConnection); - if (edgeServiceConnection == nullptr) + if (edgeService == nullptr) { throw std::runtime_error( "Unexpected service connection type - expected EdgeNodeServiceConnection."); } - std::vector streamKey = edgeServiceConnection->ProvisionStreamKey(channelId); + + pendingEdgeChannels.insert(channelId); + + std::vector streamKey = edgeService->ProvisionStreamKey(channelId); // Subscribe for relay of this stream spdlog::info("First viewer for channel {} - subscribing...", diff --git a/src/JanusFtl.h b/src/JanusFtl.h index 56df3f9..1b005ec 100644 --- a/src/JanusFtl.h +++ b/src/JanusFtl.h @@ -16,6 +16,7 @@ #include "JanusSession.h" #include "JanusStream.h" #include "ServiceConnections/ServiceConnection.h" +#include "ServiceConnections/EdgeNodeServiceConnection.h" #include "Utilities/FtlTypes.h" #include "Utilities/JanssonPtr.h" #include "Utilities/Result.h" @@ -116,6 +117,7 @@ class JanusFtl std::unordered_map> streams; std::unordered_map sessions; std::unordered_map> pendingViewerSessions; + std::unordered_set pendingEdgeChannels; /* Private methods */ // FtlServer Callbacks @@ -148,4 +150,6 @@ class JanusFtl ConnectionResult onOrchestratorIntro(ConnectionIntroPayload payload); ConnectionResult onOrchestratorOutro(ConnectionOutroPayload payload); ConnectionResult onOrchestratorStreamRelay(ConnectionRelayPayload payload); + // Helper functions + std::shared_ptr getServiceConnection(ftl_channel_id_t channelId); }; \ No newline at end of file diff --git a/src/ServiceConnections/EdgeNodeServiceConnection.cpp b/src/ServiceConnections/EdgeNodeServiceConnection.cpp index 72127e0..d430c7e 100644 --- a/src/ServiceConnections/EdgeNodeServiceConnection.cpp +++ b/src/ServiceConnections/EdgeNodeServiceConnection.cpp @@ -19,6 +19,8 @@ EdgeNodeServiceConnection::EdgeNodeServiceConnection() #pragma region Public methods std::vector EdgeNodeServiceConnection::ProvisionStreamKey(ftl_channel_id_t channelId) { + spdlog::debug( + "EdgeNodeServiceConnection::ProvisionStreamKey called..."); // If a stream key already exists for the given channel, just return the existing one if (streamKeys.count(channelId) > 0) { @@ -41,6 +43,7 @@ void EdgeNodeServiceConnection::Init() Result> EdgeNodeServiceConnection::GetHmacKey(ftl_channel_id_t channelId) { + if (streamKeys.count(channelId) > 0) { const auto& key = streamKeys[channelId]; diff --git a/src/ServiceConnections/GlimeshServiceConnection.cpp b/src/ServiceConnections/GlimeshServiceConnection.cpp index 837548b..885c83a 100644 --- a/src/ServiceConnections/GlimeshServiceConnection.cpp +++ b/src/ServiceConnections/GlimeshServiceConnection.cpp @@ -40,6 +40,7 @@ void GlimeshServiceConnection::Init() Result> GlimeshServiceConnection::GetHmacKey(uint32_t channelId) { + spdlog::debug("GlimeshServiceConnection::GetHmacKey..."); std::stringstream query; query << "query { channel(id: \"" << channelId << "\") { hmacKey } }"; @@ -68,6 +69,7 @@ Result> GlimeshServiceConnection::GetHmacKey(uint32_t cha Result GlimeshServiceConnection::StartStream(ftl_channel_id_t channelId) { + spdlog::debug("GlimeshServiceConnection::StartStream..."); std::stringstream query; query << "mutation { startStream(channelId: " << channelId << ") { id } }"; From 247a425b216eb3e9741db219e393687ab47c33c9 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Sun, 26 Jun 2022 15:44:15 -0400 Subject: [PATCH 6/8] Renaming orchestratorRelayChannels and removing them when unsubscribing --- src/JanusFtl.cpp | 8 +++++--- src/JanusFtl.h | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index ab29f08..6f6a422 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -305,7 +305,9 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) throw std::runtime_error( "Unexpected service connection type - expected EdgeNodeServiceConnection."); } + edgeService->ClearStreamKey(channelId); + orchestratorRelayChannels.erase(channelId); spdlog::info("Last viewer for channel {} has disconnected - unsubscribing...", channelId); @@ -337,7 +339,7 @@ Result> JanusFtl::ftlServerRequestKey(ftl_channel_id_t ch std::shared_ptr JanusFtl::getServiceConnection(ftl_channel_id_t channelId) { - if (pendingEdgeChannels.count(channelId) > 0) { + if (orchestratorRelayChannels.count(channelId) > 0) { // This stream is coming from another ingest return edgeServiceConnection; } else { @@ -387,7 +389,7 @@ Result JanusFtl::ftlServerStreamStarted( // TODO: Notify viewer sessions // If we are configured as an Ingest node, notify the Orchestrator that a stream has started. - if (orchestrationEnabled && orchestrationClient != nullptr && pendingEdgeChannels.count(channelId) == 0) + if (orchestrationEnabled && orchestrationClient != nullptr && orchestratorRelayChannels.count(channelId) == 0) { spdlog::info("Publishing channel {} / stream {} to Orchestrator...", channelId, streamId); @@ -779,7 +781,7 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt "Unexpected service connection type - expected EdgeNodeServiceConnection."); } - pendingEdgeChannels.insert(channelId); + orchestratorRelayChannels.insert(channelId); std::vector streamKey = edgeService->ProvisionStreamKey(channelId); diff --git a/src/JanusFtl.h b/src/JanusFtl.h index 1b005ec..274acc3 100644 --- a/src/JanusFtl.h +++ b/src/JanusFtl.h @@ -117,7 +117,7 @@ class JanusFtl std::unordered_map> streams; std::unordered_map sessions; std::unordered_map> pendingViewerSessions; - std::unordered_set pendingEdgeChannels; + std::unordered_set orchestratorRelayChannels; /* Private methods */ // FtlServer Callbacks From 90905252b32d2bb7c924cf0d62c07721f9cd5f24 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Mon, 27 Jun 2022 10:00:07 -0400 Subject: [PATCH 7/8] Adding lock & formatting --- src/JanusFtl.cpp | 20 ++++++++++++-------- src/JanusFtl.h | 3 ++- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index 6f6a422..102789e 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -337,7 +337,9 @@ Result> JanusFtl::ftlServerRequestKey(ftl_channel_id_t ch return connection->GetHmacKey(channelId); } -std::shared_ptr JanusFtl::getServiceConnection(ftl_channel_id_t channelId) +std::shared_ptr JanusFtl::getServiceConnection(ftl_channel_id_t channelId, + const std::unique_lock& streamDataLock +) { if (orchestratorRelayChannels.count(channelId) > 0) { // This stream is coming from another ingest @@ -354,7 +356,7 @@ Result JanusFtl::ftlServerStreamStarted( std::unique_lock lock(streamDataMutex); // Attempt to start the stream on the service connection - std::shared_ptr connection = getServiceConnection(channelId); + std::shared_ptr connection = getServiceConnection(channelId, lock); Result startResult = connection->StartStream(channelId); if (startResult.IsError) { @@ -423,7 +425,7 @@ void JanusFtl::initVideoDecoders() void JanusFtl::initOrchestratorConnection() { - if (!empty(configuration->GetOrchestratorHostname())) + if (configuration->GetOrchestratorHostname().empty() == false) { spdlog::info( "Connecting to Orchestration service @ {}:{}...", @@ -465,14 +467,16 @@ void JanusFtl::initOrchestratorConnection() void JanusFtl::initServiceConnections() { - // If we are configured to be an standalone or edge node, we need to setup the EdgeNodeServiceConnection - if (configuration->GetNodeKind() == NodeKind::Standalone || configuration->GetNodeKind() == NodeKind::Edge) + // If we are configured to be an standalone or edge node, + // we need to setup an edge service connection + if (configuration->GetNodeKind() == NodeKind::Standalone || + configuration->GetNodeKind() == NodeKind::Edge) { edgeServiceConnection = std::make_shared(); edgeServiceConnection->Init(); } - // If we're only an edge, don't setup any service connection + // If we're not an edge, setup a service connection as well if (configuration->GetNodeKind() != NodeKind::Edge) { switch (configuration->GetServiceConnectionKind()) @@ -714,7 +718,7 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, spdlog::info("Stream ended. Channel {} / stream {}", stream->GetChannelId(), stream->GetStreamId()); - std::shared_ptr connection = getServiceConnection(channelId); + std::shared_ptr connection = getServiceConnection(channelId, streamDataLock); connection->EndStream(streamId); streams.erase(channelId); } @@ -1026,4 +1030,4 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl return ConnectionResult { .IsSuccess = true }; } } -#pragma endregion Private methods +#pragma endregion Private methods \ No newline at end of file diff --git a/src/JanusFtl.h b/src/JanusFtl.h index 274acc3..4db49af 100644 --- a/src/JanusFtl.h +++ b/src/JanusFtl.h @@ -151,5 +151,6 @@ class JanusFtl ConnectionResult onOrchestratorOutro(ConnectionOutroPayload payload); ConnectionResult onOrchestratorStreamRelay(ConnectionRelayPayload payload); // Helper functions - std::shared_ptr getServiceConnection(ftl_channel_id_t channelId); + std::shared_ptr getServiceConnection(ftl_channel_id_t channelId, + const std::unique_lock& streamDataLock); }; \ No newline at end of file From 4dfd808152420ef4654c4495e6e9b66c13570309 Mon Sep 17 00:00:00 2001 From: Luke Strickland Date: Mon, 27 Jun 2022 16:14:45 -0400 Subject: [PATCH 8/8] Locks on locks on locks --- src/JanusFtl.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index 102789e..beb4ff0 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -333,7 +333,8 @@ json_t* JanusFtl::QuerySession(janus_plugin_session* handle) #pragma region Private methods Result> JanusFtl::ftlServerRequestKey(ftl_channel_id_t channelId) { - std::shared_ptr connection = getServiceConnection(channelId); + std::unique_lock lock(streamDataMutex); + std::shared_ptr connection = getServiceConnection(channelId, lock); return connection->GetHmacKey(channelId); } @@ -543,6 +544,7 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) ftlServer->GetAllStatsAndKeyframes(); std::unordered_map metadataByChannel; std::unordered_map viewersByChannel; + std::unordered_map> servicesByChannel; std::unique_lock lock(streamDataMutex); for (const auto& streamInfo : statsAndKeyframes) { @@ -553,6 +555,7 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) } metadataByChannel.try_emplace(channelId, streams.at(channelId)->GetMetadata()); viewersByChannel.try_emplace(channelId, streams.at(channelId)->GetViewerCount()); + servicesByChannel.try_emplace(channelId, getServiceConnection(channelId, lock)); } lock.unlock(); @@ -578,7 +581,8 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) } if ((viewersByChannel.count(channelId) <= 0) || - (metadataByChannel.count(channelId) <= 0)) + (metadataByChannel.count(channelId) <= 0) || + (servicesByChannel.count(channelId) <= 0)) { continue; } @@ -626,7 +630,8 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) .videoHeight = videoHeight, }; - std::shared_ptr connection = getServiceConnection(channelId); + + const std::shared_ptr connection = servicesByChannel.at(channelId); Result updateResult = connection->UpdateStreamMetadata(streamId, metadata); // Check if the request failed, or the service wants to end this stream