Rewriting A WebRTC SFU Application With Fiber And Fiber Socket.IO: A Comprehensive Guide
Hey guys! So, you're diving into the world of WebRTC and trying to build a Selective Forwarding Unit (SFU) application using Fiber and Fiber Socket.IO? That's awesome! It's a fantastic way to create scalable and efficient real-time communication systems. You're wrestling with rewriting an existing example to fit your needs, specifically dealing with that pesky endless loop. Don't worry, I've got your back! Let's break down this challenge and figure out how to make this work.
Understanding the Challenge: Endless Loops and Socket.IO
The core issue you've highlighted revolves around an endless loop in your WebSocket handler and how to transition it to a more event-driven approach using Socket.IO. The original code uses a `for` loop to continuously read messages from the WebSocket connection. While this works, it's not the most elegant solution, especially when integrating with Socket.IO, which is designed to handle events asynchronously.
Why Endless Loops Can Be Tricky
- Resource Consumption: A loop around a blocking read, like the one here, sits idle until a message arrives, so it does not burn CPU by itself. It does tie up a goroutine for the whole life of the connection, though, and a loop around a non-blocking read would spin and waste CPU cycles.
- Scalability: In a high-traffic SFU application, multiple endless loops can quickly become a bottleneck. Each connection requires its own loop, and managing these can be challenging.
- Maintainability: Endless loops can make your code harder to read and maintain. It's not always immediately clear what the loop is doing and how it interacts with other parts of your application.
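Whether a loop like this spins depends on the read call. Here is a minimal sketch of the blocking kind, using a channel as a stand-in for the socket (the `readLoop` name and the channel are assumptions for illustration, not part of the original code):

```go
package main

import "fmt"

// readLoop drains msgs until the channel is closed and returns how many
// messages it handled. The receive blocks while the channel is empty, so an
// idle connection costs a parked goroutine rather than CPU time.
func readLoop(msgs <-chan string, handle func(string)) int {
	n := 0
	for msg := range msgs {
		handle(msg)
		n++
	}
	return n
}

func main() {
	msgs := make(chan string)
	done := make(chan int)

	// One goroutine per connection, exactly like the per-connection loop.
	go func() {
		done <- readLoop(msgs, func(m string) { fmt.Println("got", m) })
	}()

	msgs <- "offer"
	msgs <- "candidate"
	close(msgs) // stands in for the client disconnecting

	fmt.Println("handled", <-done)
}
```

The loop exits on its own when the channel closes, which is the same role the read error plays in the WebSocket version.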
The Socket.IO Way: Event-Driven Architecture
Socket.IO shines when handling real-time communication through an event-driven model. Instead of continuously polling for messages, you define event handlers that are triggered when specific events occur (e.g., a new message arrives, a client connects, or a client disconnects). This approach is more efficient, scalable, and easier to manage.
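To make the event-driven idea concrete, here is a rough sketch of what an event registry boils down to. `Emitter`, `On`, and `Emit` are hypothetical names for this illustration and are not the Fiber Socket.IO API. Something still has to read from the socket under the hood; the difference is that the library owns that loop and your code only supplies the callbacks.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler is a callback invoked with the raw payload of an event.
type Handler func(payload []byte)

// Emitter is a hypothetical, minimal event registry.
type Emitter struct {
	mu       sync.RWMutex
	handlers map[string][]Handler
}

// NewEmitter returns an empty registry.
func NewEmitter() *Emitter {
	return &Emitter{handlers: make(map[string][]Handler)}
}

// On registers a handler for the named event.
func (e *Emitter) On(event string, h Handler) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.handlers[event] = append(e.handlers[event], h)
}

// Emit runs every handler registered for the event and reports how many ran.
func (e *Emitter) Emit(event string, payload []byte) int {
	e.mu.RLock()
	// Copy the slice so a handler can safely call On while we iterate.
	hs := append([]Handler(nil), e.handlers[event]...)
	e.mu.RUnlock()

	for _, h := range hs {
		h(payload)
	}
	return len(hs)
}

func main() {
	em := NewEmitter()
	em.On("message", func(p []byte) { fmt.Printf("message: %s\n", p) })
	em.On("disconnect", func([]byte) { fmt.Println("client disconnected") })

	// In a real server these calls would come from the library's read loop.
	em.Emit("message", []byte(`{"event":"answer"}`))
	em.Emit("disconnect", nil)
}
```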
Diving into the Code: A Step-by-Step Transformation
Let's look at the code snippet you provided and refactor it to use Fiber Socket.IO in a more idiomatic way. We'll focus on replacing the endless loop with Socket.IO event handlers.
The Original Code Snippet
Here's the relevant part of the original code, specifically the `ConferenceWebsocketHandler` function:
func (s *WSHandler) ConferenceWebsocketHandler(c *fiber.Ctx) error {
	return socketio.New(func(conn *socketio.Websocket) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Initializing PeerConnection and session
		session, err := s.initPeerSession(ctx, conn)
		if err != nil {
			s.logger.Errorf("init peer session: %v", err)
			return
		}
		defer session.Close()

		// Registering a peer in the conference
		s.conference_usecase.AddPeer(session.PC, session.Writer)

		// Processing of ICE candidates
		session.PC.OnICECandidate(func(cand *webrtc.ICECandidate) {
			if cand == nil {
				return
			}
			if err := s.sendCandidate(session, cand); err != nil {
				s.logger.Errorf("send candidate: %v", err)
			}
		})

		// Processing connection status changes
		session.PC.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
			if state == webrtc.PeerConnectionStateClosed {
				if err := s.conference_usecase.SignalPeers(); err != nil {
					s.logger.Errorf("signal peers after close: %v", err)
				}
			}
		})

		// Processing incoming media tracks
		session.PC.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
			s.handleTrack(session, t)
		})

		// Initial signaling of peers
		if err := s.conference_usecase.SignalPeers(); err != nil {
			s.logger.Errorf("initial signal peers: %v", err)
		}

		// The main loop reading WebSocket messages from the client
		for {
			msg, err := session.ReadMessage()
			if err != nil {
				s.logger.Infof("read message error or connection closed: %v", err)
				return
			}
			if err := s.handleWebsocketMessage(session, msg); err != nil {
				s.logger.Errorf("handle websocket message: %v", err)
			}
		}
	})(c)
}
And the `handleWebsocketMessage` function:
func (s *WSHandler) handleWebsocketMessage(session *PeerSession, msg conference_utils.WebsocketMessage) error {
	switch msg.Event {
	case EventCandidate:
		var candidate webrtc.ICECandidateInit
		if err := json.Unmarshal([]byte(msg.Data), &candidate); err != nil {
			return err
		}
		return session.PC.AddICECandidate(candidate)
	case EventAnswer:
		var answer webrtc.SessionDescription
		if err := json.Unmarshal([]byte(msg.Data), &answer); err != nil {
			return err
		}
		return session.PC.SetRemoteDescription(answer)
	default:
		// Ignoring unknown events, you can add logging
		return nil
	}
}
The key part we want to change is the `for` loop within `ConferenceWebsocketHandler`. Instead of this loop, we'll use Socket.IO's event handling mechanism.
Refactoring with Socket.IO Event Handlers
Here's how we can refactor the code:
- Remove the Endless Loop: Get rid of the `for` loop in `ConferenceWebsocketHandler`.
- Register Event Handlers: Use `conn.On()` to register handlers for different events. Specifically, we'll need a handler for incoming messages.
- Move Message Handling: Call the existing `handleWebsocketMessage` function from the event handler instead of from the loop.
Here’s the refactored `ConferenceWebsocketHandler` function:
func (s *WSHandler) ConferenceWebsocketHandler(c *fiber.Ctx) error {
	return socketio.New(func(conn *socketio.Websocket) {
		ctx, cancel := context.WithCancel(context.Background())

		// Initializing PeerConnection and session
		session, err := s.initPeerSession(ctx, conn)
		if err != nil {
			cancel()
			s.logger.Errorf("init peer session: %v", err)
			return
		}
		// No defers here: without the loop this callback returns as soon as the
		// handlers are registered, so cleanup moves to the disconnect handler.

		// Registering a peer in the conference
		s.conference_usecase.AddPeer(session.PC, session.Writer)

		// Processing of ICE candidates
		session.PC.OnICECandidate(func(cand *webrtc.ICECandidate) {
			if cand == nil {
				return
			}
			if err := s.sendCandidate(session, cand); err != nil {
				s.logger.Errorf("send candidate: %v", err)
			}
		})

		// Processing connection status changes
		session.PC.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
			if state == webrtc.PeerConnectionStateClosed {
				if err := s.conference_usecase.SignalPeers(); err != nil {
					s.logger.Errorf("signal peers after close: %v", err)
				}
			}
		})

		// Processing incoming media tracks
		session.PC.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
			s.handleTrack(session, t)
		})

		// Initial signaling of peers
		if err := s.conference_usecase.SignalPeers(); err != nil {
			s.logger.Errorf("initial signal peers: %v", err)
		}

		// Handle incoming messages using Socket.IO events.
		// conn.On is an assumption here: as far as I know, gofiber/contrib's
		// socketio registers listeners with the package-level socketio.On(event,
		// func(*socketio.EventPayload)), so check your version before copying this.
		conn.On("message", func(msg conference_utils.WebsocketMessage) {
			if err := s.handleWebsocketMessage(session, msg); err != nil {
				s.logger.Errorf("handle websocket message: %v", err)
			}
		})

		// Handle disconnection
		conn.On("disconnect", func() {
			s.logger.Infof("connection closed")
			// Clean up resources, then signal the remaining peers
			session.Close()
			cancel()
			if err := s.conference_usecase.SignalPeers(); err != nil {
				s.logger.Errorf("signal peers after disconnect: %v", err)
			}
		})
	})(c)
}
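One thing to watch once the loop is gone: the setup callback returns as soon as the handlers are registered, so any `defer` inside it fires right away instead of at disconnect time. Cleanup therefore belongs in the disconnect path, and since more than one callback may end up triggering it, it helps to make it idempotent. Here is a minimal sketch of that idea; `session`, `conn`, and `setup` are hypothetical stand-ins for this example, not the real `PeerSession` or Socket.IO types.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for PeerSession.
type session struct {
	cancel context.CancelFunc
	once   sync.Once
	closed bool
}

// Close releases resources exactly once, however many callbacks call it.
func (s *session) Close() {
	s.once.Do(func() {
		s.cancel()
		s.closed = true
	})
}

// conn is a hypothetical connection that stores a disconnect callback.
type conn struct {
	onDisconnect func()
}

// setup registers handlers and returns immediately, like the refactored handler.
func setup(c *conn) *session {
	ctx, cancel := context.WithCancel(context.Background())
	_ = ctx // the real handler would pass this to initPeerSession

	s := &session{cancel: cancel}
	// No `defer s.Close()` here: it would run as soon as setup returns.
	c.onDisconnect = s.Close
	return s
}

func main() {
	c := &conn{}
	s := setup(c)
	fmt.Println("closed after setup:", s.closed)

	c.onDisconnect()
	c.onDisconnect() // the second call is a no-op thanks to sync.Once
	fmt.Println("closed after disconnect:", s.closed)
}
```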
Explanation of Changes
`conn.On("message", ...)`: This is the key change. We're using `conn.On()` to register a handler function for the `