suggestion: tail latency with goSched by AlliBalliBaba · Pull Request #2033 · php/frankenphp

Expand Up @@ -6,8 +6,10 @@ import ( "fmt" "os" "path/filepath" "runtime" "strings" "sync" "sync/atomic" "time"
"github.com/dunglas/frankenphp/internal/fastabs" Expand All @@ -28,6 +30,7 @@ type worker struct { maxConsecutiveFailures int onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 }
var ( Expand Down Expand Up @@ -253,24 +256,30 @@ func (worker *worker) isAtThreadLimit() bool { func (worker *worker) handleRequest(ch contextHolder) error { metrics.StartWorkerRequest(worker.name)
// dispatch requests to all worker threads in order worker.threadMutex.RLock() for _, thread := range worker.threads { select { case thread.requestChan <- ch: worker.threadMutex.RUnlock() <-ch.frankenPHPContext.done metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) runtime.Gosched()
return nil default: // thread is busy, continue if worker.queuedRequests.Load() == 0 { // dispatch requests to all worker threads in order worker.threadMutex.RLock() for _, thread := range worker.threads { select { case thread.requestChan <- ch: worker.threadMutex.RUnlock() <-ch.frankenPHPContext.done metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
return nil default: // thread is busy, continue } } worker.threadMutex.RUnlock() } worker.threadMutex.RUnlock()
// if no thread was available, mark the request as queued and apply the scaling strategy worker.queuedRequests.Add(1) metrics.QueuedWorkerRequest(worker.name)
for { workerScaleChan := scaleChan if worker.isAtThreadLimit() { Expand All @@ -279,6 +288,7 @@ func (worker *worker) handleRequest(ch contextHolder) error {
select { case worker.requestChan <- ch: worker.queuedRequests.Add(-1) metrics.DequeuedWorkerRequest(worker.name) <-ch.frankenPHPContext.done metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) Expand All @@ -288,7 +298,9 @@ func (worker *worker) handleRequest(ch contextHolder) error { // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling worker.queuedRequests.Add(-1) metrics.DequeuedWorkerRequest(worker.name) metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)
Expand Down