From eaa9316c7b8807ee0b88cc4a3875f26c6878ecaa Mon Sep 17 00:00:00 2001 From: Fangliding Date: Sun, 17 May 2026 19:49:42 +0800 Subject: [PATCH] Timeout --- proxy/socks/protocol.go | 8 ++--- proxy/socks/server.go | 7 +++-- proxy/socks/temp_udp_listen.go | 56 ++++++++++++++++++++++++++++------ 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/proxy/socks/protocol.go b/proxy/socks/protocol.go index a5233508..58a6681a 100644 --- a/proxy/socks/protocol.go +++ b/proxy/socks/protocol.go @@ -137,7 +137,7 @@ func (s *ServerSession) auth5(nMethod byte, reader io.Reader, writer io.Writer) return "", nil } -func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer io.Writer) (*protocol.RequestHeader, *TempUDPConn, error) { +func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer net.Conn) (*protocol.RequestHeader, *TempUDPConn, error) { var ( username string err error @@ -205,9 +205,7 @@ func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer io.Wri return nil, nil, errors.New("failed to create UDP listener").Base(err) } responsePort = net.Port(udpHub.LocalAddr().(*net.UDPAddr).Port) - tempUDPConn = &TempUDPConn{ - UDPConn: udpHub, - } + tempUDPConn = NewTempUDPConn(udpHub, writer) } if err := writeSocks5Response(writer, statusSuccess, responseAddress, responsePort); err != nil { common.CloseIfExists(tempUDPConn) @@ -218,7 +216,7 @@ func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer io.Wri } // Handshake performs a Socks4/4a/5 handshake. -func (s *ServerSession) Handshake(reader io.Reader, writer io.Writer) (*protocol.RequestHeader, *TempUDPConn, error) { +func (s *ServerSession) Handshake(reader io.Reader, writer net.Conn) (*protocol.RequestHeader, *TempUDPConn, error) { buffer := buf.StackNew() if _, err := buffer.ReadFullFrom(reader, 2); err != nil { buffer.Release() diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 8c4dc694..fea81515 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -166,13 +166,16 @@ func (s *Server) processTCP(ctx context.Context, conn stat.Connection, dispatche if tempUDPConn == nil { return errors.New("UDP associate with listen port failed") } - ctx, cancel := context.WithCancel(ctx) + tempUDPConn.SetTimeout(plcy.Timeouts.ConnectionIdle) errCh := make(chan error, 1) go func() { errCh <- s.handleUDPPayload(ctx, tempUDPConn, dispatcher) }() + // Asociate TCP keeps the UDP alive + // Close UDP if TCP connection is closed + // Or Close TCP if UDP is idle timeout io.Copy(buf.DiscardBytes, conn) - cancel() + tempUDPConn.Close() return <-errCh } return nil diff --git a/proxy/socks/temp_udp_listen.go b/proxy/socks/temp_udp_listen.go index 5fec57f3..70150ef7 100644 --- a/proxy/socks/temp_udp_listen.go +++ b/proxy/socks/temp_udp_listen.go @@ -1,30 +1,54 @@ package socks import ( + "context" "net" - sync "sync" + "sync/atomic" + "time" "github.com/xtls/xray-core/common/errors" + "github.com/xtls/xray-core/common/signal" ) +func NewTempUDPConn(udpConn *net.UDPConn, tcpConn net.Conn) *TempUDPConn { + return &TempUDPConn{ + UDPConn: udpConn, + AssociateTCPConn: tcpConn, + } +} + +// TempUDPConn wait for the first packet to determine the remote address +// SetTimeout MUST be called before any read/write operation type TempUDPConn struct { *net.UDPConn - once sync.Once - remote net.Addr + AssociateTCPConn net.Conn + + timer *signal.ActivityTimer + firstPacketDone atomic.Bool + remote *net.UDPAddr } func (c *TempUDPConn) Read(b []byte) (n int, err error) { - n, addr, err := c.ReadFrom(b) - if err != nil { - return 0, err + c.timer.Update() + if c.firstPacketDone.CompareAndSwap(false, true) { + n, remote, err := c.UDPConn.ReadFromUDP(b) + c.remote = remote + return n, err + } + for { + n, remote, err := c.UDPConn.ReadFromUDP(b) + if err != nil { + return n, err + } + if remote.AddrPort() != c.remote.AddrPort() { + continue + } + return n, err } - c.once.Do(func() { - c.remote = addr - }) - return n, nil } func (c *TempUDPConn) Write(b []byte) (n int, err error) { + c.timer.Update() if c.remote == nil { return 0, errors.New("remote address not determined yet") } @@ -34,3 +58,15 @@ func (c *TempUDPConn) Write(b []byte) (n int, err error) { func (c *TempUDPConn) RemoteAddr() net.Addr { return c.remote } + +func (c *TempUDPConn) SetTimeout(d time.Duration) { + c.timer = signal.CancelAfterInactivity(context.Background(), func() { + c.Close() + }, d) +} + +func (c *TempUDPConn) Close() error { + c.timer.SetTimeout(0) + c.AssociateTCPConn.Close() + return c.UDPConn.Close() +}