package rules import ( "encoding/json" "os" "path/filepath" "sort" "sync" "time" "vpnem/internal/models" ) const ( sessionExpiry = 1 * time.Hour // session considered stale after 1h studioExpiry = 7 * 24 * time.Hour // studio record kept for 7 days defaultMaxCap = 50 // default max clients per server ) // ConnectionStore manages active sessions and studio assignments. type ConnectionStore struct { mu sync.RWMutex path string sessions map[string]*models.ActiveSession // key: client_ip (one active session per studio) studios map[string]*models.StudioRecord // key: client_ip maxCap int staleAfter time.Duration } // NewConnectionStore creates a store backed by a JSON file. func NewConnectionStore(dataDir string) *ConnectionStore { return &ConnectionStore{ path: filepath.Join(dataDir, "connections.json"), sessions: make(map[string]*models.ActiveSession), studios: make(map[string]*models.StudioRecord), maxCap: defaultMaxCap, staleAfter: sessionExpiry, } } // Load reads connections from disk. func (s *ConnectionStore) Load() error { s.mu.Lock() defer s.mu.Unlock() data, err := os.ReadFile(s.path) if err != nil { if os.IsNotExist(err) { return nil } return err } var store struct { Sessions map[string]*models.ActiveSession `json:"sessions"` Studios map[string]*models.StudioRecord `json:"studios"` } if err := json.Unmarshal(data, &store); err != nil { return err } s.sessions = store.Sessions if s.sessions == nil { s.sessions = make(map[string]*models.ActiveSession) } s.studios = store.Studios if s.studios == nil { s.studios = make(map[string]*models.StudioRecord) } s.expireStaleLocked() return s.saveLocked() } // Connect records a new active session. func (s *ConnectionStore) Connect(clientIP, serverIP, nodeID, osName, version string) { s.mu.Lock() defer s.mu.Unlock() now := time.Now() // Update or create active session s.sessions[clientIP] = &models.ActiveSession{ ClientIP: clientIP, ServerIP: serverIP, NodeID: nodeID, OS: osName, Version: version, ConnectedAt: now, LastHeartbeat: now, } // Update or create studio record studio, exists := s.studios[clientIP] if !exists { studio = &models.StudioRecord{ ClientIP: clientIP, HomeServerIP: serverIP, HomeNodeID: nodeID, HomeAssignedAt: now, LastSeen: now, } s.studios[clientIP] = studio } studio.LastSeen = now studio.TotalClients++ // If studio has no home yet, assign one if studio.HomeServerIP == "" { studio.HomeServerIP = serverIP studio.HomeNodeID = nodeID studio.HomeAssignedAt = now } s.expireStaleLocked() _ = s.saveLocked() } // Disconnect marks a session as inactive. func (s *ConnectionStore) Disconnect(clientIP string) { s.mu.Lock() defer s.mu.Unlock() delete(s.sessions, clientIP) _ = s.saveLocked() } // Heartbeat updates the last-seen time for a session. func (s *ConnectionStore) Heartbeat(clientIP string) { s.mu.Lock() defer s.mu.Unlock() if sess, ok := s.sessions[clientIP]; ok { sess.LastHeartbeat = time.Now() } if studio, ok := s.studios[clientIP]; ok { studio.LastSeen = time.Now() } } // GetRecommendation returns the recommended server for a client IP. // Pure load-based: always picks the least loaded available + healthy server. // No sticky home — auto-balancing on every request. func (s *ConnectionStore) GetRecommendation(clientIP string, availableIPs []string, healthyIPs map[string]bool) models.RecommendationResponse { s.mu.RLock() defer s.mu.RUnlock() resp := models.RecommendationResponse{} // Count active connections per server load := s.activeLoadLocked(availableIPs) // Always pick least loaded — no sticky bestIP := s.findLeastLoadedLocked(availableIPs, load, healthyIPs) if bestIP == "" { resp.Reason = "нет доступных серверов" return resp } resp.RecommendedServerIP = bestIP resp.LoadInfo = s.formatLoadInfo(load) // Count how many clients on this IP resp.StudioClients = load[bestIP] // Check if this is the same as the studio's previous choice studio, hasStudio := s.studios[clientIP] if hasStudio && studio.HomeServerIP == bestIP { resp.Reason = "рекомендуемый сервер" } else { resp.Reason = "наименее загружен" resp.IsRebalance = hasStudio && studio.HomeServerIP != "" } return resp } // GetLoadInfo returns load information for all available servers. func (s *ConnectionStore) GetLoadInfo(availableIPs []string) []models.ServerLoadInfo { s.mu.RLock() defer s.mu.RUnlock() load := s.activeLoadLocked(availableIPs) var infos []models.ServerLoadInfo for _, ip := range availableIPs { clients := load[ip] pct := 0 if s.maxCap > 0 { pct = (clients * 100) / s.maxCap } infos = append(infos, models.ServerLoadInfo{ ServerIP: ip, ActiveClients: clients, LoadPercent: pct, MaxCapacity: s.maxCap, }) } return infos } // activeLoadLocked counts active sessions per server IP. Must be called with lock held. func (s *ConnectionStore) activeLoadLocked(availableIPs []string) map[string]int { load := make(map[string]int) for _, ip := range availableIPs { load[ip] = 0 } now := time.Now() for _, sess := range s.sessions { if now.Sub(sess.LastHeartbeat) < s.staleAfter { load[sess.ServerIP]++ } } return load } // findLeastLoadedLocked finds the least loaded available + healthy server. func (s *ConnectionStore) findLeastLoadedLocked(availableIPs []string, load map[string]int, healthyIPs map[string]bool) string { type ipLoad struct { ip string count int } var candidates []ipLoad for _, ip := range availableIPs { if len(healthyIPs) > 0 && !healthyIPs[ip] { continue } candidates = append(candidates, ipLoad{ip, load[ip]}) } if len(candidates) == 0 { return "" } sort.Slice(candidates, func(i, j int) bool { return candidates[i].count < candidates[j].count }) return candidates[0].ip } // expireStaleLocked removes stale sessions and old studio records. func (s *ConnectionStore) expireStaleLocked() { now := time.Now() // Expire stale sessions for key, sess := range s.sessions { if now.Sub(sess.LastHeartbeat) > s.staleAfter { delete(s.sessions, key) } } // Expire old studio records (kept for reference) for key, studio := range s.studios { if now.Sub(studio.LastSeen) > studioExpiry { delete(s.studios, key) } } } // saveLocked writes state to disk. func (s *ConnectionStore) saveLocked() error { if err := os.MkdirAll(filepath.Dir(s.path), 0o755); err != nil { return err } store := struct { Sessions map[string]*models.ActiveSession `json:"sessions"` Studios map[string]*models.StudioRecord `json:"studios"` }{ Sessions: s.sessions, Studios: s.studios, } data, err := json.MarshalIndent(store, "", " ") if err != nil { return err } tmpPath := s.path + ".tmp" if err := os.WriteFile(tmpPath, data, 0o644); err != nil { return err } return os.Rename(tmpPath, s.path) } func (s *ConnectionStore) formatLoadInfo(load map[string]int) string { var parts []string // Sort for consistent output var ips []string for ip := range load { ips = append(ips, ip) } sort.Strings(ips) for _, ip := range ips { parts = append(parts, ip+"="+itoaStr(load[ip])) } return "нагрузка: " + joinStr(parts, ", ") } // SetMaxCapacity sets the max clients per server for load calculation. func (s *ConnectionStore) SetMaxCapacity(n int) { s.mu.Lock() defer s.mu.Unlock() if n > 0 { s.maxCap = n } } // itoaStr converts int to string without fmt. func itoaStr(n int) string { if n == 0 { return "0" } var digits []byte for n > 0 { digits = append([]byte{byte('0' + n%10)}, digits...) n /= 10 } return string(digits) } // joinStr joins strings with separator without strings import. func joinStr(parts []string, sep string) string { if len(parts) == 0 { return "" } result := parts[0] for _, p := range parts[1:] { result += sep + p } return result }