Merge branch 'MHSanaei:main' into main

This commit is contained in:
Farhad H. P. Shirvan 2026-05-11 16:34:40 +02:00 committed by GitHub
commit fd906aac98
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -21,39 +21,64 @@ type trafficWriteRequest struct {
}
var (
twMu sync.Mutex
twQueue chan *trafficWriteRequest
twCtx context.Context
twCancel context.CancelFunc
twDone chan struct{}
twOnce sync.Once
)
// StartTrafficWriter spins up the serial writer goroutine. Safe to call again
// after StopTrafficWriter — each Start/Stop cycle gets fresh channels. The
// previous sync.Once-based implementation deadlocked after a SIGHUP-driven
// panel restart: Stop killed the consumer goroutine but Once prevented Start
// from spawning a new one, so every later submitTrafficWrite blocked forever
// on <-req.done with no consumer (including the AddTraffic call inside
// XrayService.GetXrayConfig that runs from startTask).
func StartTrafficWriter() {
twOnce.Do(func() {
twQueue = make(chan *trafficWriteRequest, trafficWriterQueueSize)
twCtx, twCancel = context.WithCancel(context.Background())
twDone = make(chan struct{})
go runTrafficWriter()
})
twMu.Lock()
defer twMu.Unlock()
if twQueue != nil {
return
}
queue := make(chan *trafficWriteRequest, trafficWriterQueueSize)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
twQueue = queue
twCancel = cancel
twDone = done
go runTrafficWriter(queue, ctx, done)
}
// StopTrafficWriter cancels the writer context and waits for the goroutine to
// drain any pending requests before returning. Resets the package state so a
// subsequent StartTrafficWriter can spawn a fresh consumer.
func StopTrafficWriter() {
if twCancel != nil {
twCancel()
<-twDone
twMu.Lock()
cancel := twCancel
done := twDone
twQueue = nil
twCancel = nil
twDone = nil
twMu.Unlock()
if cancel != nil {
cancel()
}
if done != nil {
<-done
}
}
func runTrafficWriter() {
defer close(twDone)
func runTrafficWriter(queue chan *trafficWriteRequest, ctx context.Context, done chan struct{}) {
defer close(done)
for {
select {
case req := <-twQueue:
case req := <-queue:
req.done <- safeApply(req.apply)
case <-twCtx.Done():
case <-ctx.Done():
for {
select {
case req := <-twQueue:
case req := <-queue:
req.done <- safeApply(req.apply)
default:
return
@ -74,12 +99,16 @@ func safeApply(fn func() error) (err error) {
}
func submitTrafficWrite(fn func() error) error {
if twQueue == nil {
twMu.Lock()
queue := twQueue
twMu.Unlock()
if queue == nil {
return safeApply(fn)
}
req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
select {
case twQueue <- req:
case queue <- req:
case <-time.After(trafficWriterSubmitTimeout):
return errors.New("traffic writer queue full")
}