diff options
| author | sergei <sergei@em-sysadmin.xyz> | 2026-04-14 06:23:55 +0400 |
|---|---|---|
| committer | sergei <sergei@em-sysadmin.xyz> | 2026-04-14 06:23:55 +0400 |
| commit | 3d51aa455006903345f554a2dd90034993796114 (patch) | |
| tree | 62a7be2faf047f5eb7886feebc3b815556f03d7f /internal/rules/connections.go | |
| download | vpnem-main.tar.gz vpnem-main.tar.bz2 vpnem-main.zip | |
- Multi-protocol VPS nodes (VLESS-REALITY + Hysteria2 + SOCKS5)
- Smart load balancing via recommendation API
- Windows/Linux client (Go + Wails + sing-box)
- Server API with RealIP detection and connection tracking
- Auto-deployment via vpnui control plane
- Silent Windows installer with UAC elevation
- Load-based server recommendation (no sticky sessions)
- Best Server one-click connection workflow
Diffstat (limited to 'internal/rules/connections.go')
| -rw-r--r-- | internal/rules/connections.go | 336 |
1 files changed, 336 insertions, 0 deletions
diff --git a/internal/rules/connections.go b/internal/rules/connections.go new file mode 100644 index 0000000..705bbf5 --- /dev/null +++ b/internal/rules/connections.go @@ -0,0 +1,336 @@ +package rules + +import ( + "encoding/json" + "os" + "path/filepath" + "sort" + "sync" + "time" + + "vpnem/internal/models" +) + +const ( + sessionExpiry = 1 * time.Hour // session considered stale after 1h + studioExpiry = 7 * 24 * time.Hour // studio record kept for 7 days + defaultMaxCap = 50 // default max clients per server +) + +// ConnectionStore manages active sessions and studio assignments. +type ConnectionStore struct { + mu sync.RWMutex + path string + sessions map[string]*models.ActiveSession // key: client_ip (one active session per studio) + studios map[string]*models.StudioRecord // key: client_ip + maxCap int + staleAfter time.Duration +} + +// NewConnectionStore creates a store backed by a JSON file. +func NewConnectionStore(dataDir string) *ConnectionStore { + return &ConnectionStore{ + path: filepath.Join(dataDir, "connections.json"), + sessions: make(map[string]*models.ActiveSession), + studios: make(map[string]*models.StudioRecord), + maxCap: defaultMaxCap, + staleAfter: sessionExpiry, + } +} + +// Load reads connections from disk. +func (s *ConnectionStore) Load() error { + s.mu.Lock() + defer s.mu.Unlock() + + data, err := os.ReadFile(s.path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + var store struct { + Sessions map[string]*models.ActiveSession `json:"sessions"` + Studios map[string]*models.StudioRecord `json:"studios"` + } + if err := json.Unmarshal(data, &store); err != nil { + return err + } + + s.sessions = store.Sessions + if s.sessions == nil { + s.sessions = make(map[string]*models.ActiveSession) + } + s.studios = store.Studios + if s.studios == nil { + s.studios = make(map[string]*models.StudioRecord) + } + + s.expireStaleLocked() + return s.saveLocked() +} + +// Connect records a new active session. +func (s *ConnectionStore) Connect(clientIP, serverIP, nodeID, osName, version string) { + s.mu.Lock() + defer s.mu.Unlock() + + now := time.Now() + + // Update or create active session + s.sessions[clientIP] = &models.ActiveSession{ + ClientIP: clientIP, + ServerIP: serverIP, + NodeID: nodeID, + OS: osName, + Version: version, + ConnectedAt: now, + LastHeartbeat: now, + } + + // Update or create studio record + studio, exists := s.studios[clientIP] + if !exists { + studio = &models.StudioRecord{ + ClientIP: clientIP, + HomeServerIP: serverIP, + HomeNodeID: nodeID, + HomeAssignedAt: now, + LastSeen: now, + } + s.studios[clientIP] = studio + } + studio.LastSeen = now + studio.TotalClients++ + + // If studio has no home yet, assign one + if studio.HomeServerIP == "" { + studio.HomeServerIP = serverIP + studio.HomeNodeID = nodeID + studio.HomeAssignedAt = now + } + + s.expireStaleLocked() + _ = s.saveLocked() +} + +// Disconnect marks a session as inactive. +func (s *ConnectionStore) Disconnect(clientIP string) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.sessions, clientIP) + _ = s.saveLocked() +} + +// Heartbeat updates the last-seen time for a session. +func (s *ConnectionStore) Heartbeat(clientIP string) { + s.mu.Lock() + defer s.mu.Unlock() + + if sess, ok := s.sessions[clientIP]; ok { + sess.LastHeartbeat = time.Now() + } + if studio, ok := s.studios[clientIP]; ok { + studio.LastSeen = time.Now() + } +} + +// GetRecommendation returns the recommended server for a client IP. +// Pure load-based: always picks the least loaded available + healthy server. +// No sticky home — auto-balancing on every request. +func (s *ConnectionStore) GetRecommendation(clientIP string, availableIPs []string, healthyIPs map[string]bool) models.RecommendationResponse { + s.mu.RLock() + defer s.mu.RUnlock() + + resp := models.RecommendationResponse{} + + // Count active connections per server + load := s.activeLoadLocked(availableIPs) + + // Always pick least loaded — no sticky + bestIP := s.findLeastLoadedLocked(availableIPs, load, healthyIPs) + if bestIP == "" { + resp.Reason = "нет доступных серверов" + return resp + } + + resp.RecommendedServerIP = bestIP + resp.LoadInfo = s.formatLoadInfo(load) + + // Count how many clients on this IP + resp.StudioClients = load[bestIP] + + // Check if this is the same as the studio's previous choice + studio, hasStudio := s.studios[clientIP] + if hasStudio && studio.HomeServerIP == bestIP { + resp.Reason = "рекомендуемый сервер" + } else { + resp.Reason = "наименее загружен" + resp.IsRebalance = hasStudio && studio.HomeServerIP != "" + } + + return resp +} + +// GetLoadInfo returns load information for all available servers. +func (s *ConnectionStore) GetLoadInfo(availableIPs []string) []models.ServerLoadInfo { + s.mu.RLock() + defer s.mu.RUnlock() + + load := s.activeLoadLocked(availableIPs) + var infos []models.ServerLoadInfo + + for _, ip := range availableIPs { + clients := load[ip] + pct := 0 + if s.maxCap > 0 { + pct = (clients * 100) / s.maxCap + } + infos = append(infos, models.ServerLoadInfo{ + ServerIP: ip, + ActiveClients: clients, + LoadPercent: pct, + MaxCapacity: s.maxCap, + }) + } + + return infos +} + +// activeLoadLocked counts active sessions per server IP. Must be called with lock held. +func (s *ConnectionStore) activeLoadLocked(availableIPs []string) map[string]int { + load := make(map[string]int) + for _, ip := range availableIPs { + load[ip] = 0 + } + now := time.Now() + for _, sess := range s.sessions { + if now.Sub(sess.LastHeartbeat) < s.staleAfter { + load[sess.ServerIP]++ + } + } + return load +} + +// findLeastLoadedLocked finds the least loaded available + healthy server. +func (s *ConnectionStore) findLeastLoadedLocked(availableIPs []string, load map[string]int, healthyIPs map[string]bool) string { + type ipLoad struct { + ip string + count int + } + var candidates []ipLoad + + for _, ip := range availableIPs { + if len(healthyIPs) > 0 && !healthyIPs[ip] { + continue + } + candidates = append(candidates, ipLoad{ip, load[ip]}) + } + + if len(candidates) == 0 { + return "" + } + + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].count < candidates[j].count + }) + + return candidates[0].ip +} + +// expireStaleLocked removes stale sessions and old studio records. +func (s *ConnectionStore) expireStaleLocked() { + now := time.Now() + + // Expire stale sessions + for key, sess := range s.sessions { + if now.Sub(sess.LastHeartbeat) > s.staleAfter { + delete(s.sessions, key) + } + } + + // Expire old studio records (kept for reference) + for key, studio := range s.studios { + if now.Sub(studio.LastSeen) > studioExpiry { + delete(s.studios, key) + } + } +} + +// saveLocked writes state to disk. +func (s *ConnectionStore) saveLocked() error { + if err := os.MkdirAll(filepath.Dir(s.path), 0o755); err != nil { + return err + } + + store := struct { + Sessions map[string]*models.ActiveSession `json:"sessions"` + Studios map[string]*models.StudioRecord `json:"studios"` + }{ + Sessions: s.sessions, + Studios: s.studios, + } + + data, err := json.MarshalIndent(store, "", " ") + if err != nil { + return err + } + + tmpPath := s.path + ".tmp" + if err := os.WriteFile(tmpPath, data, 0o644); err != nil { + return err + } + return os.Rename(tmpPath, s.path) +} + +func (s *ConnectionStore) formatLoadInfo(load map[string]int) string { + var parts []string + // Sort for consistent output + var ips []string + for ip := range load { + ips = append(ips, ip) + } + sort.Strings(ips) + + for _, ip := range ips { + parts = append(parts, ip+"="+itoaStr(load[ip])) + } + return "нагрузка: " + joinStr(parts, ", ") +} + +// SetMaxCapacity sets the max clients per server for load calculation. +func (s *ConnectionStore) SetMaxCapacity(n int) { + s.mu.Lock() + defer s.mu.Unlock() + if n > 0 { + s.maxCap = n + } +} + +// itoaStr converts int to string without fmt. +func itoaStr(n int) string { + if n == 0 { + return "0" + } + var digits []byte + for n > 0 { + digits = append([]byte{byte('0' + n%10)}, digits...) + n /= 10 + } + return string(digits) +} + +// joinStr joins strings with separator without strings import. +func joinStr(parts []string, sep string) string { + if len(parts) == 0 { + return "" + } + result := parts[0] + for _, p := range parts[1:] { + result += sep + p + } + return result +} |
