From 3d51aa455006903345f554a2dd90034993796114 Mon Sep 17 00:00:00 2001 From: sergei Date: Tue, 14 Apr 2026 06:23:55 +0400 Subject: vpnem: VPN infrastructure with load-balanced multi-protocol nodes - Multi-protocol VPS nodes (VLESS-REALITY + Hysteria2 + SOCKS5) - Smart load balancing via recommendation API - Windows/Linux client (Go + Wails + sing-box) - Server API with RealIP detection and connection tracking - Auto-deployment via vpnui control plane - Silent Windows installer with UAC elevation - Load-based server recommendation (no sticky sessions) - Best Server one-click connection workflow --- internal/engine/engine.go | 138 +++++++++++++++++++++++++++++++++ internal/engine/healthcheck.go | 63 ++++++++++++++++ internal/engine/healthcheck_test.go | 38 ++++++++++ internal/engine/httpclient.go | 45 +++++++++++ internal/engine/logger.go | 62 +++++++++++++++ internal/engine/proxy_port.go | 37 +++++++++ internal/engine/watchdog.go | 147 ++++++++++++++++++++++++++++++++++++ 7 files changed, 530 insertions(+) create mode 100644 internal/engine/engine.go create mode 100644 internal/engine/healthcheck.go create mode 100644 internal/engine/healthcheck_test.go create mode 100644 internal/engine/httpclient.go create mode 100644 internal/engine/logger.go create mode 100644 internal/engine/proxy_port.go create mode 100644 internal/engine/watchdog.go (limited to 'internal/engine') diff --git a/internal/engine/engine.go b/internal/engine/engine.go new file mode 100644 index 0000000..fa145ee --- /dev/null +++ b/internal/engine/engine.go @@ -0,0 +1,138 @@ +package engine + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + "sync" + + box "github.com/sagernet/sing-box" + "github.com/sagernet/sing-box/include" + "github.com/sagernet/sing-box/option" + + "vpnem/internal/config" + "vpnem/internal/models" +) + +type Engine struct { + mu sync.Mutex + instance *box.Box + cancel context.CancelFunc + running bool + configPath string + dataDir string + localProxyPort int +} + +func New(dataDir string) *Engine { + return &Engine{ + dataDir: dataDir, + configPath: filepath.Join(dataDir, "config.json"), + } +} + +func (e *Engine) Start(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string) error { + return e.StartFull(server, mode, ruleSets, serverIPs, nil, config.LocalProxyPort, nil) +} + +func (e *Engine) StartFull(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string, customBypass []string, localProxyPort int, policy *models.RoutingPolicy) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.running { + return fmt.Errorf("already running") + } + + cfg := config.BuildConfigFullWithLocalProxy(server, mode, ruleSets, serverIPs, customBypass, localProxyPort, policy) + data, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return fmt.Errorf("marshal config: %w", err) + } + + os.MkdirAll(e.dataDir, 0o755) + _ = os.WriteFile(e.configPath, data, 0o644) + log.Printf("engine: config saved (%d bytes)", len(data)) + + var opts option.Options + ctx := include.Context(context.Background()) + if err := opts.UnmarshalJSONContext(ctx, data); err != nil { + log.Printf("engine: parse FAILED: %v", err) + return fmt.Errorf("parse config: %w", err) + } + + boxCtx, cancel := context.WithCancel(ctx) + e.cancel = cancel + + instance, err := box.New(box.Options{ + Context: boxCtx, + Options: opts, + }) + if err != nil { + cancel() + log.Printf("engine: create FAILED: %v", err) + return fmt.Errorf("create sing-box: %w", err) + } + + if err := instance.Start(); err != nil { + instance.Close() + cancel() + log.Printf("engine: start FAILED: %v", err) + return fmt.Errorf("start sing-box: %w", err) + } + + e.instance = instance + e.running = true + e.localProxyPort = localProxyPort + log.Println("engine: started ok") + return nil +} + +func (e *Engine) Stop() error { + e.mu.Lock() + defer e.mu.Unlock() + + if !e.running { + return nil + } + + if e.instance != nil { + e.instance.Close() + e.instance = nil + } + if e.cancel != nil { + e.cancel() + } + e.running = false + e.localProxyPort = 0 + log.Println("engine: stopped") + return nil +} + +func (e *Engine) Restart(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string) error { + e.Stop() + return e.Start(server, mode, ruleSets, serverIPs) +} + +func (e *Engine) RestartFull(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string, customBypass []string, localProxyPort int, policy *models.RoutingPolicy) error { + e.Stop() + return e.StartFull(server, mode, ruleSets, serverIPs, customBypass, localProxyPort, policy) +} + +func (e *Engine) IsRunning() bool { + e.mu.Lock() + defer e.mu.Unlock() + return e.running +} + +func (e *Engine) ConfigPath() string { + return e.configPath +} + +func (e *Engine) LocalProxyPort() int { + e.mu.Lock() + defer e.mu.Unlock() + return e.localProxyPort +} diff --git a/internal/engine/healthcheck.go b/internal/engine/healthcheck.go new file mode 100644 index 0000000..a856608 --- /dev/null +++ b/internal/engine/healthcheck.go @@ -0,0 +1,63 @@ +package engine + +import ( + "io" + "net/http" + "strings" + "time" + + "vpnem/internal/config" +) + +const DefaultBlockedSiteProbeURL = "https://rutracker.org" + +func ModeRequiresExitIP(mode config.Mode) bool { + return mode.Final == "proxy" +} + +func CheckExitIP(localProxyPort int) string { + client, err := HTTPClientViaSOCKS5(config.LocalProxyHost, localProxyPort, 5*time.Second) + if err != nil { + return "" + } + resp, err := client.Get("http://ifconfig.me/ip") + if err != nil { + return "" + } + defer resp.Body.Close() + + body, err := io.ReadAll(io.LimitReader(resp.Body, 64)) + if err != nil { + return "" + } + return strings.TrimSpace(string(body)) +} + +func ProbeBlockedSite(localProxyPort int, rawURL string, timeout time.Duration) (int, error) { + client, err := HTTPClientViaSOCKS5(config.LocalProxyHost, localProxyPort, timeout) + if err != nil { + return 0, err + } + + req, err := http.NewRequest(http.MethodGet, rawURL, nil) + if err != nil { + return 0, err + } + req.Header.Set("User-Agent", "vpnem-health/1.0") + + resp, err := client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 256)) + return resp.StatusCode, nil +} + +func DeepCheckRequiresRestart(mode config.Mode, exitIP string, probeErr error) bool { + if ModeRequiresExitIP(mode) { + return exitIP == "" + } + return probeErr != nil +} diff --git a/internal/engine/healthcheck_test.go b/internal/engine/healthcheck_test.go new file mode 100644 index 0000000..1d55ed0 --- /dev/null +++ b/internal/engine/healthcheck_test.go @@ -0,0 +1,38 @@ +package engine + +import ( + "errors" + "testing" + + "vpnem/internal/config" +) + +func TestModeRequiresExitIP(t *testing.T) { + proxyMode := config.Mode{Name: "Full", Final: "proxy"} + directMode := config.Mode{Name: "Combo", Final: "direct"} + + if !ModeRequiresExitIP(proxyMode) { + t.Fatal("expected proxy-final mode to require exit IP") + } + if ModeRequiresExitIP(directMode) { + t.Fatal("did not expect direct-final mode to require exit IP") + } +} + +func TestDeepCheckRequiresRestart(t *testing.T) { + proxyMode := config.Mode{Name: "Full", Final: "proxy"} + directMode := config.Mode{Name: "Combo", Final: "direct"} + + if !DeepCheckRequiresRestart(proxyMode, "", nil) { + t.Fatal("expected proxy-final mode without exit IP to restart") + } + if DeepCheckRequiresRestart(proxyMode, "89.124.96.166", errors.New("probe failed")) { + t.Fatal("did not expect proxy-final mode with exit IP to restart") + } + if !DeepCheckRequiresRestart(directMode, "", errors.New("probe failed")) { + t.Fatal("expected direct-final mode with failed blocked probe to restart") + } + if DeepCheckRequiresRestart(directMode, "", nil) { + t.Fatal("did not expect direct-final mode with successful blocked probe to restart") + } +} diff --git a/internal/engine/httpclient.go b/internal/engine/httpclient.go new file mode 100644 index 0000000..e491cf5 --- /dev/null +++ b/internal/engine/httpclient.go @@ -0,0 +1,45 @@ +package engine + +import ( + "context" + "fmt" + "net" + "net/http" + "time" + + "golang.org/x/net/proxy" +) + +func HTTPClientViaSOCKS5(host string, port int, timeout time.Duration) (*http.Client, error) { + if host == "" || port <= 0 { + return nil, fmt.Errorf("invalid local socks5 endpoint") + } + + addr := fmt.Sprintf("%s:%d", host, port) + dialer, err := proxy.SOCKS5("tcp", addr, nil, proxy.Direct) + if err != nil { + return nil, err + } + + contextDialer, ok := dialer.(proxy.ContextDialer) + if !ok { + return nil, fmt.Errorf("socks5 dialer does not implement context dialing") + } + + transport := &http.Transport{ + DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { + return contextDialer.DialContext(ctx, network, address) + }, + ForceAttemptHTTP2: true, + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + TLSHandshakeTimeout: timeout, + ResponseHeaderTimeout: timeout, + ExpectContinueTimeout: time.Second, + } + + return &http.Client{ + Timeout: timeout, + Transport: transport, + }, nil +} diff --git a/internal/engine/logger.go b/internal/engine/logger.go new file mode 100644 index 0000000..c448a56 --- /dev/null +++ b/internal/engine/logger.go @@ -0,0 +1,62 @@ +package engine + +import ( + "os" + "path/filepath" + "sync" +) + +// RingLog keeps last N log lines in memory and optionally writes to file. +type RingLog struct { + mu sync.Mutex + lines []string + max int + file *os.File +} + +// NewRingLog creates a ring buffer logger. +func NewRingLog(maxLines int, dataDir string) *RingLog { + rl := &RingLog{ + lines: make([]string, 0, maxLines), + max: maxLines, + } + if dataDir != "" { + f, err := os.OpenFile(filepath.Join(dataDir, "vpnem.log"), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err == nil { + rl.file = f + } + } + return rl +} + +// Add appends a line. +func (rl *RingLog) Add(line string) { + rl.mu.Lock() + defer rl.mu.Unlock() + + if len(rl.lines) >= rl.max { + rl.lines = rl.lines[1:] + } + rl.lines = append(rl.lines, line) + + if rl.file != nil { + rl.file.WriteString(line + "\n") + } +} + +// Lines returns all current lines. +func (rl *RingLog) Lines() []string { + rl.mu.Lock() + defer rl.mu.Unlock() + cp := make([]string, len(rl.lines)) + copy(cp, rl.lines) + return cp +} + +// Close closes the log file. +func (rl *RingLog) Close() { + if rl.file != nil { + rl.file.Close() + } +} diff --git a/internal/engine/proxy_port.go b/internal/engine/proxy_port.go new file mode 100644 index 0000000..5e657c0 --- /dev/null +++ b/internal/engine/proxy_port.go @@ -0,0 +1,37 @@ +package engine + +import ( + "fmt" + "net" + + "vpnem/internal/config" +) + +var localProxyPortCandidates = []int{config.LocalProxyPort, 10808, 10880, 18080, 20800} + +func ResolveLocalProxyPort() (int, error) { + for _, port := range localProxyPortCandidates { + if localProxyPortAvailable(port) { + return port, nil + } + } + listener, err := net.Listen("tcp", net.JoinHostPort(config.LocalProxyHost, "0")) + if err != nil { + return 0, err + } + defer listener.Close() + addr, ok := listener.Addr().(*net.TCPAddr) + if !ok { + return 0, fmt.Errorf("unexpected listener addr type %T", listener.Addr()) + } + return addr.Port, nil +} + +func localProxyPortAvailable(port int) bool { + listener, err := net.Listen("tcp", net.JoinHostPort(config.LocalProxyHost, fmt.Sprintf("%d", port))) + if err != nil { + return false + } + _ = listener.Close() + return true +} diff --git a/internal/engine/watchdog.go b/internal/engine/watchdog.go new file mode 100644 index 0000000..a27945f --- /dev/null +++ b/internal/engine/watchdog.go @@ -0,0 +1,147 @@ +package engine + +import ( + "context" + "log" + "time" + + "vpnem/internal/config" + "vpnem/internal/models" +) + +// WatchdogConfig holds watchdog parameters. +type WatchdogConfig struct { + CheckInterval time.Duration // how often to check sing-box is alive (default 2s) + DeepCheckInterval time.Duration // how often to verify exit IP (default 30s) + ReconnectCooldown time.Duration // min time between reconnect attempts (default 5s) +} + +// DefaultWatchdogConfig returns the default watchdog settings (from vpn.py). +func DefaultWatchdogConfig() WatchdogConfig { + return WatchdogConfig{ + CheckInterval: 2 * time.Second, + DeepCheckInterval: 30 * time.Second, + ReconnectCooldown: 5 * time.Second, + } +} + +// Watchdog monitors sing-box and auto-reconnects on failure. +type Watchdog struct { + engine *Engine + cfg WatchdogConfig + cancel context.CancelFunc + running bool + + // Reconnect parameters (set via StartWatching) + server models.Server + mode config.Mode + ruleSets []models.RuleSet + serverIPs []string + customBypass []string + localProxyPort int + policy *models.RoutingPolicy +} + +// NewWatchdog creates a new watchdog for the given engine. +func NewWatchdog(engine *Engine, cfg WatchdogConfig) *Watchdog { + return &Watchdog{ + engine: engine, + cfg: cfg, + } +} + +// StartWatching begins monitoring. It stores the connection params for reconnection. +func (w *Watchdog) StartWatching(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string, customBypass []string, localProxyPort int, policy *models.RoutingPolicy) { + w.StopWatching() + + w.server = server + w.mode = mode + w.ruleSets = ruleSets + w.serverIPs = serverIPs + w.customBypass = append([]string{}, customBypass...) + w.localProxyPort = localProxyPort + w.policy = policy + + ctx, cancel := context.WithCancel(context.Background()) + w.cancel = cancel + w.running = true + + go w.loop(ctx) +} + +// StopWatching stops the watchdog. +func (w *Watchdog) StopWatching() { + if w.cancel != nil { + w.cancel() + } + w.running = false +} + +// IsWatching returns whether the watchdog is active. +func (w *Watchdog) IsWatching() bool { + return w.running +} + +func (w *Watchdog) loop(ctx context.Context) { + ticker := time.NewTicker(w.cfg.CheckInterval) + defer ticker.Stop() + + deepTicker := time.NewTicker(w.cfg.DeepCheckInterval) + defer deepTicker.Stop() + + lastReconnect := time.Time{} + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + if !w.engine.IsRunning() { + if time.Since(lastReconnect) < w.cfg.ReconnectCooldown { + continue + } + localProxyPort, err := ResolveLocalProxyPort() + if err != nil { + log.Printf("watchdog: local proxy port selection failed: %v", err) + continue + } + w.localProxyPort = localProxyPort + log.Println("watchdog: sing-box not running, reconnecting...") + if err := w.engine.StartFull(w.server, w.mode, w.ruleSets, w.serverIPs, w.customBypass, w.localProxyPort, w.policy); err != nil { + log.Printf("watchdog: reconnect failed: %v", err) + } else { + log.Println("watchdog: reconnected successfully") + } + lastReconnect = time.Now() + } + + case <-deepTicker.C: + if !w.engine.IsRunning() { + continue + } + exitIP := CheckExitIP(w.localProxyPort) + _, probeErr := ProbeBlockedSite(w.localProxyPort, DefaultBlockedSiteProbeURL, 8*time.Second) + if DeepCheckRequiresRestart(w.mode, exitIP, probeErr) { + if ModeRequiresExitIP(w.mode) { + log.Println("watchdog: deep check failed (no exit IP), restarting...") + } else { + log.Printf("watchdog: deep check failed for direct-final mode (%v), restarting...", probeErr) + } + if time.Since(lastReconnect) < w.cfg.ReconnectCooldown { + continue + } + localProxyPort, err := ResolveLocalProxyPort() + if err != nil { + log.Printf("watchdog: local proxy port selection failed: %v", err) + continue + } + w.localProxyPort = localProxyPort + if err := w.engine.RestartFull(w.server, w.mode, w.ruleSets, w.serverIPs, w.customBypass, w.localProxyPort, w.policy); err != nil { + log.Printf("watchdog: restart failed: %v", err) + } + lastReconnect = time.Now() + } + } + } +} -- cgit v1.2.3