This commit is contained in:
Fangliding
2026-05-17 19:49:42 +08:00
parent 8503230891
commit eaa9316c7b
3 changed files with 54 additions and 17 deletions
+3 -5
View File
@@ -137,7 +137,7 @@ func (s *ServerSession) auth5(nMethod byte, reader io.Reader, writer io.Writer)
return "", nil
}
func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer io.Writer) (*protocol.RequestHeader, *TempUDPConn, error) {
func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer net.Conn) (*protocol.RequestHeader, *TempUDPConn, error) {
var (
username string
err error
@@ -205,9 +205,7 @@ func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer io.Wri
return nil, nil, errors.New("failed to create UDP listener").Base(err)
}
responsePort = net.Port(udpHub.LocalAddr().(*net.UDPAddr).Port)
tempUDPConn = &TempUDPConn{
UDPConn: udpHub,
}
tempUDPConn = NewTempUDPConn(udpHub, writer)
}
if err := writeSocks5Response(writer, statusSuccess, responseAddress, responsePort); err != nil {
common.CloseIfExists(tempUDPConn)
@@ -218,7 +216,7 @@ func (s *ServerSession) handshake5(nMethod byte, reader io.Reader, writer io.Wri
}
// Handshake performs a Socks4/4a/5 handshake.
func (s *ServerSession) Handshake(reader io.Reader, writer io.Writer) (*protocol.RequestHeader, *TempUDPConn, error) {
func (s *ServerSession) Handshake(reader io.Reader, writer net.Conn) (*protocol.RequestHeader, *TempUDPConn, error) {
buffer := buf.StackNew()
if _, err := buffer.ReadFullFrom(reader, 2); err != nil {
buffer.Release()
+5 -2
View File
@@ -166,13 +166,16 @@ func (s *Server) processTCP(ctx context.Context, conn stat.Connection, dispatche
if tempUDPConn == nil {
return errors.New("UDP associate with listen port failed")
}
ctx, cancel := context.WithCancel(ctx)
tempUDPConn.SetTimeout(plcy.Timeouts.ConnectionIdle)
errCh := make(chan error, 1)
go func() {
errCh <- s.handleUDPPayload(ctx, tempUDPConn, dispatcher)
}()
// Asociate TCP keeps the UDP alive
// Close UDP if TCP connection is closed
// Or Close TCP if UDP is idle timeout
io.Copy(buf.DiscardBytes, conn)
cancel()
tempUDPConn.Close()
return <-errCh
}
return nil
+46 -10
View File
@@ -1,30 +1,54 @@
package socks
import (
"context"
"net"
sync "sync"
"sync/atomic"
"time"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/signal"
)
func NewTempUDPConn(udpConn *net.UDPConn, tcpConn net.Conn) *TempUDPConn {
return &TempUDPConn{
UDPConn: udpConn,
AssociateTCPConn: tcpConn,
}
}
// TempUDPConn wait for the first packet to determine the remote address
// SetTimeout MUST be called before any read/write operation
type TempUDPConn struct {
*net.UDPConn
once sync.Once
remote net.Addr
AssociateTCPConn net.Conn
timer *signal.ActivityTimer
firstPacketDone atomic.Bool
remote *net.UDPAddr
}
func (c *TempUDPConn) Read(b []byte) (n int, err error) {
n, addr, err := c.ReadFrom(b)
if err != nil {
return 0, err
c.timer.Update()
if c.firstPacketDone.CompareAndSwap(false, true) {
n, remote, err := c.UDPConn.ReadFromUDP(b)
c.remote = remote
return n, err
}
for {
n, remote, err := c.UDPConn.ReadFromUDP(b)
if err != nil {
return n, err
}
if remote.AddrPort() != c.remote.AddrPort() {
continue
}
return n, err
}
c.once.Do(func() {
c.remote = addr
})
return n, nil
}
func (c *TempUDPConn) Write(b []byte) (n int, err error) {
c.timer.Update()
if c.remote == nil {
return 0, errors.New("remote address not determined yet")
}
@@ -34,3 +58,15 @@ func (c *TempUDPConn) Write(b []byte) (n int, err error) {
func (c *TempUDPConn) RemoteAddr() net.Addr {
return c.remote
}
func (c *TempUDPConn) SetTimeout(d time.Duration) {
c.timer = signal.CancelAfterInactivity(context.Background(), func() {
c.Close()
}, d)
}
func (c *TempUDPConn) Close() error {
c.timer.SetTimeout(0)
c.AssociateTCPConn.Close()
return c.UDPConn.Close()
}