diff --git a/src/JanusFtl.cpp b/src/JanusFtl.cpp index e63a8cb..beb4ff0 100644 --- a/src/JanusFtl.cpp +++ b/src/JanusFtl.cpp @@ -60,7 +60,7 @@ JanusFtl::JanusFtl( initOrchestratorConnection(); - initServiceConnection(); + initServiceConnections(); ftlServer = std::make_unique(std::move(ingestControlListener), std::move(mediaConnectionCreator), @@ -271,8 +271,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) // If we're an Edge node and there are no more viewers for this channel, we can // unsubscribe. - if ((configuration->GetNodeKind() == NodeKind::Edge) && - (watchingStream->GetViewerCount() == 0)) + if (orchestrationEnabled && (watchingStream->GetViewerCount() == 0)) { orchestratorUnsubscribe = true; } @@ -287,7 +286,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) // If this was the last pending viewer for this channel, unsubscribe. if (pendingViewerSessions[channelId].size() == 0) { - if (configuration->GetNodeKind() == NodeKind::Edge) + if (orchestrationEnabled) { orchestratorUnsubscribe = true; } @@ -299,14 +298,16 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) if (orchestratorUnsubscribe) { // Remove temporary stream key - const auto& edgeServiceConnection = - std::dynamic_pointer_cast(serviceConnection); - if (edgeServiceConnection == nullptr) + const auto& edgeService = + std::dynamic_pointer_cast(edgeServiceConnection); + if (edgeService == nullptr) { throw std::runtime_error( "Unexpected service connection type - expected EdgeNodeServiceConnection."); } - edgeServiceConnection->ClearStreamKey(channelId); + + edgeService->ClearStreamKey(channelId); + orchestratorRelayChannels.erase(channelId); spdlog::info("Last viewer for channel {} has disconnected - unsubscribing...", channelId); @@ -332,7 +333,21 @@ json_t* JanusFtl::QuerySession(janus_plugin_session* handle) #pragma region Private methods Result> JanusFtl::ftlServerRequestKey(ftl_channel_id_t channelId) { - return serviceConnection->GetHmacKey(channelId); + std::unique_lock lock(streamDataMutex); + std::shared_ptr connection = getServiceConnection(channelId, lock); + return connection->GetHmacKey(channelId); +} + +std::shared_ptr JanusFtl::getServiceConnection(ftl_channel_id_t channelId, + const std::unique_lock& streamDataLock +) +{ + if (orchestratorRelayChannels.count(channelId) > 0) { + // This stream is coming from another ingest + return edgeServiceConnection; + } else { + return serviceConnection; + } } Result JanusFtl::ftlServerStreamStarted( @@ -342,7 +357,8 @@ Result JanusFtl::ftlServerStreamStarted( std::unique_lock lock(streamDataMutex); // Attempt to start the stream on the service connection - Result startResult = serviceConnection->StartStream(channelId); + std::shared_ptr connection = getServiceConnection(channelId, lock); + Result startResult = connection->StartStream(channelId); if (startResult.IsError) { return Result::Error(startResult.ErrorMessage); @@ -376,7 +392,7 @@ Result JanusFtl::ftlServerStreamStarted( // TODO: Notify viewer sessions // If we are configured as an Ingest node, notify the Orchestrator that a stream has started. - if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr)) + if (orchestrationEnabled && orchestrationClient != nullptr && orchestratorRelayChannels.count(channelId) == 0) { spdlog::info("Publishing channel {} / stream {} to Orchestrator...", channelId, streamId); @@ -410,7 +426,7 @@ void JanusFtl::initVideoDecoders() void JanusFtl::initOrchestratorConnection() { - if (configuration->GetNodeKind() != NodeKind::Standalone) + if (configuration->GetOrchestratorHostname().empty() == false) { spdlog::info( "Connecting to Orchestration service @ {}:{}...", @@ -445,17 +461,24 @@ void JanusFtl::initOrchestratorConnection() .RegionCode = configuration->GetOrchestratorRegionCode(), .Hostname = configuration->GetMyHostname(), }); + + orchestrationEnabled = true; } } -void JanusFtl::initServiceConnection() +void JanusFtl::initServiceConnections() { - // If we are configured to be an edge node, we *must* use the EdgeNodeServiceConnection - if (configuration->GetNodeKind() == NodeKind::Edge) + // If we are configured to be an standalone or edge node, + // we need to setup an edge service connection + if (configuration->GetNodeKind() == NodeKind::Standalone || + configuration->GetNodeKind() == NodeKind::Edge) { - serviceConnection = std::make_shared(); + edgeServiceConnection = std::make_shared(); + edgeServiceConnection->Init(); } - else + + // If we're not an edge, setup a service connection as well + if (configuration->GetNodeKind() != NodeKind::Edge) { switch (configuration->GetServiceConnectionKind()) { @@ -482,9 +505,9 @@ void JanusFtl::initServiceConnection() configuration->GetDummyPreviewImagePath()); break; } - } - serviceConnection->Init(); + serviceConnection->Init(); + } } void JanusFtl::initServiceReportThread() @@ -521,6 +544,7 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) ftlServer->GetAllStatsAndKeyframes(); std::unordered_map metadataByChannel; std::unordered_map viewersByChannel; + std::unordered_map> servicesByChannel; std::unique_lock lock(streamDataMutex); for (const auto& streamInfo : statsAndKeyframes) { @@ -531,6 +555,7 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) } metadataByChannel.try_emplace(channelId, streams.at(channelId)->GetMetadata()); viewersByChannel.try_emplace(channelId, streams.at(channelId)->GetViewerCount()); + servicesByChannel.try_emplace(channelId, getServiceConnection(channelId, lock)); } lock.unlock(); @@ -556,7 +581,8 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) } if ((viewersByChannel.count(channelId) <= 0) || - (metadataByChannel.count(channelId) <= 0)) + (metadataByChannel.count(channelId) <= 0) || + (servicesByChannel.count(channelId) <= 0)) { continue; } @@ -603,8 +629,11 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) .videoWidth = videoWidth, .videoHeight = videoHeight, }; + + + const std::shared_ptr connection = servicesByChannel.at(channelId); Result updateResult = - serviceConnection->UpdateStreamMetadata(streamId, metadata); + connection->UpdateStreamMetadata(streamId, metadata); // Check if the request failed, or the service wants to end this stream if (updateResult.IsError || (updateResult.Value == ServiceConnection::ServiceResponse::EndStream)) @@ -632,7 +661,7 @@ void JanusFtl::serviceReportThreadBody(std::promise&& threadEndedPromise) { std::vector jpegBytes = videoDecoders.at(keyframe.Codec)->GenerateJpegImage(keyframe.Packets); - serviceConnection->SendJpegPreviewImage(streamId, jpegBytes); + connection->SendJpegPreviewImage(streamId, jpegBytes); } catch (const PreviewGenerationFailedException& e) { @@ -677,7 +706,7 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, // TODO: Tell viewers stream is offline. // If we are configured as an Ingest node, notify the Orchestrator that a stream has ended. - if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr)) + if (orchestrationEnabled && (orchestrationClient != nullptr)) { spdlog::info("Unpublishing channel {} / stream {} from Orchestrator", stream->GetChannelId(), stream->GetStreamId()); @@ -694,7 +723,8 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, spdlog::info("Stream ended. Channel {} / stream {}", stream->GetChannelId(), stream->GetStreamId()); - serviceConnection->EndStream(streamId); + std::shared_ptr connection = getServiceConnection(channelId, streamDataLock); + connection->EndStream(streamId); streams.erase(channelId); } @@ -749,17 +779,20 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt // If we're an Edge node and this is a first viewer for a given channel, // request that this channel be relayed to us. - if ((configuration->GetNodeKind() == NodeKind::Edge) && (pendingViewers == 0)) + if (orchestrationEnabled && (pendingViewers == 0)) { // Generate a new stream key for incoming relay of this channel - const auto& edgeServiceConnection = - std::dynamic_pointer_cast(serviceConnection); - if (edgeServiceConnection == nullptr) + const auto& edgeService = + std::dynamic_pointer_cast(edgeServiceConnection); + if (edgeService == nullptr) { throw std::runtime_error( "Unexpected service connection type - expected EdgeNodeServiceConnection."); } - std::vector streamKey = edgeServiceConnection->ProvisionStreamKey(channelId); + + orchestratorRelayChannels.insert(channelId); + + std::vector streamKey = edgeService->ProvisionStreamKey(channelId); // Subscribe for relay of this stream spdlog::info("First viewer for channel {} - subscribing...", @@ -1002,4 +1035,4 @@ ConnectionResult JanusFtl::onOrchestratorStreamRelay(ConnectionRelayPayload payl return ConnectionResult { .IsSuccess = true }; } } -#pragma endregion Private methods +#pragma endregion Private methods \ No newline at end of file diff --git a/src/JanusFtl.h b/src/JanusFtl.h index 7dd5103..4db49af 100644 --- a/src/JanusFtl.h +++ b/src/JanusFtl.h @@ -16,6 +16,7 @@ #include "JanusSession.h" #include "JanusStream.h" #include "ServiceConnections/ServiceConnection.h" +#include "ServiceConnections/EdgeNodeServiceConnection.h" #include "Utilities/FtlTypes.h" #include "Utilities/JanssonPtr.h" #include "Utilities/Result.h" @@ -96,8 +97,10 @@ class JanusFtl janus_callbacks* janusCore; std::unique_ptr ftlServer; std::unique_ptr configuration; + bool orchestrationEnabled = false; std::shared_ptr orchestrationClient; std::shared_ptr serviceConnection; + std::shared_ptr edgeServiceConnection; std::unordered_map> videoDecoders; uint32_t maxAllowedBitsPerSecond = 0; uint32_t rollingSizeAvgMs = 2000; @@ -114,6 +117,7 @@ class JanusFtl std::unordered_map> streams; std::unordered_map sessions; std::unordered_map> pendingViewerSessions; + std::unordered_set orchestratorRelayChannels; /* Private methods */ // FtlServer Callbacks @@ -124,7 +128,7 @@ class JanusFtl // Initialization void initVideoDecoders(); void initOrchestratorConnection(); - void initServiceConnection(); + void initServiceConnections(); void initServiceReportThread(); // Service report thread body void serviceReportThreadBody(std::promise&& threadEndedPromise); @@ -146,4 +150,7 @@ class JanusFtl ConnectionResult onOrchestratorIntro(ConnectionIntroPayload payload); ConnectionResult onOrchestratorOutro(ConnectionOutroPayload payload); ConnectionResult onOrchestratorStreamRelay(ConnectionRelayPayload payload); + // Helper functions + std::shared_ptr getServiceConnection(ftl_channel_id_t channelId, + const std::unique_lock& streamDataLock); }; \ No newline at end of file diff --git a/src/ServiceConnections/EdgeNodeServiceConnection.cpp b/src/ServiceConnections/EdgeNodeServiceConnection.cpp index 72127e0..d430c7e 100644 --- a/src/ServiceConnections/EdgeNodeServiceConnection.cpp +++ b/src/ServiceConnections/EdgeNodeServiceConnection.cpp @@ -19,6 +19,8 @@ EdgeNodeServiceConnection::EdgeNodeServiceConnection() #pragma region Public methods std::vector EdgeNodeServiceConnection::ProvisionStreamKey(ftl_channel_id_t channelId) { + spdlog::debug( + "EdgeNodeServiceConnection::ProvisionStreamKey called..."); // If a stream key already exists for the given channel, just return the existing one if (streamKeys.count(channelId) > 0) { @@ -41,6 +43,7 @@ void EdgeNodeServiceConnection::Init() Result> EdgeNodeServiceConnection::GetHmacKey(ftl_channel_id_t channelId) { + if (streamKeys.count(channelId) > 0) { const auto& key = streamKeys[channelId]; diff --git a/src/ServiceConnections/GlimeshServiceConnection.cpp b/src/ServiceConnections/GlimeshServiceConnection.cpp index 837548b..885c83a 100644 --- a/src/ServiceConnections/GlimeshServiceConnection.cpp +++ b/src/ServiceConnections/GlimeshServiceConnection.cpp @@ -40,6 +40,7 @@ void GlimeshServiceConnection::Init() Result> GlimeshServiceConnection::GetHmacKey(uint32_t channelId) { + spdlog::debug("GlimeshServiceConnection::GetHmacKey..."); std::stringstream query; query << "query { channel(id: \"" << channelId << "\") { hmacKey } }"; @@ -68,6 +69,7 @@ Result> GlimeshServiceConnection::GetHmacKey(uint32_t cha Result GlimeshServiceConnection::StartStream(ftl_channel_id_t channelId) { + spdlog::debug("GlimeshServiceConnection::StartStream..."); std::stringstream query; query << "mutation { startStream(channelId: " << channelId << ") { id } }";