-
Notifications
You must be signed in to change notification settings - Fork 10
Allow Standalone nodes to operate in an orchestrated environment #138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
fb2d78e
8878517
413f211
0c31cd4
d8ea8a1
247a425
9090525
4dfd808
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,7 +60,7 @@ JanusFtl::JanusFtl( | |
|
|
||
| initOrchestratorConnection(); | ||
|
|
||
| initServiceConnection(); | ||
| initServiceConnections(); | ||
|
|
||
| ftlServer = std::make_unique<FtlServer>(std::move(ingestControlListener), | ||
| std::move(mediaConnectionCreator), | ||
|
|
@@ -271,8 +271,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) | |
|
|
||
| // If we're an Edge node and there are no more viewers for this channel, we can | ||
| // unsubscribe. | ||
| if ((configuration->GetNodeKind() == NodeKind::Edge) && | ||
| (watchingStream->GetViewerCount() == 0)) | ||
| if (orchestrationEnabled && (watchingStream->GetViewerCount() == 0)) | ||
| { | ||
| orchestratorUnsubscribe = true; | ||
| } | ||
|
|
@@ -287,7 +286,7 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) | |
| // If this was the last pending viewer for this channel, unsubscribe. | ||
| if (pendingViewerSessions[channelId].size() == 0) | ||
| { | ||
| if (configuration->GetNodeKind() == NodeKind::Edge) | ||
| if (orchestrationEnabled) | ||
| { | ||
| orchestratorUnsubscribe = true; | ||
| } | ||
|
|
@@ -299,14 +298,16 @@ void JanusFtl::DestroySession(janus_plugin_session* handle, int* error) | |
| if (orchestratorUnsubscribe) | ||
| { | ||
| // Remove temporary stream key | ||
| const auto& edgeServiceConnection = | ||
| std::dynamic_pointer_cast<EdgeNodeServiceConnection>(serviceConnection); | ||
| if (edgeServiceConnection == nullptr) | ||
| const auto& edgeService = | ||
| std::dynamic_pointer_cast<EdgeNodeServiceConnection>(edgeServiceConnection); | ||
| if (edgeService == nullptr) | ||
| { | ||
| throw std::runtime_error( | ||
| "Unexpected service connection type - expected EdgeNodeServiceConnection."); | ||
| } | ||
| edgeServiceConnection->ClearStreamKey(channelId); | ||
|
|
||
| edgeService->ClearStreamKey(channelId); | ||
| orchestratorRelayChannels.erase(channelId); | ||
|
|
||
| spdlog::info("Last viewer for channel {} has disconnected - unsubscribing...", | ||
| channelId); | ||
|
|
@@ -332,7 +333,18 @@ json_t* JanusFtl::QuerySession(janus_plugin_session* handle) | |
| #pragma region Private methods | ||
| Result<std::vector<std::byte>> JanusFtl::ftlServerRequestKey(ftl_channel_id_t channelId) | ||
| { | ||
| return serviceConnection->GetHmacKey(channelId); | ||
| std::shared_ptr<ServiceConnection> connection = getServiceConnection(channelId); | ||
| return connection->GetHmacKey(channelId); | ||
| } | ||
|
|
||
| std::shared_ptr<ServiceConnection> JanusFtl::getServiceConnection(ftl_channel_id_t channelId) | ||
| { | ||
| if (orchestratorRelayChannels.count(channelId) > 0) { | ||
| // This stream is coming from another ingest | ||
| return edgeServiceConnection; | ||
| } else { | ||
| return serviceConnection; | ||
| } | ||
| } | ||
|
|
||
| Result<FtlServer::StartedStreamInfo> JanusFtl::ftlServerStreamStarted( | ||
|
|
@@ -342,7 +354,8 @@ Result<FtlServer::StartedStreamInfo> JanusFtl::ftlServerStreamStarted( | |
| std::unique_lock lock(streamDataMutex); | ||
|
|
||
| // Attempt to start the stream on the service connection | ||
| Result<ftl_stream_id_t> startResult = serviceConnection->StartStream(channelId); | ||
| std::shared_ptr<ServiceConnection> connection = getServiceConnection(channelId); | ||
| Result<ftl_stream_id_t> startResult = connection->StartStream(channelId); | ||
| if (startResult.IsError) | ||
| { | ||
| return Result<FtlServer::StartedStreamInfo>::Error(startResult.ErrorMessage); | ||
|
|
@@ -376,7 +389,7 @@ Result<FtlServer::StartedStreamInfo> JanusFtl::ftlServerStreamStarted( | |
| // TODO: Notify viewer sessions | ||
|
|
||
| // If we are configured as an Ingest node, notify the Orchestrator that a stream has started. | ||
| if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr)) | ||
| if (orchestrationEnabled && orchestrationClient != nullptr && orchestratorRelayChannels.count(channelId) == 0) | ||
| { | ||
| spdlog::info("Publishing channel {} / stream {} to Orchestrator...", channelId, | ||
| streamId); | ||
|
|
@@ -410,7 +423,7 @@ void JanusFtl::initVideoDecoders() | |
|
|
||
| void JanusFtl::initOrchestratorConnection() | ||
| { | ||
| if (configuration->GetNodeKind() != NodeKind::Standalone) | ||
| if (!empty(configuration->GetOrchestratorHostname())) | ||
|
clone1018 marked this conversation as resolved.
Outdated
|
||
| { | ||
| spdlog::info( | ||
| "Connecting to Orchestration service @ {}:{}...", | ||
|
|
@@ -445,17 +458,22 @@ void JanusFtl::initOrchestratorConnection() | |
| .RegionCode = configuration->GetOrchestratorRegionCode(), | ||
| .Hostname = configuration->GetMyHostname(), | ||
| }); | ||
|
|
||
| orchestrationEnabled = true; | ||
| } | ||
| } | ||
|
|
||
| void JanusFtl::initServiceConnection() | ||
| void JanusFtl::initServiceConnections() | ||
| { | ||
| // If we are configured to be an edge node, we *must* use the EdgeNodeServiceConnection | ||
| if (configuration->GetNodeKind() == NodeKind::Edge) | ||
| // If we are configured to be an standalone or edge node, we need to setup the EdgeNodeServiceConnection | ||
| if (configuration->GetNodeKind() == NodeKind::Standalone || configuration->GetNodeKind() == NodeKind::Edge) | ||
|
clone1018 marked this conversation as resolved.
Outdated
|
||
| { | ||
| serviceConnection = std::make_shared<EdgeNodeServiceConnection>(); | ||
| edgeServiceConnection = std::make_shared<EdgeNodeServiceConnection>(); | ||
| edgeServiceConnection->Init(); | ||
| } | ||
| else | ||
|
|
||
| // If we're only an edge, don't setup any service connection | ||
| if (configuration->GetNodeKind() != NodeKind::Edge) | ||
| { | ||
| switch (configuration->GetServiceConnectionKind()) | ||
| { | ||
|
|
@@ -482,9 +500,9 @@ void JanusFtl::initServiceConnection() | |
| configuration->GetDummyPreviewImagePath()); | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| serviceConnection->Init(); | ||
| serviceConnection->Init(); | ||
| } | ||
| } | ||
|
|
||
| void JanusFtl::initServiceReportThread() | ||
|
|
@@ -603,8 +621,10 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise) | |
| .videoWidth = videoWidth, | ||
| .videoHeight = videoHeight, | ||
| }; | ||
|
|
||
| std::shared_ptr<ServiceConnection> connection = getServiceConnection(channelId); | ||
| Result<ServiceConnection::ServiceResponse> updateResult = | ||
| serviceConnection->UpdateStreamMetadata(streamId, metadata); | ||
| connection->UpdateStreamMetadata(streamId, metadata); | ||
| // Check if the request failed, or the service wants to end this stream | ||
| if (updateResult.IsError || | ||
| (updateResult.Value == ServiceConnection::ServiceResponse::EndStream)) | ||
|
|
@@ -632,7 +652,7 @@ void JanusFtl::serviceReportThreadBody(std::promise<void>&& threadEndedPromise) | |
| { | ||
| std::vector<uint8_t> jpegBytes = | ||
| videoDecoders.at(keyframe.Codec)->GenerateJpegImage(keyframe.Packets); | ||
| serviceConnection->SendJpegPreviewImage(streamId, jpegBytes); | ||
| connection->SendJpegPreviewImage(streamId, jpegBytes); | ||
| } | ||
| catch (const PreviewGenerationFailedException& e) | ||
| { | ||
|
|
@@ -677,7 +697,7 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, | |
| // TODO: Tell viewers stream is offline. | ||
|
|
||
| // If we are configured as an Ingest node, notify the Orchestrator that a stream has ended. | ||
| if ((configuration->GetNodeKind() == NodeKind::Ingest) && (orchestrationClient != nullptr)) | ||
| if (orchestrationEnabled && (orchestrationClient != nullptr)) | ||
| { | ||
| spdlog::info("Unpublishing channel {} / stream {} from Orchestrator", | ||
| stream->GetChannelId(), stream->GetStreamId()); | ||
|
|
@@ -694,7 +714,8 @@ void JanusFtl::endStream(ftl_channel_id_t channelId, ftl_stream_id_t streamId, | |
| spdlog::info("Stream ended. Channel {} / stream {}", | ||
| stream->GetChannelId(), stream->GetStreamId()); | ||
|
|
||
| serviceConnection->EndStream(streamId); | ||
| std::shared_ptr<ServiceConnection> connection = getServiceConnection(channelId); | ||
| connection->EndStream(streamId); | ||
| streams.erase(channelId); | ||
| } | ||
|
|
||
|
|
@@ -749,17 +770,20 @@ janus_plugin_result* JanusFtl::handleWatchMessage(ActiveSession& session, JsonPt | |
|
|
||
| // If we're an Edge node and this is a first viewer for a given channel, | ||
| // request that this channel be relayed to us. | ||
| if ((configuration->GetNodeKind() == NodeKind::Edge) && (pendingViewers == 0)) | ||
| if (orchestrationEnabled && (pendingViewers == 0)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking through a potential ambiguity in this codepath - Consider a scenario where a standalone FTL instance receives a watch request for a channel that has not started streaming yet. The standalone FTL server will subscribe to the channel via orchestrator. Afterwards, the streamer actually begins streaming, but connects to the same standalone FTL server. I suspect the orchestrator will fulfill the standalone FTL server's subscription and notify the server to relay the stream to itself. Since only one stream can exist per channel, the standalone server will probably terminate the stream (or maybe worse, haven't really traced it through all the way). We'll probably need special behavior to either 1.) Enlighten the orchestrator so it can handle when a standalone server tries to publish a stream on the same channel that it previously has subscribed to 2.) Enlighten the standalone FTL server so it can handle "transitioning" between a pending orchestrator relay and a "direct" stream Thoughts? |
||
| { | ||
| // Generate a new stream key for incoming relay of this channel | ||
| const auto& edgeServiceConnection = | ||
| std::dynamic_pointer_cast<EdgeNodeServiceConnection>(serviceConnection); | ||
| if (edgeServiceConnection == nullptr) | ||
| const auto& edgeService = | ||
| std::dynamic_pointer_cast<EdgeNodeServiceConnection>(edgeServiceConnection); | ||
| if (edgeService == nullptr) | ||
| { | ||
| throw std::runtime_error( | ||
| "Unexpected service connection type - expected EdgeNodeServiceConnection."); | ||
| } | ||
| std::vector<std::byte> streamKey = edgeServiceConnection->ProvisionStreamKey(channelId); | ||
|
|
||
| orchestratorRelayChannels.insert(channelId); | ||
|
|
||
| std::vector<std::byte> streamKey = edgeService->ProvisionStreamKey(channelId); | ||
|
|
||
| // Subscribe for relay of this stream | ||
| spdlog::info("First viewer for channel {} - subscribing...", | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.