summaryrefslogtreecommitdiff
path: root/internal/engine
diff options
context:
space:
mode:
authorsergei <sergei@em-sysadmin.xyz>2026-04-14 06:23:55 +0400
committersergei <sergei@em-sysadmin.xyz>2026-04-14 06:23:55 +0400
commit3d51aa455006903345f554a2dd90034993796114 (patch)
tree62a7be2faf047f5eb7886feebc3b815556f03d7f /internal/engine
downloadvpnem-main.tar.gz
vpnem-main.tar.bz2
vpnem-main.zip
vpnem: VPN infrastructure with load-balanced multi-protocol nodesHEADmain
- Multi-protocol VPS nodes (VLESS-REALITY + Hysteria2 + SOCKS5) - Smart load balancing via recommendation API - Windows/Linux client (Go + Wails + sing-box) - Server API with RealIP detection and connection tracking - Auto-deployment via vpnui control plane - Silent Windows installer with UAC elevation - Load-based server recommendation (no sticky sessions) - Best Server one-click connection workflow
Diffstat (limited to 'internal/engine')
-rw-r--r--internal/engine/engine.go138
-rw-r--r--internal/engine/healthcheck.go63
-rw-r--r--internal/engine/healthcheck_test.go38
-rw-r--r--internal/engine/httpclient.go45
-rw-r--r--internal/engine/logger.go62
-rw-r--r--internal/engine/proxy_port.go37
-rw-r--r--internal/engine/watchdog.go147
7 files changed, 530 insertions, 0 deletions
diff --git a/internal/engine/engine.go b/internal/engine/engine.go
new file mode 100644
index 0000000..fa145ee
--- /dev/null
+++ b/internal/engine/engine.go
@@ -0,0 +1,138 @@
+package engine
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+ "os"
+ "path/filepath"
+ "sync"
+
+ box "github.com/sagernet/sing-box"
+ "github.com/sagernet/sing-box/include"
+ "github.com/sagernet/sing-box/option"
+
+ "vpnem/internal/config"
+ "vpnem/internal/models"
+)
+
+type Engine struct {
+ mu sync.Mutex
+ instance *box.Box
+ cancel context.CancelFunc
+ running bool
+ configPath string
+ dataDir string
+ localProxyPort int
+}
+
+func New(dataDir string) *Engine {
+ return &Engine{
+ dataDir: dataDir,
+ configPath: filepath.Join(dataDir, "config.json"),
+ }
+}
+
+func (e *Engine) Start(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string) error {
+ return e.StartFull(server, mode, ruleSets, serverIPs, nil, config.LocalProxyPort, nil)
+}
+
+func (e *Engine) StartFull(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string, customBypass []string, localProxyPort int, policy *models.RoutingPolicy) error {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ if e.running {
+ return fmt.Errorf("already running")
+ }
+
+ cfg := config.BuildConfigFullWithLocalProxy(server, mode, ruleSets, serverIPs, customBypass, localProxyPort, policy)
+ data, err := json.MarshalIndent(cfg, "", " ")
+ if err != nil {
+ return fmt.Errorf("marshal config: %w", err)
+ }
+
+ os.MkdirAll(e.dataDir, 0o755)
+ _ = os.WriteFile(e.configPath, data, 0o644)
+ log.Printf("engine: config saved (%d bytes)", len(data))
+
+ var opts option.Options
+ ctx := include.Context(context.Background())
+ if err := opts.UnmarshalJSONContext(ctx, data); err != nil {
+ log.Printf("engine: parse FAILED: %v", err)
+ return fmt.Errorf("parse config: %w", err)
+ }
+
+ boxCtx, cancel := context.WithCancel(ctx)
+ e.cancel = cancel
+
+ instance, err := box.New(box.Options{
+ Context: boxCtx,
+ Options: opts,
+ })
+ if err != nil {
+ cancel()
+ log.Printf("engine: create FAILED: %v", err)
+ return fmt.Errorf("create sing-box: %w", err)
+ }
+
+ if err := instance.Start(); err != nil {
+ instance.Close()
+ cancel()
+ log.Printf("engine: start FAILED: %v", err)
+ return fmt.Errorf("start sing-box: %w", err)
+ }
+
+ e.instance = instance
+ e.running = true
+ e.localProxyPort = localProxyPort
+ log.Println("engine: started ok")
+ return nil
+}
+
+func (e *Engine) Stop() error {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ if !e.running {
+ return nil
+ }
+
+ if e.instance != nil {
+ e.instance.Close()
+ e.instance = nil
+ }
+ if e.cancel != nil {
+ e.cancel()
+ }
+ e.running = false
+ e.localProxyPort = 0
+ log.Println("engine: stopped")
+ return nil
+}
+
+func (e *Engine) Restart(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string) error {
+ e.Stop()
+ return e.Start(server, mode, ruleSets, serverIPs)
+}
+
+func (e *Engine) RestartFull(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string, customBypass []string, localProxyPort int, policy *models.RoutingPolicy) error {
+ e.Stop()
+ return e.StartFull(server, mode, ruleSets, serverIPs, customBypass, localProxyPort, policy)
+}
+
+func (e *Engine) IsRunning() bool {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ return e.running
+}
+
+func (e *Engine) ConfigPath() string {
+ return e.configPath
+}
+
+func (e *Engine) LocalProxyPort() int {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ return e.localProxyPort
+}
diff --git a/internal/engine/healthcheck.go b/internal/engine/healthcheck.go
new file mode 100644
index 0000000..a856608
--- /dev/null
+++ b/internal/engine/healthcheck.go
@@ -0,0 +1,63 @@
+package engine
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "time"
+
+ "vpnem/internal/config"
+)
+
+const DefaultBlockedSiteProbeURL = "https://rutracker.org"
+
+func ModeRequiresExitIP(mode config.Mode) bool {
+ return mode.Final == "proxy"
+}
+
+func CheckExitIP(localProxyPort int) string {
+ client, err := HTTPClientViaSOCKS5(config.LocalProxyHost, localProxyPort, 5*time.Second)
+ if err != nil {
+ return ""
+ }
+ resp, err := client.Get("http://ifconfig.me/ip")
+ if err != nil {
+ return ""
+ }
+ defer resp.Body.Close()
+
+ body, err := io.ReadAll(io.LimitReader(resp.Body, 64))
+ if err != nil {
+ return ""
+ }
+ return strings.TrimSpace(string(body))
+}
+
+func ProbeBlockedSite(localProxyPort int, rawURL string, timeout time.Duration) (int, error) {
+ client, err := HTTPClientViaSOCKS5(config.LocalProxyHost, localProxyPort, timeout)
+ if err != nil {
+ return 0, err
+ }
+
+ req, err := http.NewRequest(http.MethodGet, rawURL, nil)
+ if err != nil {
+ return 0, err
+ }
+ req.Header.Set("User-Agent", "vpnem-health/1.0")
+
+ resp, err := client.Do(req)
+ if err != nil {
+ return 0, err
+ }
+ defer resp.Body.Close()
+
+ _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 256))
+ return resp.StatusCode, nil
+}
+
+func DeepCheckRequiresRestart(mode config.Mode, exitIP string, probeErr error) bool {
+ if ModeRequiresExitIP(mode) {
+ return exitIP == ""
+ }
+ return probeErr != nil
+}
diff --git a/internal/engine/healthcheck_test.go b/internal/engine/healthcheck_test.go
new file mode 100644
index 0000000..1d55ed0
--- /dev/null
+++ b/internal/engine/healthcheck_test.go
@@ -0,0 +1,38 @@
+package engine
+
+import (
+ "errors"
+ "testing"
+
+ "vpnem/internal/config"
+)
+
+func TestModeRequiresExitIP(t *testing.T) {
+ proxyMode := config.Mode{Name: "Full", Final: "proxy"}
+ directMode := config.Mode{Name: "Combo", Final: "direct"}
+
+ if !ModeRequiresExitIP(proxyMode) {
+ t.Fatal("expected proxy-final mode to require exit IP")
+ }
+ if ModeRequiresExitIP(directMode) {
+ t.Fatal("did not expect direct-final mode to require exit IP")
+ }
+}
+
+func TestDeepCheckRequiresRestart(t *testing.T) {
+ proxyMode := config.Mode{Name: "Full", Final: "proxy"}
+ directMode := config.Mode{Name: "Combo", Final: "direct"}
+
+ if !DeepCheckRequiresRestart(proxyMode, "", nil) {
+ t.Fatal("expected proxy-final mode without exit IP to restart")
+ }
+ if DeepCheckRequiresRestart(proxyMode, "89.124.96.166", errors.New("probe failed")) {
+ t.Fatal("did not expect proxy-final mode with exit IP to restart")
+ }
+ if !DeepCheckRequiresRestart(directMode, "", errors.New("probe failed")) {
+ t.Fatal("expected direct-final mode with failed blocked probe to restart")
+ }
+ if DeepCheckRequiresRestart(directMode, "", nil) {
+ t.Fatal("did not expect direct-final mode with successful blocked probe to restart")
+ }
+}
diff --git a/internal/engine/httpclient.go b/internal/engine/httpclient.go
new file mode 100644
index 0000000..e491cf5
--- /dev/null
+++ b/internal/engine/httpclient.go
@@ -0,0 +1,45 @@
+package engine
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/http"
+ "time"
+
+ "golang.org/x/net/proxy"
+)
+
+func HTTPClientViaSOCKS5(host string, port int, timeout time.Duration) (*http.Client, error) {
+ if host == "" || port <= 0 {
+ return nil, fmt.Errorf("invalid local socks5 endpoint")
+ }
+
+ addr := fmt.Sprintf("%s:%d", host, port)
+ dialer, err := proxy.SOCKS5("tcp", addr, nil, proxy.Direct)
+ if err != nil {
+ return nil, err
+ }
+
+ contextDialer, ok := dialer.(proxy.ContextDialer)
+ if !ok {
+ return nil, fmt.Errorf("socks5 dialer does not implement context dialing")
+ }
+
+ transport := &http.Transport{
+ DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
+ return contextDialer.DialContext(ctx, network, address)
+ },
+ ForceAttemptHTTP2: true,
+ MaxIdleConns: 10,
+ IdleConnTimeout: 30 * time.Second,
+ TLSHandshakeTimeout: timeout,
+ ResponseHeaderTimeout: timeout,
+ ExpectContinueTimeout: time.Second,
+ }
+
+ return &http.Client{
+ Timeout: timeout,
+ Transport: transport,
+ }, nil
+}
diff --git a/internal/engine/logger.go b/internal/engine/logger.go
new file mode 100644
index 0000000..c448a56
--- /dev/null
+++ b/internal/engine/logger.go
@@ -0,0 +1,62 @@
+package engine
+
+import (
+ "os"
+ "path/filepath"
+ "sync"
+)
+
+// RingLog keeps last N log lines in memory and optionally writes to file.
+type RingLog struct {
+ mu sync.Mutex
+ lines []string
+ max int
+ file *os.File
+}
+
+// NewRingLog creates a ring buffer logger.
+func NewRingLog(maxLines int, dataDir string) *RingLog {
+ rl := &RingLog{
+ lines: make([]string, 0, maxLines),
+ max: maxLines,
+ }
+ if dataDir != "" {
+ f, err := os.OpenFile(filepath.Join(dataDir, "vpnem.log"),
+ os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
+ if err == nil {
+ rl.file = f
+ }
+ }
+ return rl
+}
+
+// Add appends a line.
+func (rl *RingLog) Add(line string) {
+ rl.mu.Lock()
+ defer rl.mu.Unlock()
+
+ if len(rl.lines) >= rl.max {
+ rl.lines = rl.lines[1:]
+ }
+ rl.lines = append(rl.lines, line)
+
+ if rl.file != nil {
+ rl.file.WriteString(line + "\n")
+ }
+}
+
+// Lines returns all current lines.
+func (rl *RingLog) Lines() []string {
+ rl.mu.Lock()
+ defer rl.mu.Unlock()
+ cp := make([]string, len(rl.lines))
+ copy(cp, rl.lines)
+ return cp
+}
+
+// Close closes the log file.
+func (rl *RingLog) Close() {
+ if rl.file != nil {
+ rl.file.Close()
+ }
+}
diff --git a/internal/engine/proxy_port.go b/internal/engine/proxy_port.go
new file mode 100644
index 0000000..5e657c0
--- /dev/null
+++ b/internal/engine/proxy_port.go
@@ -0,0 +1,37 @@
+package engine
+
+import (
+ "fmt"
+ "net"
+
+ "vpnem/internal/config"
+)
+
+var localProxyPortCandidates = []int{config.LocalProxyPort, 10808, 10880, 18080, 20800}
+
+func ResolveLocalProxyPort() (int, error) {
+ for _, port := range localProxyPortCandidates {
+ if localProxyPortAvailable(port) {
+ return port, nil
+ }
+ }
+ listener, err := net.Listen("tcp", net.JoinHostPort(config.LocalProxyHost, "0"))
+ if err != nil {
+ return 0, err
+ }
+ defer listener.Close()
+ addr, ok := listener.Addr().(*net.TCPAddr)
+ if !ok {
+ return 0, fmt.Errorf("unexpected listener addr type %T", listener.Addr())
+ }
+ return addr.Port, nil
+}
+
+func localProxyPortAvailable(port int) bool {
+ listener, err := net.Listen("tcp", net.JoinHostPort(config.LocalProxyHost, fmt.Sprintf("%d", port)))
+ if err != nil {
+ return false
+ }
+ _ = listener.Close()
+ return true
+}
diff --git a/internal/engine/watchdog.go b/internal/engine/watchdog.go
new file mode 100644
index 0000000..a27945f
--- /dev/null
+++ b/internal/engine/watchdog.go
@@ -0,0 +1,147 @@
+package engine
+
+import (
+ "context"
+ "log"
+ "time"
+
+ "vpnem/internal/config"
+ "vpnem/internal/models"
+)
+
+// WatchdogConfig holds watchdog parameters.
+type WatchdogConfig struct {
+ CheckInterval time.Duration // how often to check sing-box is alive (default 2s)
+ DeepCheckInterval time.Duration // how often to verify exit IP (default 30s)
+ ReconnectCooldown time.Duration // min time between reconnect attempts (default 5s)
+}
+
+// DefaultWatchdogConfig returns the default watchdog settings (from vpn.py).
+func DefaultWatchdogConfig() WatchdogConfig {
+ return WatchdogConfig{
+ CheckInterval: 2 * time.Second,
+ DeepCheckInterval: 30 * time.Second,
+ ReconnectCooldown: 5 * time.Second,
+ }
+}
+
+// Watchdog monitors sing-box and auto-reconnects on failure.
+type Watchdog struct {
+ engine *Engine
+ cfg WatchdogConfig
+ cancel context.CancelFunc
+ running bool
+
+ // Reconnect parameters (set via StartWatching)
+ server models.Server
+ mode config.Mode
+ ruleSets []models.RuleSet
+ serverIPs []string
+ customBypass []string
+ localProxyPort int
+ policy *models.RoutingPolicy
+}
+
+// NewWatchdog creates a new watchdog for the given engine.
+func NewWatchdog(engine *Engine, cfg WatchdogConfig) *Watchdog {
+ return &Watchdog{
+ engine: engine,
+ cfg: cfg,
+ }
+}
+
+// StartWatching begins monitoring. It stores the connection params for reconnection.
+func (w *Watchdog) StartWatching(server models.Server, mode config.Mode, ruleSets []models.RuleSet, serverIPs []string, customBypass []string, localProxyPort int, policy *models.RoutingPolicy) {
+ w.StopWatching()
+
+ w.server = server
+ w.mode = mode
+ w.ruleSets = ruleSets
+ w.serverIPs = serverIPs
+ w.customBypass = append([]string{}, customBypass...)
+ w.localProxyPort = localProxyPort
+ w.policy = policy
+
+ ctx, cancel := context.WithCancel(context.Background())
+ w.cancel = cancel
+ w.running = true
+
+ go w.loop(ctx)
+}
+
+// StopWatching stops the watchdog.
+func (w *Watchdog) StopWatching() {
+ if w.cancel != nil {
+ w.cancel()
+ }
+ w.running = false
+}
+
+// IsWatching returns whether the watchdog is active.
+func (w *Watchdog) IsWatching() bool {
+ return w.running
+}
+
+func (w *Watchdog) loop(ctx context.Context) {
+ ticker := time.NewTicker(w.cfg.CheckInterval)
+ defer ticker.Stop()
+
+ deepTicker := time.NewTicker(w.cfg.DeepCheckInterval)
+ defer deepTicker.Stop()
+
+ lastReconnect := time.Time{}
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+
+ case <-ticker.C:
+ if !w.engine.IsRunning() {
+ if time.Since(lastReconnect) < w.cfg.ReconnectCooldown {
+ continue
+ }
+ localProxyPort, err := ResolveLocalProxyPort()
+ if err != nil {
+ log.Printf("watchdog: local proxy port selection failed: %v", err)
+ continue
+ }
+ w.localProxyPort = localProxyPort
+ log.Println("watchdog: sing-box not running, reconnecting...")
+ if err := w.engine.StartFull(w.server, w.mode, w.ruleSets, w.serverIPs, w.customBypass, w.localProxyPort, w.policy); err != nil {
+ log.Printf("watchdog: reconnect failed: %v", err)
+ } else {
+ log.Println("watchdog: reconnected successfully")
+ }
+ lastReconnect = time.Now()
+ }
+
+ case <-deepTicker.C:
+ if !w.engine.IsRunning() {
+ continue
+ }
+ exitIP := CheckExitIP(w.localProxyPort)
+ _, probeErr := ProbeBlockedSite(w.localProxyPort, DefaultBlockedSiteProbeURL, 8*time.Second)
+ if DeepCheckRequiresRestart(w.mode, exitIP, probeErr) {
+ if ModeRequiresExitIP(w.mode) {
+ log.Println("watchdog: deep check failed (no exit IP), restarting...")
+ } else {
+ log.Printf("watchdog: deep check failed for direct-final mode (%v), restarting...", probeErr)
+ }
+ if time.Since(lastReconnect) < w.cfg.ReconnectCooldown {
+ continue
+ }
+ localProxyPort, err := ResolveLocalProxyPort()
+ if err != nil {
+ log.Printf("watchdog: local proxy port selection failed: %v", err)
+ continue
+ }
+ w.localProxyPort = localProxyPort
+ if err := w.engine.RestartFull(w.server, w.mode, w.ruleSets, w.serverIPs, w.customBypass, w.localProxyPort, w.policy); err != nil {
+ log.Printf("watchdog: restart failed: %v", err)
+ }
+ lastReconnect = time.Now()
+ }
+ }
+ }
+}