summaryrefslogtreecommitdiff
path: root/internal/rules/connections.go
diff options
context:
space:
mode:
authorsergei <sergei@em-sysadmin.xyz>2026-04-14 06:23:55 +0400
committersergei <sergei@em-sysadmin.xyz>2026-04-14 06:23:55 +0400
commit3d51aa455006903345f554a2dd90034993796114 (patch)
tree62a7be2faf047f5eb7886feebc3b815556f03d7f /internal/rules/connections.go
downloadvpnem-3d51aa455006903345f554a2dd90034993796114.tar.gz
vpnem-3d51aa455006903345f554a2dd90034993796114.tar.bz2
vpnem-3d51aa455006903345f554a2dd90034993796114.zip
vpnem: VPN infrastructure with load-balanced multi-protocol nodesHEADmain
- Multi-protocol VPS nodes (VLESS-REALITY + Hysteria2 + SOCKS5) - Smart load balancing via recommendation API - Windows/Linux client (Go + Wails + sing-box) - Server API with RealIP detection and connection tracking - Auto-deployment via vpnui control plane - Silent Windows installer with UAC elevation - Load-based server recommendation (no sticky sessions) - Best Server one-click connection workflow
Diffstat (limited to 'internal/rules/connections.go')
-rw-r--r--internal/rules/connections.go336
1 files changed, 336 insertions, 0 deletions
diff --git a/internal/rules/connections.go b/internal/rules/connections.go
new file mode 100644
index 0000000..705bbf5
--- /dev/null
+++ b/internal/rules/connections.go
@@ -0,0 +1,336 @@
+package rules
+
+import (
+ "encoding/json"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+ "time"
+
+ "vpnem/internal/models"
+)
+
+const (
+ sessionExpiry = 1 * time.Hour // session considered stale after 1h
+ studioExpiry = 7 * 24 * time.Hour // studio record kept for 7 days
+ defaultMaxCap = 50 // default max clients per server
+)
+
+// ConnectionStore manages active sessions and studio assignments.
+type ConnectionStore struct {
+ mu sync.RWMutex
+ path string
+ sessions map[string]*models.ActiveSession // key: client_ip (one active session per studio)
+ studios map[string]*models.StudioRecord // key: client_ip
+ maxCap int
+ staleAfter time.Duration
+}
+
+// NewConnectionStore creates a store backed by a JSON file.
+func NewConnectionStore(dataDir string) *ConnectionStore {
+ return &ConnectionStore{
+ path: filepath.Join(dataDir, "connections.json"),
+ sessions: make(map[string]*models.ActiveSession),
+ studios: make(map[string]*models.StudioRecord),
+ maxCap: defaultMaxCap,
+ staleAfter: sessionExpiry,
+ }
+}
+
+// Load reads connections from disk.
+func (s *ConnectionStore) Load() error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ data, err := os.ReadFile(s.path)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return nil
+ }
+ return err
+ }
+
+ var store struct {
+ Sessions map[string]*models.ActiveSession `json:"sessions"`
+ Studios map[string]*models.StudioRecord `json:"studios"`
+ }
+ if err := json.Unmarshal(data, &store); err != nil {
+ return err
+ }
+
+ s.sessions = store.Sessions
+ if s.sessions == nil {
+ s.sessions = make(map[string]*models.ActiveSession)
+ }
+ s.studios = store.Studios
+ if s.studios == nil {
+ s.studios = make(map[string]*models.StudioRecord)
+ }
+
+ s.expireStaleLocked()
+ return s.saveLocked()
+}
+
+// Connect records a new active session.
+func (s *ConnectionStore) Connect(clientIP, serverIP, nodeID, osName, version string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ now := time.Now()
+
+ // Update or create active session
+ s.sessions[clientIP] = &models.ActiveSession{
+ ClientIP: clientIP,
+ ServerIP: serverIP,
+ NodeID: nodeID,
+ OS: osName,
+ Version: version,
+ ConnectedAt: now,
+ LastHeartbeat: now,
+ }
+
+ // Update or create studio record
+ studio, exists := s.studios[clientIP]
+ if !exists {
+ studio = &models.StudioRecord{
+ ClientIP: clientIP,
+ HomeServerIP: serverIP,
+ HomeNodeID: nodeID,
+ HomeAssignedAt: now,
+ LastSeen: now,
+ }
+ s.studios[clientIP] = studio
+ }
+ studio.LastSeen = now
+ studio.TotalClients++
+
+ // If studio has no home yet, assign one
+ if studio.HomeServerIP == "" {
+ studio.HomeServerIP = serverIP
+ studio.HomeNodeID = nodeID
+ studio.HomeAssignedAt = now
+ }
+
+ s.expireStaleLocked()
+ _ = s.saveLocked()
+}
+
+// Disconnect marks a session as inactive.
+func (s *ConnectionStore) Disconnect(clientIP string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ delete(s.sessions, clientIP)
+ _ = s.saveLocked()
+}
+
+// Heartbeat updates the last-seen time for a session.
+func (s *ConnectionStore) Heartbeat(clientIP string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if sess, ok := s.sessions[clientIP]; ok {
+ sess.LastHeartbeat = time.Now()
+ }
+ if studio, ok := s.studios[clientIP]; ok {
+ studio.LastSeen = time.Now()
+ }
+}
+
+// GetRecommendation returns the recommended server for a client IP.
+// Pure load-based: always picks the least loaded available + healthy server.
+// No sticky home — auto-balancing on every request.
+func (s *ConnectionStore) GetRecommendation(clientIP string, availableIPs []string, healthyIPs map[string]bool) models.RecommendationResponse {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ resp := models.RecommendationResponse{}
+
+ // Count active connections per server
+ load := s.activeLoadLocked(availableIPs)
+
+ // Always pick least loaded — no sticky
+ bestIP := s.findLeastLoadedLocked(availableIPs, load, healthyIPs)
+ if bestIP == "" {
+ resp.Reason = "нет доступных серверов"
+ return resp
+ }
+
+ resp.RecommendedServerIP = bestIP
+ resp.LoadInfo = s.formatLoadInfo(load)
+
+ // Count how many clients on this IP
+ resp.StudioClients = load[bestIP]
+
+ // Check if this is the same as the studio's previous choice
+ studio, hasStudio := s.studios[clientIP]
+ if hasStudio && studio.HomeServerIP == bestIP {
+ resp.Reason = "рекомендуемый сервер"
+ } else {
+ resp.Reason = "наименее загружен"
+ resp.IsRebalance = hasStudio && studio.HomeServerIP != ""
+ }
+
+ return resp
+}
+
+// GetLoadInfo returns load information for all available servers.
+func (s *ConnectionStore) GetLoadInfo(availableIPs []string) []models.ServerLoadInfo {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ load := s.activeLoadLocked(availableIPs)
+ var infos []models.ServerLoadInfo
+
+ for _, ip := range availableIPs {
+ clients := load[ip]
+ pct := 0
+ if s.maxCap > 0 {
+ pct = (clients * 100) / s.maxCap
+ }
+ infos = append(infos, models.ServerLoadInfo{
+ ServerIP: ip,
+ ActiveClients: clients,
+ LoadPercent: pct,
+ MaxCapacity: s.maxCap,
+ })
+ }
+
+ return infos
+}
+
+// activeLoadLocked counts active sessions per server IP. Must be called with lock held.
+func (s *ConnectionStore) activeLoadLocked(availableIPs []string) map[string]int {
+ load := make(map[string]int)
+ for _, ip := range availableIPs {
+ load[ip] = 0
+ }
+ now := time.Now()
+ for _, sess := range s.sessions {
+ if now.Sub(sess.LastHeartbeat) < s.staleAfter {
+ load[sess.ServerIP]++
+ }
+ }
+ return load
+}
+
+// findLeastLoadedLocked finds the least loaded available + healthy server.
+func (s *ConnectionStore) findLeastLoadedLocked(availableIPs []string, load map[string]int, healthyIPs map[string]bool) string {
+ type ipLoad struct {
+ ip string
+ count int
+ }
+ var candidates []ipLoad
+
+ for _, ip := range availableIPs {
+ if len(healthyIPs) > 0 && !healthyIPs[ip] {
+ continue
+ }
+ candidates = append(candidates, ipLoad{ip, load[ip]})
+ }
+
+ if len(candidates) == 0 {
+ return ""
+ }
+
+ sort.Slice(candidates, func(i, j int) bool {
+ return candidates[i].count < candidates[j].count
+ })
+
+ return candidates[0].ip
+}
+
+// expireStaleLocked removes stale sessions and old studio records.
+func (s *ConnectionStore) expireStaleLocked() {
+ now := time.Now()
+
+ // Expire stale sessions
+ for key, sess := range s.sessions {
+ if now.Sub(sess.LastHeartbeat) > s.staleAfter {
+ delete(s.sessions, key)
+ }
+ }
+
+ // Expire old studio records (kept for reference)
+ for key, studio := range s.studios {
+ if now.Sub(studio.LastSeen) > studioExpiry {
+ delete(s.studios, key)
+ }
+ }
+}
+
+// saveLocked writes state to disk.
+func (s *ConnectionStore) saveLocked() error {
+ if err := os.MkdirAll(filepath.Dir(s.path), 0o755); err != nil {
+ return err
+ }
+
+ store := struct {
+ Sessions map[string]*models.ActiveSession `json:"sessions"`
+ Studios map[string]*models.StudioRecord `json:"studios"`
+ }{
+ Sessions: s.sessions,
+ Studios: s.studios,
+ }
+
+ data, err := json.MarshalIndent(store, "", " ")
+ if err != nil {
+ return err
+ }
+
+ tmpPath := s.path + ".tmp"
+ if err := os.WriteFile(tmpPath, data, 0o644); err != nil {
+ return err
+ }
+ return os.Rename(tmpPath, s.path)
+}
+
+func (s *ConnectionStore) formatLoadInfo(load map[string]int) string {
+ var parts []string
+ // Sort for consistent output
+ var ips []string
+ for ip := range load {
+ ips = append(ips, ip)
+ }
+ sort.Strings(ips)
+
+ for _, ip := range ips {
+ parts = append(parts, ip+"="+itoaStr(load[ip]))
+ }
+ return "нагрузка: " + joinStr(parts, ", ")
+}
+
+// SetMaxCapacity sets the max clients per server for load calculation.
+func (s *ConnectionStore) SetMaxCapacity(n int) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if n > 0 {
+ s.maxCap = n
+ }
+}
+
+// itoaStr converts int to string without fmt.
+func itoaStr(n int) string {
+ if n == 0 {
+ return "0"
+ }
+ var digits []byte
+ for n > 0 {
+ digits = append([]byte{byte('0' + n%10)}, digits...)
+ n /= 10
+ }
+ return string(digits)
+}
+
+// joinStr joins strings with separator without strings import.
+func joinStr(parts []string, sep string) string {
+ if len(parts) == 0 {
+ return ""
+ }
+ result := parts[0]
+ for _, p := range parts[1:] {
+ result += sep + p
+ }
+ return result
+}