Skip to content

Commit 7b6488e

Browse files
committed
Remove host status loop and simplify SSE lifecycle flow
1 parent 2285109 commit 7b6488e

10 files changed

Lines changed: 65 additions & 157 deletions

File tree

internal/server/handlers/sse.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
package handlers
22

33
import (
4-
"context"
54
"errors"
65
"fmt"
76
"log"
87
"net/http"
98
"os"
109
"strings"
11-
"sync"
1210
"time"
1311

1412
"github.com/glimesh/broadcast-box/internal/environment"
1513
"github.com/glimesh/broadcast-box/internal/server/helpers"
1614
"github.com/glimesh/broadcast-box/internal/webrtc/sessions/manager"
17-
"github.com/google/uuid"
1815
)
1916

2017
func sseHandler(responseWriter http.ResponseWriter, request *http.Request) {
@@ -37,15 +34,11 @@ func sseHandler(responseWriter http.ResponseWriter, request *http.Request) {
3734
ctx := request.Context()
3835
responseController := http.NewResponseController(responseWriter)
3936

40-
var writeLock sync.Mutex
41-
writeEvent := func(writeCtx context.Context, msg string) bool {
42-
if msg == "" || writeCtx.Err() != nil {
37+
writeEvent := func(msg string) bool {
38+
if msg == "" || ctx.Err() != nil {
4339
return false
4440
}
4541

46-
writeLock.Lock()
47-
defer writeLock.Unlock()
48-
4942
if debugSseMessages {
5043
log.Println("API.SSE Sending:", msg)
5144
}
@@ -78,35 +71,42 @@ func sseHandler(responseWriter http.ResponseWriter, request *http.Request) {
7871
}
7972

8073
if streamSession, whepSession, foundSession := manager.SessionsManager.GetSessionAndWHEPByID(sessionID); foundSession {
81-
subscriberCtx, subscriberCancel := context.WithCancel(ctx)
82-
defer subscriberCancel()
83-
84-
subscriberID := uuid.NewString()
85-
subscriberWrite := func(msg string) bool {
86-
return writeEvent(subscriberCtx, msg)
87-
}
88-
if !whepSession.AddSSESubscriber(subscriberID, subscriberWrite, subscriberCancel) {
89-
helpers.LogHTTPError(responseWriter, "Invalid request", http.StatusBadRequest)
90-
return
91-
}
92-
defer whepSession.RemoveSSESubscriber(subscriberID)
93-
94-
if !subscriberWrite(streamSession.GetSessionStatsEvent()) {
74+
if !writeEvent(streamSession.GetSessionStatsEvent()) {
9575
return
9676
}
9777

9878
host := streamSession.Host.Load()
99-
if host != nil && !subscriberWrite(host.GetAvailableLayersEvent()) {
79+
if host != nil && !writeEvent(host.GetAvailableLayersEvent()) {
10080
return
10181
}
10282

103-
<-subscriberCtx.Done()
104-
log.Println("API.SSE: Client disconnected")
105-
return
83+
ticker := time.NewTicker(5 * time.Second)
84+
defer ticker.Stop()
85+
86+
for {
87+
select {
88+
case <-ctx.Done():
89+
log.Println("API.SSE: Client disconnected")
90+
return
91+
case <-ticker.C:
92+
if whepSession.IsSessionClosed.Load() {
93+
return
94+
}
95+
96+
if !writeEvent(streamSession.GetSessionStatsEvent()) {
97+
return
98+
}
99+
100+
host := streamSession.Host.Load()
101+
if host != nil && !writeEvent(host.GetAvailableLayersEvent()) {
102+
return
103+
}
104+
}
105+
}
106106
}
107107

108108
if streamSession, foundSession := manager.SessionsManager.GetSessionByHostSessionID(sessionID); foundSession {
109-
if !writeEvent(ctx, streamSession.GetSessionStatsEvent()) {
109+
if !writeEvent(streamSession.GetSessionStatsEvent()) {
110110
return
111111
}
112112

@@ -119,7 +119,7 @@ func sseHandler(responseWriter http.ResponseWriter, request *http.Request) {
119119
log.Println("API.SSE: Client disconnected")
120120
return
121121
case <-ticker.C:
122-
if !writeEvent(ctx, streamSession.GetSessionStatsEvent()) {
122+
if !writeEvent(streamSession.GetSessionStatsEvent()) {
123123
return
124124
}
125125
}

internal/webrtc/sessions/manager/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ func (m *SessionManager) addSession(profile authorization.PublicProfile) (s *ses
3838
m.sessionsLock.Unlock()
3939
})
4040

41-
s.HasHost.Store(true)
4241
m.sessionsLock.Lock()
4342
m.sessions[profile.StreamKey] = s
4443
m.sessionsLock.Unlock()
44+
4545
return s, nil
4646
}
4747

internal/webrtc/sessions/session/routines.go

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package session
22

33
import (
44
"log"
5-
"time"
65

76
"github.com/glimesh/broadcast-box/internal/webrtc/sessions/whep"
87
"github.com/pion/rtcp"
@@ -38,53 +37,3 @@ func (s *Session) handleWHEPVideoRTCPSender(whepSession *whep.WHEPSession, rtcpS
3837
}
3938
}
4039
}
41-
42-
// Broadcast stream status to connected WHEP clients while host is active.
43-
func (s *Session) hostStatusLoop() {
44-
log.Println("Session.Host.HostStatusLoop")
45-
ticker := time.NewTicker(5 * time.Second)
46-
defer ticker.Stop()
47-
48-
for {
49-
host := s.Host.Load()
50-
if host == nil {
51-
if s.isEmpty() {
52-
s.close()
53-
return
54-
}
55-
56-
time.Sleep(5 * time.Second)
57-
continue
58-
}
59-
60-
select {
61-
62-
case <-host.ActiveContext.Done():
63-
s.RemoveHost()
64-
65-
if s.isEmpty() {
66-
s.close()
67-
}
68-
return
69-
70-
// Send status every 5 seconds
71-
case <-ticker.C:
72-
if s.isEmpty() {
73-
s.close()
74-
} else if s.Host.Load() != nil {
75-
status := s.GetSessionStatsEvent()
76-
77-
s.WHEPSessionsLock.RLock()
78-
whepSessions := make([]*whep.WHEPSession, 0, len(s.WHEPSessions))
79-
for _, whepSession := range s.WHEPSessions {
80-
whepSessions = append(whepSessions, whepSession)
81-
}
82-
s.WHEPSessionsLock.RUnlock()
83-
84-
for _, whepSession := range whepSessions {
85-
whepSession.BroadcastSSE(status)
86-
}
87-
}
88-
}
89-
}
90-
}

internal/webrtc/sessions/session/session.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
func (s *Session) UpdateStreamStatus(profile authorization.PublicProfile) {
1616
s.StatusLock.Lock()
1717

18-
s.HasHost.Store(true)
1918
s.MOTD = profile.MOTD
2019
s.IsPublic = profile.IsPublic
2120

@@ -80,6 +79,7 @@ func (s *Session) AddHost(peerConnection *webrtc.PeerConnection) (err error) {
8079
AudioTracks: make(map[string]*whip.AudioTrack),
8180
VideoTracks: make(map[string]*whip.VideoTrack),
8281
}
82+
host.SetOnClosed(s.handleHostClosed)
8383

8484
host.AddPeerConnection(peerConnection, s.StreamKey)
8585
if !s.Host.CompareAndSwap(nil, host) {
@@ -89,8 +89,7 @@ func (s *Session) AddHost(peerConnection *webrtc.PeerConnection) (err error) {
8989
}
9090
host.WHEPSessionsSnapshot.Store(make(map[string]*whep.WHEPSession))
9191
s.updateHostWHEPSessionsSnapshot()
92-
93-
go s.hostStatusLoop()
92+
s.HasHost.Store(true)
9493

9594
return nil
9695
}
@@ -104,6 +103,7 @@ func (s *Session) RemoveHost() {
104103
}
105104

106105
log.Println("Session.RemoveHost", s.StreamKey)
106+
s.HasHost.Store(false)
107107

108108
host.WHEPSessionsSnapshot.Store(make(map[string]*whep.WHEPSession))
109109
host.RemovePeerConnection()
@@ -131,6 +131,14 @@ func (s *Session) handleWHEPClose(whepSessionID string) {
131131
}
132132
}
133133

134+
func (s *Session) handleHostClosed() {
135+
s.RemoveHost()
136+
137+
if s.isEmpty() {
138+
s.close()
139+
}
140+
}
141+
134142
// Remove all Hosts and clients before closing down session
135143
func (s *Session) close() {
136144
s.closeOnce.Do(func() {

internal/webrtc/sessions/whep/types.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,14 @@ import (
1010
)
1111

1212
type (
13-
sseSubscriber struct {
14-
writeEvent func(string) bool
15-
cancel func()
16-
}
17-
1813
WHEPSession struct {
1914
SessionID string
2015
IsWaitingForKeyframe atomic.Bool
2116
IsSessionClosed atomic.Bool
2217

23-
SSESubscribersLock sync.RWMutex
24-
SSESubscribers map[string]sseSubscriber
25-
SessionClose sync.Once
26-
onClose func(string)
27-
pliSender func()
18+
SessionClose sync.Once
19+
onClose func(string)
20+
pliSender func()
2821

2922
PeerConnectionLock sync.RWMutex
3023
PeerConnection *webrtc.PeerConnection

internal/webrtc/sessions/whep/whep.go

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ func CreateNewWHEP(
2626
VideoTrack: videoTrack,
2727
AudioTimestamp: 5000,
2828
VideoTimestamp: 5000,
29-
SSESubscribers: make(map[string]sseSubscriber),
3029
PeerConnection: peerConnection,
3130
pliSender: pliSender,
3231
videoBitrateWindowStart: time.Now(),
@@ -66,10 +65,6 @@ func (w *WHEPSession) Close() {
6665
w.VideoLock.Unlock()
6766
w.AudioLock.Unlock()
6867

69-
w.SSESubscribersLock.Lock()
70-
w.SSESubscribers = make(map[string]sseSubscriber)
71-
w.SSESubscribersLock.Unlock()
72-
7368
if w.onClose != nil {
7469
w.onClose(w.SessionID)
7570
}
@@ -136,48 +131,6 @@ func (w *WHEPSession) SendPLI() {
136131
w.pliSender()
137132
}
138133
}
139-
140-
func (w *WHEPSession) AddSSESubscriber(subscriberID string, writeEvent func(string) bool, cancel func()) bool {
141-
w.SSESubscribersLock.Lock()
142-
defer w.SSESubscribersLock.Unlock()
143-
144-
if w.IsSessionClosed.Load() || writeEvent == nil || cancel == nil {
145-
return false
146-
}
147-
148-
w.SSESubscribers[subscriberID] = sseSubscriber{
149-
writeEvent: writeEvent,
150-
cancel: cancel,
151-
}
152-
return true
153-
}
154-
155-
func (w *WHEPSession) RemoveSSESubscriber(subscriberID string) {
156-
w.SSESubscribersLock.Lock()
157-
delete(w.SSESubscribers, subscriberID)
158-
w.SSESubscribersLock.Unlock()
159-
}
160-
161-
func (w *WHEPSession) BroadcastSSE(message string) {
162-
if message == "" || w.IsSessionClosed.Load() {
163-
return
164-
}
165-
166-
w.SSESubscribersLock.RLock()
167-
subscribers := make(map[string]sseSubscriber, len(w.SSESubscribers))
168-
for id, subscriber := range w.SSESubscribers {
169-
subscribers[id] = subscriber
170-
}
171-
w.SSESubscribersLock.RUnlock()
172-
173-
for id, subscriber := range subscribers {
174-
if !subscriber.writeEvent(message) {
175-
w.RemoveSSESubscriber(id)
176-
subscriber.cancel()
177-
}
178-
}
179-
}
180-
181134
func (w *WHEPSession) updateVideoBitrateLocked(now time.Time) {
182135
if w.videoBitrateWindowStart.IsZero() {
183136
w.videoBitrateWindowStart = now

internal/webrtc/sessions/whip/handlers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (w *WHIPSession) onICEConnectionStateChangeHandler() func(webrtc.ICEConnect
2424
return func(state webrtc.ICEConnectionState) {
2525
if state == webrtc.ICEConnectionStateFailed || state == webrtc.ICEConnectionStateClosed {
2626
log.Println("WHIPSession.PeerConnection.OnICEConnectionStateChange", w.ID)
27-
w.ActiveContextCancel()
27+
w.notifyClosed()
2828
}
2929
}
3030
}
@@ -51,9 +51,10 @@ func (w *WHIPSession) onConnectionStateChange() func(webrtc.PeerConnectionState)
5151

5252
switch state {
5353
case webrtc.PeerConnectionStateClosed:
54+
w.notifyClosed()
5455
case webrtc.PeerConnectionStateFailed:
5556
log.Println("WHIPSession.PeerConnection.OnConnectionStateChange: Host removed", w.ID)
56-
w.ActiveContextCancel()
57+
w.notifyClosed()
5758

5859
case webrtc.PeerConnectionStateConnected:
5960
log.Println("WHIPSession.PeerConnection.OnConnectionStateChange: Host connected", w.ID)

internal/webrtc/sessions/whip/peerconnection.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@ import (
77
"github.com/pion/webrtc/v4"
88
)
99

10+
func (w *WHIPSession) SetOnClosed(onClosed func()) {
11+
w.onClosed = onClosed
12+
}
13+
14+
func (w *WHIPSession) notifyClosed() {
15+
w.closeOnce.Do(func() {
16+
if w.onClosed != nil {
17+
w.onClosed()
18+
}
19+
})
20+
}
21+
1022
func (w *WHIPSession) AddPeerConnection(peerConnection *webrtc.PeerConnection, streamKey string) {
1123
log.Println("WHIPSession.AddPeerConnection")
1224

internal/webrtc/sessions/whip/types.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package whip
22

33
import (
4-
"context"
54
"sync"
65
"sync/atomic"
76

@@ -11,11 +10,11 @@ import (
1110

1211
type (
1312
WHIPSession struct {
14-
ID string
15-
ActiveContext context.Context
16-
ActiveContextCancel func()
17-
PeerConnectionLock sync.RWMutex
18-
PeerConnection *webrtc.PeerConnection
13+
ID string
14+
PeerConnection *webrtc.PeerConnection
15+
closeOnce sync.Once
16+
onClosed func()
17+
PeerConnectionLock sync.RWMutex
1918

2019
// Protects AudioTrack, VideoTracks
2120
TracksLock sync.RWMutex

0 commit comments

Comments
 (0)