Rewriting A WebRTC SFU Application With Fiber And Fiber Socket.IO A Comprehensive Guide

by Sharif Sakr 88 views

Hey guys! So, you're diving into the world of WebRTC and trying to build a Selective Forwarding Unit (SFU) application using Fiber and Fiber Socket.IO? That's awesome! It's a fantastic way to create scalable and efficient real-time communication systems. You're wrestling with rewriting an existing example to fit your needs, specifically dealing with that pesky endless loop. Don't worry, I've got your back! Let's break down this challenge and figure out how to make this work.

Understanding the Challenge: Endless Loops and Socket.IO

The core issue you've highlighted revolves around an endless loop in your WebSocket handler and how to transition it to a more event-driven approach using Socket.IO. The original code uses a for loop to continuously read messages from the WebSocket connection. While this works, it's not the most elegant solution, especially when integrating with Socket.IO, which is designed to handle events asynchronously.

Why Endless Loops Can Be Tricky

  • Resource Consumption: An endless loop, if not carefully managed, can consume significant resources. If there are no messages to process, the loop might still be spinning, taking up CPU cycles.
  • Scalability: In a high-traffic SFU application, multiple endless loops can quickly become a bottleneck. Each connection requires its own loop, and managing these can be challenging.
  • Maintainability: Endless loops can make your code harder to read and maintain. It's not always immediately clear what the loop is doing and how it interacts with other parts of your application.

The Socket.IO Way: Event-Driven Architecture

Socket.IO shines when handling real-time communication through an event-driven model. Instead of continuously polling for messages, you define event handlers that are triggered when specific events occur (e.g., a new message arrives, a client connects, or a client disconnects). This approach is more efficient, scalable, and easier to manage.

Diving into the Code: A Step-by-Step Transformation

Let's look at the code snippet you provided and refactor it to use Fiber Socket.IO in a more idiomatic way. We'll focus on replacing the endless loop with Socket.IO event handlers.

The Original Code Snippet

Here's the relevant part of the original code, specifically the ConferenceWebsocketHandler function:

func (s *WSHandler) ConferenceWebsocketHandler(c *fiber.Ctx) error {
    return socketio.New(func(conn *socketio.Websocket) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Initializing PeerConnection and session
        session, err := s.initPeerSession(ctx, conn)
        if err != nil {
            s.logger.Errorf("init peer session: %v", err)
            return
        }
        defer session.Close()

        // Registering a peer in the conference
        s.conference_usecase.AddPeer(session.PC, session.Writer)

        // Processing of ICE candidates
        session.PC.OnICECandidate(func(cand *webrtc.ICECandidate) {
            if cand == nil {
                return
            }
            if err := s.sendCandidate(session, cand); err != nil {
                s.logger.Errorf("send candidate: %v", err)
            }
        })

        // Processing connection status changes
        session.PC.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
            if state == webrtc.PeerConnectionStateClosed {
                if err := s.conference_usecase.SignalPeers(); err != nil {
                    s.logger.Errorf("signal peers after close: %v", err)
                }
            }
        })

        // Processing incoming media tracks
        session.PC.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
            s.handleTrack(session, t)
        })

        // Initial signaling of feasts
        if err := s.conference_usecase.SignalPeers(); err != nil {
            s.logger.Errorf("initial signal peers: %v", err)
        }

        // The main cycle of reading WebSocket messages from the client
        for {
            msg, err := session.ReadMessage()
            if err != nil {
                s.logger.Infof("read message error or connection closed: %v", err)
                return
            }

            if err := s.handleWebsocketMessage(session, msg); err != nil {
                s.logger.Errorf("handle websocket message: %v", err)
            }
        }
    })(c)
}

And the handleWebsocketMessage function:

func (s *WSHandler) handleWebsocketMessage(session *PeerSession, msg conference_utils.WebsocketMessage) error {
    switch msg.Event {
    case EventCandidate:
        var candidate webrtc.ICECandidateInit
        if err := json.Unmarshal([]byte(msg.Data), &candidate); err != nil {
            return err
        }
        return session.PC.AddICECandidate(candidate)

    case EventAnswer:
        var answer webrtc.SessionDescription
        if err := json.Unmarshal([]byte(msg.Data), &answer); err != nil {
            return err
        }
        return session.PC.SetRemoteDescription(answer)

    default:
        // Ignoring unknown events, you can add logging
        return nil
    }
}

The key part we want to change is the for loop within ConferenceWebsocketHandler. Instead of this loop, we'll use Socket.IO's event handling mechanism.

Refactoring with Socket.IO Event Handlers

Here's how we can refactor the code:

  1. Remove the Endless Loop: Get rid of the for loop in ConferenceWebsocketHandler.
  2. Register Event Handlers: Use conn.On() to register handlers for different events. Specifically, we'll need a handler for incoming messages.
  3. Move Message Handling Logic: Move the logic from the handleWebsocketMessage function into the event handler.

Here’s the refactored ConferenceWebsocketHandler function:

func (s *WSHandler) ConferenceWebsocketHandler(c *fiber.Ctx) error {
    return socketio.New(func(conn *socketio.Websocket) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Initializing PeerConnection and session
        session, err := s.initPeerSession(ctx, conn)
        if err != nil {
            s.logger.Errorf("init peer session: %v", err)
            return
        }
        defer session.Close()

        // Registering a peer in the conference
        s.conference_usecase.AddPeer(session.PC, session.Writer)

        // Processing of ICE candidates
        session.PC.OnICECandidate(func(cand *webrtc.ICECandidate) {
            if cand == nil {
                return
            }
            if err := s.sendCandidate(session, cand); err != nil {
                s.logger.Errorf("send candidate: %v", err)
            }
        })

        // Processing connection status changes
        session.PC.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
            if state == webrtc.PeerConnectionStateClosed {
                if err := s.conference_usecase.SignalPeers(); err != nil {
                    s.logger.Errorf("signal peers after close: %v", err)
                }
            }
        })

        // Processing incoming media tracks
        session.PC.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
            s.handleTrack(session, t)
        })

        // Initial signaling of feasts
        if err := s.conference_usecase.SignalPeers(); err != nil {
            s.logger.Errorf("initial signal peers: %v", err)
        }

        // Handle incoming messages using Socket.IO events
        conn.On("message", func(msg conference_utils.WebsocketMessage) {
            if err := s.handleWebsocketMessage(session, msg); err != nil {
                s.logger.Errorf("handle websocket message: %v", err)
            }
        })

        // Handle disconnection
        conn.On("disconnect", func() {
            s.logger.Infof("connection closed")
            // Clean up resources, signal peers, etc.
            if err := s.conference_usecase.SignalPeers(); err != nil {
                s.logger.Errorf("signal peers after disconnect: %v", err)
            }
        })

    })(c)
}

Explanation of Changes

  • conn.On("message", ...): This is the key change. We're using conn.On() to register a handler function for the `