Skip to content

Commit b968bf8

Browse files
committed
internal: Trickle ICE
1 parent 551d83f commit b968bf8

3 files changed

Lines changed: 62 additions & 31 deletions

File tree

internal/webrtc/webrtc.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func getStream(streamKey string, whipSessionId string) (*stream, error) {
123123
return foundStream, nil
124124
}
125125

126-
func getPeerConnection(streamKey string, whipSessionId string) *webrtc.PeerConnection {
126+
func GetPeerConnection(whipSessionId string) *webrtc.PeerConnection {
127127
peerConnectionMapLock.Lock()
128128
defer peerConnectionMapLock.Unlock()
129129

@@ -383,7 +383,7 @@ func PopulateMediaEngine(m *webrtc.MediaEngine) error {
383383
return nil
384384
}
385385

386-
func newPeerConnection(api *webrtc.API, session_id string) (*webrtc.PeerConnection, error) {
386+
func newPeerConnection(api *webrtc.API, sessionId string) (*webrtc.PeerConnection, error) {
387387
cfg := webrtc.Configuration{}
388388

389389
if stunServers := os.Getenv("STUN_SERVERS"); stunServers != "" {
@@ -397,14 +397,14 @@ func newPeerConnection(api *webrtc.API, session_id string) (*webrtc.PeerConnecti
397397
peerConnectionMapLock.Lock()
398398
defer peerConnectionMapLock.Unlock()
399399

400-
_, ok := peerConnectionMap[session_id]
400+
_, ok := peerConnectionMap[sessionId]
401401
if ok {
402402
return nil, nil
403403
}
404404

405405
pc, err := api.NewPeerConnection(cfg)
406-
peerConnectionMap[session_id] = pc
407-
406+
peerConnectionMap[sessionId] = pc
407+
408408
return pc, err
409409
}
410410

internal/webrtc/whip.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,21 +140,21 @@ func videoWriter(remoteTrack *webrtc.TrackRemote, stream *stream, peerConnection
140140
}
141141
}
142142

143-
func WHIP(offer, streamKey string) (string, error) {
143+
func WHIP(offer, streamKey string) (string, string, error) {
144144
maybePrintOfferAnswer(offer, true)
145145

146146
whipSessionId := uuid.New().String()
147147

148148
peerConnection, err := newPeerConnection(apiWhip, whipSessionId)
149149
if err != nil {
150-
return "", err
150+
return "", "", err
151151
}
152152

153153
streamMapLock.Lock()
154154
defer streamMapLock.Unlock()
155155
stream, err := getStream(streamKey, whipSessionId)
156156
if err != nil {
157-
return "", err
157+
return "", "", err
158158
}
159159

160160
peerConnection.OnTrack(func(remoteTrack *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
@@ -179,18 +179,18 @@ func WHIP(offer, streamKey string) (string, error) {
179179
SDP: string(offer),
180180
Type: webrtc.SDPTypeOffer,
181181
}); err != nil {
182-
return "", err
182+
return "", "", err
183183
}
184184

185185
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
186186
answer, err := peerConnection.CreateAnswer(nil)
187187

188188
if err != nil {
189-
return "", err
189+
return "", "", err
190190
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
191-
return "", err
191+
return "", "", err
192192
}
193193

194194
<-gatherComplete
195-
return maybePrintOfferAnswer(appendAnswer(peerConnection.LocalDescription().SDP), false), nil
195+
return maybePrintOfferAnswer(appendAnswer(peerConnection.LocalDescription().SDP), false), whipSessionId, nil
196196
}

main.go

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/glimesh/broadcast-box/internal/networktest"
1919
"github.com/glimesh/broadcast-box/internal/webhook"
2020
"github.com/glimesh/broadcast-box/internal/webrtc"
21+
pionWebrtc "github.com/pion/webrtc/v4"
2122
"github.com/joho/godotenv"
2223
)
2324

@@ -36,6 +37,9 @@ var (
3637
errInvalidStreamKey = errors.New("invalid stream key format")
3738

3839
streamKeyRegex = regexp.MustCompile(`^[a-zA-Z0-9_\-\.~]+$`)
40+
ufragRegex = regexp.MustCompile(`a=ice-ufrag:(.*)`)
41+
pwdRegex = regexp.MustCompile(`a=ice-pwd:(.*)`)
42+
candidatesRegex = regexp.MustCompile(`a=(candidate:.*)`)
3943
)
4044

4145
type (
@@ -104,13 +108,13 @@ func whipPostHandler(res http.ResponseWriter, r *http.Request) {
104108
return
105109
}
106110

107-
answer, err := webrtc.WHIP(string(offer), streamKey)
111+
answer, sessionId, err := webrtc.WHIP(string(offer), streamKey)
108112
if err != nil {
109113
logHTTPError(res, err.Error(), http.StatusBadRequest)
110114
return
111115
}
112116

113-
res.Header().Add("Location", "/api/whip")
117+
res.Header().Add("Location", "/api/whip/"+sessionId)
114118
res.Header().Add("Content-Type", "application/sdp")
115119
res.WriteHeader(http.StatusCreated)
116120
if _, err = fmt.Fprint(res, answer); err != nil {
@@ -124,33 +128,58 @@ func whipPatchHandler(res http.ResponseWriter, r *http.Request) {
124128
return
125129
}
126130

127-
//TODO: get session id through a ressource id
128-
streamKey, err := getStreamKey("whip-connect", r)
129-
if err != nil {
130-
logHTTPError(res, err.Error(), http.StatusBadRequest)
131-
return
132-
}
131+
vals := strings.Split(r.URL.RequestURI(), "/")
132+
whipSessionId := vals[len(vals)-1]
133133

134134
patch, err := io.ReadAll(r.Body)
135135
if err != nil {
136136
logHTTPError(res, err.Error(), http.StatusBadRequest)
137137
return
138138
}
139139

140-
// regex_ufrag, _ := regexp.Compile("a=ice-ufrag:(.*)")
141-
// patch_ufrag := regex_ufrag.FindStringSubmatch(string(patch))[1]
142-
// regex_pwd, _ := regexp.Compile("a=ice-pwd:(.*)")
143-
// patch_pwd := regex_pwd.FindStringSubmatch(string(patch))[1]
140+
patchUfragMatches := ufragRegex.FindAllStringSubmatch(string(patch), -1)
141+
if len(patchUfragMatches) != 1 {
142+
logHTTPError(res, "More than one ice-ufrag", http.StatusBadRequest)
143+
}
144+
patchUfrag := string(patchUfragMatches[0][1])
144145

145-
log.Println("Patch for streamKey: " + string(streamKey))
146+
patchPwdMatches := pwdRegex.FindAllStringSubmatch(string(patch), -1)
147+
if len(patchPwdMatches) != 1 {
148+
logHTTPError(res, "More than one ice-pwd", http.StatusBadRequest)
149+
}
150+
patchPwd := string(patchPwdMatches[0][1])
146151

147-
regex_candidates, _ := regexp.Compile("a=(candidate:.*)")
148-
patch_candidates := regex_candidates.FindAllStringSubmatch(string(patch), -1)[0:]
149-
for i := range patch_candidates {
150-
log.Println(patch_candidates[i])
152+
patchCandidatesMatches := candidatesRegex.FindAllStringSubmatch(string(patch), -1)
153+
if len(patchCandidatesMatches) == 0 {
154+
return
151155
}
152156

153-
// pc := getPeerConnection()
157+
pc := webrtc.GetPeerConnection(whipSessionId)
158+
159+
localDescription := pc.RemoteDescription().SDP
160+
currentUfragsMatches := ufragRegex.FindAllStringSubmatch(string(localDescription), -1)
161+
if len(currentUfragsMatches) == 0 {
162+
logHTTPError(res, "No remote ice-ufrag", http.StatusExpectationFailed)
163+
return
164+
}
165+
currentPwdsMatches := pwdRegex.FindAllStringSubmatch(string(localDescription), -1)
166+
if len(currentPwdsMatches) == 0 {
167+
logHTTPError(res, "No remote ice-pwd", http.StatusExpectationFailed)
168+
return
169+
}
170+
currentUfrags := currentUfragsMatches[len(currentUfragsMatches)-1]
171+
currentUfrag := currentUfrags[1]
172+
currentPwds := currentPwdsMatches[len(currentPwdsMatches)-1]
173+
currentPwd := currentPwds[1]
174+
175+
if patchUfrag == currentUfrag && patchPwd == currentPwd {
176+
for i := range patchCandidatesMatches {
177+
log.Println("Adding candidate via Trickle ICE: " + patchCandidatesMatches[i][1])
178+
pc.AddICECandidate(pionWebrtc.ICECandidateInit{Candidate: string(patchCandidatesMatches[i][1])})
179+
}
180+
} else {
181+
//TODO: pc.RestartICE();
182+
}
154183

155184
res.Header().Add("Content-Type", "application/trickle-ice-sdpfrag")
156185
res.WriteHeader(http.StatusOK)
@@ -182,7 +211,7 @@ func whepHandler(res http.ResponseWriter, req *http.Request) {
182211
apiPath := req.Host + strings.TrimSuffix(req.URL.RequestURI(), "whep")
183212
res.Header().Add("Link", `<`+apiPath+"sse/"+whepSessionId+`>; rel="urn:ietf:params:whep:ext:core:server-sent-events"; events="layers"`)
184213
res.Header().Add("Link", `<`+apiPath+"layer/"+whepSessionId+`>; rel="urn:ietf:params:whep:ext:core:layer"`)
185-
res.Header().Add("Location", "/api/whep")
214+
res.Header().Add("Location", "/api/whep/"+whepSessionId)
186215
res.Header().Add("Content-Type", "application/sdp")
187216
res.WriteHeader(http.StatusCreated)
188217
if _, err = fmt.Fprint(res, answer); err != nil {
@@ -342,7 +371,9 @@ func main() {
342371
mux.Handle("/", indexHTMLWhenNotFound(http.Dir("./web/build")))
343372
}
344373
mux.HandleFunc("/api/whip", corsHandler(whipHandler))
374+
mux.HandleFunc("/api/whip/", corsHandler(whipHandler))
345375
mux.HandleFunc("/api/whep", corsHandler(whepHandler))
376+
mux.HandleFunc("/api/whep/", corsHandler(whepHandler))
346377
mux.HandleFunc("/api/sse/", corsHandler(whepServerSentEventsHandler))
347378
mux.HandleFunc("/api/layer/", corsHandler(whepLayerHandler))
348379
mux.HandleFunc("/api/status", corsHandler(statusHandler))

0 commit comments

Comments
 (0)