- Added user information (UserID, UserName, UserAvatar) to Client struct for presence tracking. - Implemented failure handling in the broadcastMessage function to manage send failures and disconnect clients if necessary. - Introduced document ownership and sharing capabilities: - Added OwnerID and Is_Public fields to Document model. - Created DocumentShare model for managing document sharing with permissions. - Implemented functions for creating, listing, and managing document shares in the Postgres store. - Added user management functionality: - Created User model and associated functions for user management in the Postgres store. - Implemented session management with token hashing for security. - Updated database schema with migrations for users, sessions, and document shares. - Enhanced frontend Yjs integration with awareness event logging for user connections and disconnections.
250 lines
6.2 KiB
Go
250 lines
6.2 KiB
Go
package hub
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
type Message struct {
|
|
RoomID string
|
|
Data []byte
|
|
sender *Client
|
|
}
|
|
|
|
type Client struct {
|
|
ID string
|
|
UserID *uuid.UUID // Authenticated user ID (nil for public share access)
|
|
UserName string // User's display name for presence
|
|
UserAvatar *string // User's avatar URL for presence
|
|
Conn *websocket.Conn
|
|
send chan []byte
|
|
sendMu sync.Mutex
|
|
sendClosed bool
|
|
hub *Hub
|
|
roomID string
|
|
mutex sync.Mutex
|
|
unregisterOnce sync.Once
|
|
failureCount int
|
|
failureMu sync.Mutex
|
|
}
|
|
type Room struct {
|
|
ID string
|
|
clients map[*Client]bool
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
type Hub struct {
|
|
rooms map[string]*Room
|
|
mu sync.RWMutex
|
|
Register chan *Client // Exported
|
|
Unregister chan *Client // Exported
|
|
Broadcast chan *Message // Exported
|
|
}
|
|
func NewHub() *Hub {
|
|
return &Hub{
|
|
rooms: make(map[string]*Room),
|
|
Register: make(chan *Client),
|
|
Unregister: make(chan *Client),
|
|
Broadcast: make(chan *Message),
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Run() {
|
|
for {
|
|
select {
|
|
case client := <-h.Register:
|
|
h.registerClient(client)
|
|
case client := <-h.Unregister:
|
|
h.unregisterClient(client)
|
|
case message := <-h.Broadcast:
|
|
h.broadcastMessage(message)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Hub) registerClient(client *Client) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
room, exists := h.rooms[client.roomID]
|
|
if !exists {
|
|
room = &Room{
|
|
ID: client.roomID,
|
|
clients: make(map[*Client]bool),
|
|
}
|
|
h.rooms[client.roomID] = room
|
|
log.Printf("Created new room with ID: %s", client.roomID)
|
|
}
|
|
room.mu.Lock()
|
|
room.clients[client] = true
|
|
room.mu.Unlock()
|
|
log.Printf("Client %s joined room %s (total clients: %d)", client.ID, client.roomID, len(room.clients))
|
|
}
|
|
func (h *Hub) unregisterClient(client *Client) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
room, exists := h.rooms[client.roomID]
|
|
if !exists {
|
|
log.Printf("Room %s does not exist for client %s", client.roomID, client.ID)
|
|
return
|
|
}
|
|
|
|
room.mu.Lock()
|
|
defer room.mu.Unlock()
|
|
|
|
if _, ok := room.clients[client]; ok {
|
|
delete(room.clients, client)
|
|
|
|
// Safely close send channel exactly once
|
|
client.sendMu.Lock()
|
|
if !client.sendClosed {
|
|
close(client.send)
|
|
client.sendClosed = true
|
|
}
|
|
client.sendMu.Unlock()
|
|
|
|
log.Printf("Client %s disconnected from room %s (total clients: %d)",
|
|
client.ID, client.roomID, len(room.clients))
|
|
}
|
|
|
|
if len(room.clients) == 0 {
|
|
delete(h.rooms, client.roomID)
|
|
log.Printf("Deleted empty room with ID: %s", client.roomID)
|
|
}
|
|
}
|
|
|
|
const (
|
|
writeWait = 10 * time.Second
|
|
pongWait = 60 * time.Second
|
|
pingPeriod = (pongWait * 9) / 10 // 54 seconds
|
|
maxSendFailures = 5
|
|
)
|
|
|
|
func (h *Hub) broadcastMessage(message *Message) {
|
|
h.mu.RLock()
|
|
room, exists := h.rooms[message.RoomID]
|
|
h.mu.RUnlock()
|
|
if !exists {
|
|
log.Printf("Room %s does not exist for broadcasting", message.RoomID)
|
|
return
|
|
}
|
|
|
|
room.mu.RLock()
|
|
defer room.mu.RUnlock()
|
|
|
|
for client := range room.clients {
|
|
if client != message.sender {
|
|
select {
|
|
case client.send <- message.Data:
|
|
// Success - reset failure count
|
|
client.failureMu.Lock()
|
|
client.failureCount = 0
|
|
client.failureMu.Unlock()
|
|
|
|
default:
|
|
// Failed - increment failure count
|
|
client.failureMu.Lock()
|
|
client.failureCount++
|
|
currentFailures := client.failureCount
|
|
client.failureMu.Unlock()
|
|
|
|
log.Printf("Failed to send to client %s (channel full, failures: %d/%d)",
|
|
client.ID, currentFailures, maxSendFailures)
|
|
|
|
// Disconnect if threshold exceeded
|
|
if currentFailures >= maxSendFailures {
|
|
log.Printf("Client %s exceeded max send failures, disconnecting", client.ID)
|
|
go func(c *Client) {
|
|
c.unregister()
|
|
c.Conn.Close()
|
|
}(client)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func (c *Client) ReadPump() {
|
|
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
c.Conn.SetPongHandler(func(string) error {
|
|
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
return nil
|
|
})
|
|
defer func() {
|
|
c.unregister()
|
|
c.Conn.Close()
|
|
}()
|
|
for {
|
|
messageType, message, err := c.Conn.ReadMessage()
|
|
if err != nil {
|
|
log.Printf("Error reading message from client %s: %v", c.ID, err)
|
|
break
|
|
}
|
|
if messageType == websocket.BinaryMessage {
|
|
c.hub.Broadcast <- &Message{
|
|
RoomID: c.roomID,
|
|
Data: message,
|
|
sender: c,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) WritePump() {
|
|
ticker := time.NewTicker(pingPeriod)
|
|
defer func() {
|
|
ticker.Stop()
|
|
c.unregister() // NEW: Now WritePump also unregisters
|
|
c.Conn.Close()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case message, ok := <-c.send:
|
|
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if !ok {
|
|
// Hub closed the channel
|
|
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
|
|
return
|
|
}
|
|
|
|
err := c.Conn.WriteMessage(websocket.BinaryMessage, message)
|
|
if err != nil {
|
|
log.Printf("Error writing message to client %s: %v", c.ID, err)
|
|
return
|
|
}
|
|
|
|
case <-ticker.C:
|
|
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
log.Printf("Ping failed for client %s: %v", c.ID, err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
func NewClient(id string, userID *uuid.UUID, userName string, userAvatar *string, conn *websocket.Conn, hub *Hub, roomID string) *Client {
|
|
return &Client{
|
|
ID: id,
|
|
UserID: userID,
|
|
UserName: userName,
|
|
UserAvatar: userAvatar,
|
|
Conn: conn,
|
|
send: make(chan []byte, 256),
|
|
hub: hub,
|
|
roomID: roomID,
|
|
}
|
|
}
|
|
func (c *Client) unregister() {
|
|
c.unregisterOnce.Do(func() {
|
|
c.hub.Unregister <- c
|
|
})
|
|
} |