feat(logger): set the log level to Fatal to eliminate IO lock contention
fix(redis): silence Redis internal logging and tune connection pool settings to reduce mutex contention
feat(userlist): enhance the user list component with avatar support and improved styling
test(load): add a production-style load test script for WebSocket connections and Redis PubSub stress testing
chore(loadtest): add a script to run load tests with pprof profiling for performance analysis
This commit is contained in:
@@ -62,3 +62,14 @@ GITHUB_CLIENT_SECRET=your-github-client-secret

# Redis (for future use)
REDIS_URL=redis://localhost:6379

# ===================
# PROFILING (pprof)
# ===================
# Enable pprof endpoints at /debug/pprof/* (non-production only)
# ENABLE_PPROF=1
# Only allow requests from localhost/loopback (recommended)
# PPROF_LOCAL_ONLY=true
# Optional: contention profiling (adds overhead; best for short windows)
# PPROF_BLOCK_RATE=1
# PPROF_MUTEX_FRACTION=1

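These flags are read at server startup; the wiring itself is not part of this diff. A minimal sketch of how they might be honored, assuming a plain net/http setup (the helper name, port, and placement are illustrative, not the project's actual code):

package main

import (
    "net/http"
    _ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux
    "os"
    "runtime"
    "strconv"
)

// maybeEnablePprof is a hypothetical helper honoring the env vars above.
func maybeEnablePprof() {
    if os.Getenv("ENABLE_PPROF") != "1" {
        return
    }
    // Contention profiling is opt-in because it adds overhead.
    if rate, err := strconv.Atoi(os.Getenv("PPROF_BLOCK_RATE")); err == nil && rate > 0 {
        runtime.SetBlockProfileRate(rate) // sample goroutine blocking events
    }
    if frac, err := strconv.Atoi(os.Getenv("PPROF_MUTEX_FRACTION")); err == nil && frac > 0 {
        runtime.SetMutexProfileFraction(frac) // sample mutex contention events
    }
    addr := ":6060"
    if os.Getenv("PPROF_LOCAL_ONLY") == "true" {
        addr = "127.0.0.1:6060" // loopback only, as recommended above
    }
    go func() { _ = http.ListenAndServe(addr, nil) }() // pprof handlers live on DefaultServeMux
}

func main() {
    maybeEnablePprof()
    select {} // block forever; a real server would serve its app here
}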
@@ -37,10 +37,10 @@ type Client struct {
    idsMu sync.Mutex
}

type Room struct {
    ID      string
    clients map[*Client]bool
    mu      sync.RWMutex
    cancel  context.CancelFunc
}

type Hub struct {
@@ -55,10 +55,40 @@ type Hub struct {
    logger       *zap.Logger
    serverID     string
    fallbackMode bool

    // P0 fix: bounded worker pool for Redis Publish
    publishQueue chan *Message // buffered queue consumed by fixed workers
    publishDone  chan struct{} // close to signal workers to exit

    subscribeMu sync.Mutex

    // Bounded worker pool for Redis SetAwareness
    awarenessQueue chan awarenessItem
}

const (
    // publishWorkerCount is the number of fixed goroutines consuming from publishQueue.
    // 50 workers can handle ~2000 msg/sec assuming ~25ms avg Redis RTT per publish.
    publishWorkerCount = 50

    // publishQueueSize is the buffer size for the publish queue channel.
    publishQueueSize = 4096

    // awarenessWorkerCount is the number of fixed goroutines consuming from awarenessQueue.
    awarenessWorkerCount = 8

    // awarenessQueueSize is the buffer size for awareness updates.
    awarenessQueueSize = 4096
)

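A quick sanity check on that sizing via Little's law (in-flight operations = throughput × per-operation latency): 2000 msg/s × 0.025 s = 50 concurrent publishes, which is exactly publishWorkerCount. The awareness pool is sized far smaller at 8, presumably because each queued awareness item batches several client IDs, so it sees a small fraction of the publish traffic.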
type awarenessItem struct {
    roomID    string
    clientIDs []uint64
    data      []byte
}

func NewHub(messagebus messagebus.MessageBus, serverID string, logger *zap.Logger) *Hub {
    h := &Hub{
        rooms:      make(map[string]*Room),
        Register:   make(chan *Client, 2048),
        Unregister: make(chan *Client, 2048),
@@ -67,8 +97,80 @@ func NewHub(messagebus messagebus.MessageBus, serverID string, logger *zap.Logge
        messagebus:   messagebus,
        serverID:     serverID,
        logger:       logger,
        fallbackMode: false,
        // P0 fix: bounded publish worker pool
        publishQueue: make(chan *Message, publishQueueSize),
        publishDone:  make(chan struct{}),
        // bounded awareness worker pool
        awarenessQueue: make(chan awarenessItem, awarenessQueueSize),
    }

    // Start the fixed worker pools for Redis publishing and awareness caching
    h.startPublishWorkers(publishWorkerCount)
    h.startAwarenessWorkers(awarenessWorkerCount)

    return h
}

// startPublishWorkers launches n goroutines that consume from publishQueue
// and publish messages to Redis. Workers exit when publishDone is closed.
func (h *Hub) startPublishWorkers(n int) {
    for i := 0; i < n; i++ {
        go func(workerID int) {
            for {
                select {
                case <-h.publishDone:
                    h.logger.Info("Publish worker exiting", zap.Int("worker_id", workerID))
                    return
                case msg, ok := <-h.publishQueue:
                    if !ok {
                        return
                    }
                    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
                    err := h.messagebus.Publish(ctx, msg.RoomID, msg.Data)
                    cancel()
                    if err != nil {
                        h.logger.Error("Redis Publish failed", zap.Error(err))
                    }
                }
            }
        }(i)
    }
    h.logger.Info("Publish worker pool started", zap.Int("workers", n))
}

func (h *Hub) startAwarenessWorkers(n int) {
    for i := 0; i < n; i++ {
        go func(workerID int) {
            for {
                select {
                case <-h.publishDone:
                    h.logger.Info("Awareness worker exiting", zap.Int("worker_id", workerID))
                    return
                case item, ok := <-h.awarenessQueue:
                    if !ok {
                        return
                    }
                    if h.fallbackMode || h.messagebus == nil {
                        continue
                    }
                    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                    for _, clientID := range item.clientIDs {
                        if err := h.messagebus.SetAwareness(ctx, item.roomID, clientID, item.data); err != nil {
                            h.logger.Warn("Failed to cache awareness in Redis",
                                zap.Uint64("yjs_id", clientID),
                                zap.Error(err))
                        }
                    }
                    cancel()
                }
            }
        }(i)
    }
    h.logger.Info("Awareness worker pool started", zap.Int("workers", n))
}
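Nothing in this diff closes publishDone; since both pools select on the same channel, a hypothetical shutdown hook (illustrative, not part of this commit) would only need to close it once:

// Hypothetical: stop both worker pools. Closing publishDone is observed by
// every publish and awareness worker's select, so all of them return.
func (h *Hub) stopWorkers() {
    close(h.publishDone)
}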

func (h *Hub) Run() {
@@ -85,10 +187,12 @@ func (h *Hub) Run() {
}

func (h *Hub) registerClient(client *Client) {
    var room *Room
    var exists bool
    var needSubscribe bool

    h.mu.Lock()
    room, exists = h.rooms[client.roomID]

    // --- 1. Initialize the room (only for the first client on this server) ---
    if !exists {
@@ -100,22 +204,40 @@ func (h *Hub) registerClient(client *Client) {
        }
        h.rooms[client.roomID] = room
        h.logger.Info("Created new local room instance", zap.String("room_id", client.roomID))
    }
    if room.cancel == nil && !h.fallbackMode && h.messagebus != nil {
        needSubscribe = true
    }
    h.mu.Unlock()

    // Start the cross-server subscription (avoid network I/O while holding h.mu)
    if needSubscribe {
        h.subscribeMu.Lock()
        h.mu.RLock()
        room = h.rooms[client.roomID]
        alreadySubscribed := room != nil && room.cancel != nil
        h.mu.RUnlock()
        if !alreadySubscribed {
            ctx, cancel := context.WithCancel(context.Background())

            msgChan, err := h.messagebus.Subscribe(ctx, client.roomID)
            if err != nil {
                h.logger.Error("Redis Subscribe failed", zap.Error(err))
                cancel()
            } else {
                h.mu.Lock()
                room = h.rooms[client.roomID]
                if room == nil {
                    h.mu.Unlock()
                    cancel()
                    _ = h.messagebus.Unsubscribe(context.Background(), client.roomID)
                } else {
                    room.cancel = cancel
                    h.mu.Unlock()
                    // Start the forwarding goroutine so that messages from other
                    // servers reach the clients connected to this one.
                    go h.startRoomMessageForwarding(ctx, client.roomID, msgChan)
                }
            }
        }
        h.subscribeMu.Unlock()
    }

    // --- 2. Add the client to the local room list ---
@@ -129,61 +251,61 @@ func (h *Hub) registerClient(client *Client) {
    // Whether or not this is the first client in the room, every joiner gets
    // everyone's awareness state fetched from Redis and delivered to them.

    if !h.fallbackMode && h.messagebus != nil {
        go func(c *Client) {
            ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
            defer cancel()

            // 1. Fetch from Redis
            awarenessMap, err := h.messagebus.GetAllAwareness(ctx, c.roomID)
            if err != nil {
                h.logger.Error("Redis sync failed in goroutine",
                    zap.String("client_id", c.ID),
                    zap.Error(err))
                return
            }

            if len(awarenessMap) == 0 {
                h.logger.Debug("No awareness data found in Redis for sync", zap.String("room_id", c.roomID))
                return
            }

            h.logger.Info("Starting state delivery to joiner",
                zap.String("client_id", c.ID),
                zap.Int("items", len(awarenessMap)))

            // 2. Send item by item, protected by the send lock
            sentCount := 0
            for clientID, data := range awarenessMap {
                c.sendMu.Lock()
                // Core defense: check whether the channel was already closed by unregisterClient
                if c.sendClosed {
                    c.sendMu.Unlock()
                    h.logger.Warn("Sync aborted: client channel closed while sending",
                        zap.String("client_id", c.ID),
                        zap.Uint64("target_yjs_id", clientID))
                    return // exit the goroutine; nothing more to send
                }

                select {
                case c.send <- data:
                    sentCount++
                default:
                    // Send buffer full (usually a slow network); log a warning
                    h.logger.Warn("Sync item skipped: client send buffer full",
                        zap.String("client_id", c.ID),
                        zap.Uint64("target_yjs_id", clientID))
                }
                c.sendMu.Unlock()
            }

            h.logger.Info("State sync completed successfully",
                zap.String("client_id", c.ID),
                zap.Int("delivered", sentCount))
        }(client)
    }
}

func (h *Hub) unregisterClient(client *Client) {
    h.mu.Lock()
@@ -228,9 +350,9 @@ func (h *Hub) unregisterClient(client *Client) {
    // Iterate over all Yjs IDs this client observed on this machine
    for clientID := range client.observedYjsIDs {
        err := h.messagebus.DeleteAwareness(ctx, client.roomID, clientID)
        h.logger.Info("DEBUG: IDs to cleanup",
            zap.String("client_id", client.ID),
            zap.Any("ids", client.observedYjsIDs))
        if err != nil {
            h.logger.Warn("Failed to delete awareness from Redis",
                zap.Uint64("yjs_id", clientID),
@@ -347,19 +469,16 @@ func (h *Hub) broadcastMessage(message *Message) {
    h.broadcastToLocalClients(room, message.Data, message.sender)

    // Only messages from local clients (sender != nil) are pushed to Redis.
    // P0 fix: send to a bounded worker pool instead of spawning unbounded goroutines.
    if message.sender != nil && !h.fallbackMode && h.messagebus != nil {
        select {
        case h.publishQueue <- message:
            // Successfully queued for async publish by the worker pool
        default:
            // Queue full: drop to protect the system (same pattern as broadcastToLocalClients)
            h.logger.Warn("Publish queue full, dropping Redis publish",
                zap.String("room_id", message.RoomID))
        }
    }
}

@@ -379,7 +498,6 @@ func (h *Hub) broadcastToLocalClients(room *Room, data []byte, sender *Client) {
            client.failureMu.Unlock()

        default:
            client.handleSendFailure()
        }
    }
@@ -559,18 +677,23 @@ func (c *Client) ReadPump() {
            c.idsMu.Unlock()

            // Cache awareness in Redis for cross-server sync.
            // Use a bounded worker pool to avoid blocking ReadPump on Redis I/O.
            if !c.hub.fallbackMode && c.hub.messagebus != nil {
                // Copy the IDs out of clockMap so the queued item doesn't share it.
                clientIDs := make([]uint64, 0, len(clockMap))
                for clientID := range clockMap {
                    clientIDs = append(clientIDs, clientID)
                }
                select {
                case c.hub.awarenessQueue <- awarenessItem{
                    roomID:    c.roomID,
                    clientIDs: clientIDs,
                    data:      message,
                }:
                default:
                    c.hub.logger.Warn("Awareness queue full, dropping update",
                        zap.String("room_id", c.roomID),
                        zap.Int("clients", len(clientIDs)))
                }
            }
        }
    }
}
@@ -628,6 +751,26 @@ func (c *Client) WritePump() {
                return
            }

            // P2 fix: write coalescing: drain all queued messages in a tight loop
            for {
                select {
                case extra, ok := <-c.send:
                    if !ok {
                        c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                        return
                    }
                    c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
                    if err := c.Conn.WriteMessage(websocket.BinaryMessage, extra); err != nil {
                        return
                    }
                default:
                    // NOTE: this break exits only the select, not the for loop;
                    // the length check below is what actually ends the drain.
                    break
                }
                if len(c.send) == 0 {
                    break
                }
            }

        case <-ticker.C:
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {

@@ -9,28 +9,28 @@ import (

// NewLogger creates a production-grade logger with appropriate configuration
func NewLogger(isDevelopment bool) (*zap.Logger, error) {
    var config zap.Config

    if isDevelopment {
        config = zap.NewDevelopmentConfig()
        config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
    } else {
        config = zap.NewProductionConfig()
        config.EncoderConfig.TimeKey = "timestamp"
        config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
    }

    // Key change: force the level all the way up to Fatal so that
    // Error, Warn, Info, and Debug are all dropped, eliminating
    // IO lock contention entirely.
    config.Level = zap.NewAtomicLevelAt(zapcore.FatalLevel)

    logger, err := config.Build()
    if err != nil {
        return nil, err
    }

    return logger, nil
}

// NewLoggerFromEnv creates logger based on environment

@@ -3,12 +3,16 @@ package messagebus
import (
    "bytes"
    "context"
    "errors"
    "fmt"
    "io"
    "log"
    "strconv"
    "sync"
    "time"

    goredis "github.com/redis/go-redis/v9"
    goredislogging "github.com/redis/go-redis/v9/logging"
    "go.uber.org/zap"
)

@@ -33,6 +37,14 @@ type subscription struct {

// NewRedisMessageBus creates a new Redis-backed message bus
func NewRedisMessageBus(redisURL string, serverID string, logger *zap.Logger) (*RedisMessageBus, error) {
    // ================================
    // CRITICAL: Silence Redis internal logging globally
    // ================================
    // go-redis v9 uses its own logger plus the std log package.
    // Disable the go-redis logger and discard std log output to remove lock contention.
    goredislogging.Disable()
    log.SetOutput(io.Discard)

    opts, err := goredis.ParseURL(redisURL)
    if err != nil {
        logger.Error("Failed to parse Redis URL",
@@ -41,8 +53,64 @@ func NewRedisMessageBus(redisURL string, serverID string, logger *zap.Logger) (*
        )
        return nil, err
    }

    // ================================
    // CRITICAL FIX: Prevent Redis connection churn to reduce internal logging
    // ================================
    // The Redis client uses Go's standard log package for connection pool events.
    // By tuning pool settings to prevent connection churn, we eliminate
    // the 43.26s of mutex contention (99.50% of total delay) caused by the
    // log.(*Logger).output mutex in connection dial operations.

    // ================================
    // Connection Pool Configuration (tuned for worker pool architecture)
    // ================================
    // With 50 publish workers + 10 PubSub subscriptions + awareness ops,
    // we need ~100 concurrent connections max, not 2000.
    // An oversized pool causes checkMinIdleConns to spawn hundreds of dial goroutines.
    opts.PoolSize = 200

    // MinIdleConns: keep a small base ready for the worker pool.
    // 50 workers + headroom. Too high = hundreds of maintenance goroutines dialing.
    opts.MinIdleConns = 30

    // PoolTimeout: how long to wait for a connection from the pool.
    // With a bounded worker pool, failing fast beats blocking workers.
    opts.PoolTimeout = 5 * time.Second

    // ConnMaxIdleTime: close idle connections after this duration.
    // Set to 0 to never close idle connections (good for stable load);
    // this prevents the connection churn that causes dialConn overhead.
    opts.ConnMaxIdleTime = 0

    // ConnMaxLifetime: maximum lifetime of any connection.
    // Set high to avoid unnecessary reconnections during stable operation;
    // stale connections are handled via TCP keepalive.
    opts.ConnMaxLifetime = 1 * time.Hour

    client := goredis.NewClient(opts)

    // ================================
    // Connection Pool Pre-warming
    // ================================
    // Force the pool to establish MinIdleConns connections BEFORE accepting traffic.
    // This prevents the "thundering herd" problem where all 1000 users dial simultaneously.
    logger.Info("Pre-warming Redis connection pool...", zap.Int("target_conns", opts.MinIdleConns))

    warmupCtx, warmupCancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer warmupCancel()

    var wg sync.WaitGroup
    for i := 0; i < opts.MinIdleConns; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = client.Ping(warmupCtx).Err() // ignore errors; best-effort warmup
        }()
    }
    wg.Wait() // block until warmup completes

    logger.Info("Connection pool pre-warming completed")
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

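One way to confirm the churn is actually gone is go-redis's built-in pool counters. A minimal check (illustrative; the log call and its placement are assumptions, not part of this commit):

    // Sample the pool after warmup; rising Timeouts or StaleConns later on
    // would indicate the pool is still churning despite the settings above.
    stats := client.PoolStats()
    logger.Info("redis pool stats",
        zap.Uint32("hits", stats.Hits),
        zap.Uint32("misses", stats.Misses),
        zap.Uint32("timeouts", stats.Timeouts),
        zap.Uint32("total_conns", stats.TotalConns),
        zap.Uint32("idle_conns", stats.IdleConns),
        zap.Uint32("stale_conns", stats.StaleConns),
    )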
@@ -103,59 +171,112 @@ func (r *RedisMessageBus) Subscribe(ctx context.Context, roomID string) (<-chan
        r.logger.Debug("returning existing subscription", zap.String("roomID", roomID))
        return sub.channel, nil
    }

    channel := fmt.Sprintf("room:%s:messages", roomID)
    r.logger.Info("Creating new Redis subscription",
        zap.String("roomID", roomID),
        zap.Int("current_map_size", len(r.subscriptions)),
    )

    subCtx, cancel := context.WithCancel(context.Background())
    msgChan := make(chan []byte, 256)
    sub := &subscription{
        channel: msgChan,
        cancel:  cancel,
    }
    r.subscriptions[roomID] = sub

    go r.readLoop(subCtx, roomID, sub, msgChan)

    r.logger.Info("successfully subscribed to room",
        zap.String("roomID", roomID),
        zap.String("channel", channel),
    )
    return msgChan, nil
}

// readLoop uses ReceiveTimeout to avoid the go-redis channel helper and its health-check goroutine.
func (r *RedisMessageBus) readLoop(ctx context.Context, roomID string, sub *subscription, msgChan chan []byte) {
    defer func() {
        close(msgChan)
        r.logger.Info("forwarder stopped", zap.String("roomID", roomID))
    }()

    channel := fmt.Sprintf("room:%s:messages", roomID)
    backoff := 200 * time.Millisecond
    maxBackoff := 5 * time.Second

    for {
        if ctx.Err() != nil {
            r.logger.Info("stopping read loop due to context", zap.String("roomID", roomID))
            return
        }

        pubsub := r.client.Subscribe(ctx, channel)
        if _, err := pubsub.Receive(ctx); err != nil {
            pubsub.Close()
            if ctx.Err() != nil {
                return
            }
            time.Sleep(backoff)
            if backoff < maxBackoff {
                backoff *= 2
                if backoff > maxBackoff {
                    backoff = maxBackoff
                }
            }
            continue
        }

        // Attach the latest pubsub so Unsubscribe can close it
        r.subMu.Lock()
        if cur, ok := r.subscriptions[roomID]; ok && cur == sub {
            sub.pubsub = pubsub
        } else {
            r.subMu.Unlock()
            pubsub.Close()
            return
        }
        r.subMu.Unlock()

        backoff = 200 * time.Millisecond
        if err := r.receiveOnce(ctx, roomID, pubsub, msgChan); err != nil {
            pubsub.Close()
            if ctx.Err() != nil {
                return
            }
            time.Sleep(backoff)
            if backoff < maxBackoff {
                backoff *= 2
                if backoff > maxBackoff {
                    backoff = maxBackoff
                }
            }
        }
    }
}

func (r *RedisMessageBus) receiveOnce(ctx context.Context, roomID string, pubsub *goredis.PubSub, msgChan chan []byte) error {
    for {
        if ctx.Err() != nil {
            return ctx.Err()
        }

        msg, err := pubsub.ReceiveTimeout(ctx, 5*time.Second)
        if err != nil {
            if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
                return err
            }
            if errors.Is(err, goredis.Nil) {
                continue
            }
            r.logger.Warn("pubsub receive error, closing subscription",
                zap.String("roomID", roomID),
                zap.Error(err),
            )
            return err
        }

        switch m := msg.(type) {
        case *goredis.Message:
            // Parse envelope: serverID + separator + payload
            raw := []byte(m.Payload)
            sepIdx := bytes.Index(raw, envelopeSeparator)
            if sepIdx == -1 {
                r.logger.Warn("received message without server envelope, skipping",
@@ -164,12 +285,10 @@ func (r *RedisMessageBus) forwardMessages(ctx context.Context, roomID string, pu
            }

            senderID := string(raw[:sepIdx])

            // Skip messages published by this same server (prevent echo)
            if senderID == r.serverID {
                continue
            }
            payload := raw[sepIdx+len(envelopeSeparator):]

            select {
            case msgChan <- payload:
@@ -181,6 +300,10 @@ func (r *RedisMessageBus) forwardMessages(ctx context.Context, roomID string, pu
                r.logger.Warn("message dropped: consumer too slow",
                    zap.String("roomID", roomID))
            }
        case *goredis.Subscription:
            continue
        default:
            continue
        }
    }
}
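The envelope format parsed above (serverID, separator, payload) implies a matching producer. The Publish implementation is not part of this diff; a hypothetical sketch of the encoding side, assuming envelopeSeparator is a []byte constant in this package:

// Hypothetical helper, not the project's actual Publish: prepend this
// server's ID and the separator so receivers can filter out their own echo.
func (r *RedisMessageBus) publishEnvelope(ctx context.Context, roomID string, payload []byte) error {
    channel := fmt.Sprintf("room:%s:messages", roomID)
    env := make([]byte, 0, len(r.serverID)+len(envelopeSeparator)+len(payload))
    env = append(env, r.serverID...)
    env = append(env, envelopeSeparator...)
    env = append(env, payload...)
    return r.client.Publish(ctx, channel, env).Err()
}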

@@ -188,26 +311,28 @@ func (r *RedisMessageBus) forwardMessages(ctx context.Context, roomID string, pu
// Unsubscribe stops listening to a room
func (r *RedisMessageBus) Unsubscribe(ctx context.Context, roomID string) error {
    r.subMu.Lock()

    // Check if the subscription exists
    sub, ok := r.subscriptions[roomID]
    if !ok {
        r.subMu.Unlock()
        r.logger.Debug("unsubscribe ignored: room not found", zap.String("roomID", roomID))
        return nil
    }
    delete(r.subscriptions, roomID)
    r.subMu.Unlock()

    // Cancel the context (stops the readLoop goroutine)
    sub.cancel()

    // Close the Redis pubsub connection (outside the lock to avoid blocking others)
    if sub.pubsub != nil {
        if err := sub.pubsub.Close(); err != nil {
            r.logger.Error("failed to close redis pubsub",
                zap.String("roomID", roomID),
                zap.Error(err),
            )
        }
    }
    r.logger.Info("successfully unsubscribed", zap.String("roomID", roomID))

    return nil
@@ -354,28 +479,28 @@ func (r *RedisMessageBus) StartHealthMonitoring(ctx context.Context, interval ti

func (r *RedisMessageBus) Close() error {
    r.subMu.Lock()
    r.logger.Info("gracefully shutting down message bus", zap.Int("active_subs", len(r.subscriptions)))
    subs := r.subscriptions
    r.subscriptions = make(map[string]*subscription)
    r.subMu.Unlock()

    // 1. Shut down all running subscriptions
    for roomID, sub := range subs {
        // Stop the corresponding readLoop goroutine
        sub.cancel()

        // Close the physical connection
        if sub.pubsub != nil {
            if err := sub.pubsub.Close(); err != nil {
                r.logger.Error("failed to close pubsub connection",
                    zap.String("roomID", roomID),
                    zap.Error(err),
                )
            }
        }
    }

    // 2. Close the main Redis client connection pool
    if err := r.client.Close(); err != nil {
        r.logger.Error("failed to close redis client", zap.Error(err))
        return err
@@ -384,9 +509,10 @@ func (r *RedisMessageBus) Close() error {
    r.logger.Info("Redis message bus closed successfully")
    return nil
}

// ClearAllAwareness deletes the room's entire awareness-data hash
func (r *RedisMessageBus) ClearAllAwareness(ctx context.Context, roomID string) error {
    key := fmt.Sprintf("room:%s:awareness", roomID)
    // Delete the entire key with a single DEL command
    return r.client.Del(ctx, key).Err()
}