Skip to content

Commit 2285109

Browse files
committed
Simplify session and WHEP lifecycle cleanup callbacks
1 parent b08a238 commit 2285109

6 files changed

Lines changed: 52 additions & 75 deletions

File tree

internal/webrtc/sessions/manager/manager.go

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package manager
22

33
import (
4-
"context"
54
"log"
65
"maps"
76
"time"
@@ -22,7 +21,6 @@ func (m *SessionManager) Setup() {
2221
// Add new session
2322
func (m *SessionManager) addSession(profile authorization.PublicProfile) (s *session.Session, err error) {
2423
log.Println("SessionManager.AddWHIPSession")
25-
activeContext, activeContextCancel := context.WithCancel(context.Background())
2624

2725
s = &session.Session{
2826

@@ -31,27 +29,19 @@ func (m *SessionManager) addSession(profile authorization.PublicProfile) (s *ses
3129
MOTD: profile.MOTD,
3230
StreamStart: time.Now(),
3331

34-
ActiveContext: activeContext,
35-
ActiveContextCancel: activeContextCancel,
36-
3732
WHEPSessions: map[string]*whep.WHEPSession{},
3833
}
39-
40-
s.HasHost.Store(true)
41-
m.sessionsLock.Lock()
42-
m.sessions[profile.StreamKey] = s
43-
m.sessionsLock.Unlock()
44-
45-
go func() {
46-
<-activeContext.Done()
34+
s.SetOnClose(func() {
4735
log.Println("SessionManager.Session.Done")
48-
4936
m.sessionsLock.Lock()
5037
delete(m.sessions, profile.StreamKey)
5138
m.sessionsLock.Unlock()
39+
})
5240

53-
}()
54-
41+
s.HasHost.Store(true)
42+
m.sessionsLock.Lock()
43+
m.sessions[profile.StreamKey] = s
44+
m.sessionsLock.Unlock()
5545
return s, nil
5646
}
5747

internal/webrtc/sessions/session/routines.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,6 @@ import (
2323
//
2424
// }
2525

26-
// Waits for WHEP disconnect and removes the session
27-
func (s *Session) handleWHEPConnection(whepSession *whep.WHEPSession) {
28-
log.Println("Session.WHEPSession.Connected:", s.StreamKey)
29-
30-
<-whepSession.ActiveContext.Done()
31-
32-
log.Println("Session.WHEPSession.Disconnected:", s.StreamKey, " - ", whepSession.SessionID)
33-
s.removeWHEP(whepSession.SessionID)
34-
}
35-
3626
func (s *Session) handleWHEPVideoRTCPSender(whepSession *whep.WHEPSession, rtcpSender *webrtc.RTPSender) {
3727
for {
3828
rtcpPackets, _, rtcpErr := rtcpSender.ReadRTCP()

internal/webrtc/sessions/session/session.go

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package session
22

33
import (
4-
"context"
54
"fmt"
65
"log"
76

@@ -23,6 +22,10 @@ func (s *Session) UpdateStreamStatus(profile authorization.PublicProfile) {
2322
s.StatusLock.Unlock()
2423
}
2524

25+
func (session *Session) SetOnClose(onClose func()) {
26+
session.onClose = onClose
27+
}
28+
2629
// Add WHEP session to existing WHIP session
2730
func (s *Session) AddWHEP(whepSessionID string, peerConnection *webrtc.PeerConnection, audioTrack *codecs.TrackMultiCodec, videoTrack *codecs.TrackMultiCodec, videoRTCPSender *webrtc.RTPSender) (err error) {
2831
log.Println("WHIPSessionManager.WHIPSession.AddWHEPSession")
@@ -47,8 +50,7 @@ func (s *Session) AddWHEP(whepSessionID string, peerConnection *webrtc.PeerConne
4750
s.WHEPSessions[whepSessionID] = whepSession
4851
s.WHEPSessionsLock.Unlock()
4952
s.updateHostWHEPSessionsSnapshot()
50-
51-
go s.handleWHEPConnection(whepSession)
53+
whepSession.SetOnClose(s.handleWHEPClose)
5254
go s.handleWHEPVideoRTCPSender(whepSession, videoRTCPSender)
5355

5456
return nil
@@ -64,7 +66,7 @@ func (s *Session) AddHost(peerConnection *webrtc.PeerConnection) (err error) {
6466
break
6567
}
6668

67-
if host.PeerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed || s.ActiveContext.Err() == nil {
69+
if host.PeerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
6870
return fmt.Errorf("session already has a host")
6971
}
7072

@@ -73,20 +75,14 @@ func (s *Session) AddHost(peerConnection *webrtc.PeerConnection) (err error) {
7375
}
7476
}
7577

76-
activeContext, activeContextCancel := context.WithCancel(context.Background())
77-
7878
host := &whip.WHIPSession{
7979
ID: uuid.New().String(),
8080
AudioTracks: make(map[string]*whip.AudioTrack),
8181
VideoTracks: make(map[string]*whip.VideoTrack),
82-
83-
ActiveContext: activeContext,
84-
ActiveContextCancel: activeContextCancel,
8582
}
8683

8784
host.AddPeerConnection(peerConnection, s.StreamKey)
8885
if !s.Host.CompareAndSwap(nil, host) {
89-
host.ActiveContextCancel()
9086
host.RemovePeerConnection()
9187
host.RemoveTracks()
9288
return fmt.Errorf("session already has a host")
@@ -110,25 +106,24 @@ func (s *Session) RemoveHost() {
110106
log.Println("Session.RemoveHost", s.StreamKey)
111107

112108
host.WHEPSessionsSnapshot.Store(make(map[string]*whep.WHEPSession))
113-
host.ActiveContextCancel()
114109
host.RemovePeerConnection()
115110
host.RemoveTracks()
116111
}
117112

118-
// Remove WHEP session from WHIP session
119-
// In case the WHIP session does not have a host, and no more whep sessions, it will
120-
// be remove from the manager.
121-
func (s *Session) removeWHEP(whepSessionID string) {
122-
log.Println("Session.RemoveWHEPSession:", s.StreamKey, " - ", whepSessionID)
113+
func (s *Session) handleWHEPClose(whepSessionID string) {
114+
log.Println("Session.HandleWHEPClose:", s.StreamKey, " - ", whepSessionID)
123115

124116
s.WHEPSessionsLock.Lock()
125-
if whepSession, ok := s.WHEPSessions[whepSessionID]; ok {
126-
whepSession.Close()
117+
_, ok := s.WHEPSessions[whepSessionID]
118+
if ok {
127119
delete(s.WHEPSessions, whepSessionID)
128-
} else {
129-
log.Println("Session.RemoveWHEPSession.InvalidSession:", s.StreamKey, " - ", whepSessionID)
130120
}
131121
s.WHEPSessionsLock.Unlock()
122+
123+
if !ok {
124+
return
125+
}
126+
132127
s.updateHostWHEPSessionsSnapshot()
133128

134129
if s.isEmpty() {
@@ -138,22 +133,26 @@ func (s *Session) removeWHEP(whepSessionID string) {
138133

139134
// Remove all Hosts and clients before closing down session
140135
func (s *Session) close() {
141-
s.WHEPSessionsLock.Lock()
142-
whepSessions := make([]*whep.WHEPSession, 0, len(s.WHEPSessions))
143-
for _, whepSession := range s.WHEPSessions {
144-
whepSessions = append(whepSessions, whepSession)
145-
}
146-
s.WHEPSessions = make(map[string]*whep.WHEPSession)
147-
s.WHEPSessionsLock.Unlock()
136+
s.closeOnce.Do(func() {
137+
s.WHEPSessionsLock.Lock()
138+
whepSessions := make([]*whep.WHEPSession, 0, len(s.WHEPSessions))
139+
for _, whepSession := range s.WHEPSessions {
140+
whepSessions = append(whepSessions, whepSession)
141+
}
142+
s.WHEPSessions = make(map[string]*whep.WHEPSession)
143+
s.WHEPSessionsLock.Unlock()
148144

149-
for _, whepSession := range whepSessions {
150-
whepSession.Close()
151-
}
152-
s.updateHostWHEPSessionsSnapshot()
145+
for _, whepSession := range whepSessions {
146+
whepSession.Close()
147+
}
148+
s.updateHostWHEPSessionsSnapshot()
153149

154-
s.RemoveHost()
150+
s.RemoveHost()
155151

156-
s.ActiveContextCancel()
152+
if s.onClose != nil {
153+
s.onClose()
154+
}
155+
})
157156
}
158157

159158
func (s *Session) Close() {

internal/webrtc/sessions/session/types.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package session
22

33
import (
4-
"context"
54
"sync"
65
"sync/atomic"
76
"time"
@@ -23,9 +22,8 @@ type Session struct {
2322

2423
Host atomic.Pointer[whip.WHIPSession]
2524

26-
// Context
27-
ActiveContext context.Context
28-
ActiveContextCancel func()
25+
closeOnce sync.Once
26+
onClose func()
2927

3028
// Protects WHEPSessions
3129
WHEPSessionsLock sync.RWMutex

internal/webrtc/sessions/whep/types.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package whep
22

33
import (
4-
"context"
54
"sync"
65
"sync/atomic"
76
"time"
@@ -21,12 +20,11 @@ type (
2120
IsWaitingForKeyframe atomic.Bool
2221
IsSessionClosed atomic.Bool
2322

24-
SSESubscribersLock sync.RWMutex
25-
SSESubscribers map[string]sseSubscriber
26-
SessionClose sync.Once
27-
ActiveContext context.Context
28-
ActiveContextCancel func()
29-
pliSender func()
23+
SSESubscribersLock sync.RWMutex
24+
SSESubscribers map[string]sseSubscriber
25+
SessionClose sync.Once
26+
onClose func(string)
27+
pliSender func()
3028

3129
PeerConnectionLock sync.RWMutex
3230
PeerConnection *webrtc.PeerConnection

internal/webrtc/sessions/whep/whep.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package whep
22

33
import (
4-
"context"
54
"log"
65
"time"
76

@@ -21,7 +20,6 @@ func CreateNewWHEP(
2120
) (w *WHEPSession) {
2221
log.Println("WHEPSession.CreateNewWHEP", whepSessionID)
2322

24-
activeContext, activeContextCancel := context.WithCancel(context.Background())
2523
w = &WHEPSession{
2624
SessionID: whepSessionID,
2725
AudioTrack: audioTrack,
@@ -30,8 +28,6 @@ func CreateNewWHEP(
3028
VideoTimestamp: 5000,
3129
SSESubscribers: make(map[string]sseSubscriber),
3230
PeerConnection: peerConnection,
33-
ActiveContext: activeContext,
34-
ActiveContextCancel: activeContextCancel,
3531
pliSender: pliSender,
3632
videoBitrateWindowStart: time.Now(),
3733
}
@@ -74,10 +70,16 @@ func (w *WHEPSession) Close() {
7470
w.SSESubscribers = make(map[string]sseSubscriber)
7571
w.SSESubscribersLock.Unlock()
7672

77-
w.ActiveContextCancel()
73+
if w.onClose != nil {
74+
w.onClose(w.SessionID)
75+
}
7876
})
7977
}
8078

79+
func (w *WHEPSession) SetOnClose(onClose func(string)) {
80+
w.onClose = onClose
81+
}
82+
8183
// Get the current status of the WHEP session
8284
func (w *WHEPSession) GetWHEPSessionStatus() (state SessionState) {
8385
w.AudioLock.RLock()

0 commit comments

Comments
 (0)