reverseproxy: extract shared upstream-selection helpers

The WebTransport proxy path in serveWebTransport duplicated the
dynamic-upstream-fallback block and the {http.reverse_proxy.upstream.*}
replacer-variable block from proxyLoopIteration. Francis flagged this
as a maintenance burden in review of #7669.

Extract two helpers:

  * resolveUpstreams(r, retries) returns the candidate upstream set:
    dynamic when configured (with provisioning, a cache reset on retry,
    and fallback to static on error), static otherwise. The caller runs
    the LB selection policy, since the two call sites diverge on how
    selection failure is reported (retry loop vs. fast 502 for
    long-lived WT sessions).

  * setUpstreamReplacerVars(repl, up, di) publishes the seven
    placeholders describing the selected upstream.

Both are used by proxyLoopIteration and serveWebTransport with the same
semantics as the inlined code they replace (call-site sketch below). No
routing behavior change for either path; the only visible difference is
that the WebTransport dynamic-upstream fallback now logs the shared
Error-level message instead of its own Warn-level one.
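A condensed sketch of the two call sites after this change (lifted from
the diff below; not compilable on its own):

    // proxyLoopIteration: a nil selection feeds the existing retry loop,
    // bounded by lb_try_duration / lb_try_interval
    upstreams := h.resolveUpstreams(r, retries)
    upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)

    // serveWebTransport: no retry loop, so a nil selection is a fast 502
    upstreams = h.resolveUpstreams(r, 0)
    upstream = h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
    if upstream == nil {
        return caddyhttp.Error(http.StatusBadGateway, errNoUpstream)
    }

    // both paths then publish the placeholders the same way
    setUpstreamReplacerVars(repl, upstream, dialInfo)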
tomholford 2026-04-23 11:03:20 -07:00
parent 29737fec9b
commit 7f4ff3a1db
2 changed files with 54 additions and 56 deletions

@@ -72,6 +72,56 @@ func getInFlightRequests() map[string]int64 {
 	return copyMap
 }
 
+// resolveUpstreams returns the candidate upstream set for this request:
+// dynamic upstreams when configured (with fallback to static on error),
+// static upstreams otherwise. Any dynamic upstream entries are provisioned
+// before return. When retries > 0 and the dynamic source implements
+// CachingUpstreamSource, its cache is reset first so the next attempt
+// does not keep retrying a known-bad upstream.
+func (h *Handler) resolveUpstreams(r *http.Request, retries int) []*Upstream {
+	upstreams := h.Upstreams
+	if h.DynamicUpstreams == nil {
+		return upstreams
+	}
+	if retries > 0 {
+		// after a failure (and thus during a retry), give dynamic upstream modules an opportunity
+		// to purge their relevant cache entries so we don't keep retrying bad upstreams
+		if cachingDynamicUpstreams, ok := h.DynamicUpstreams.(CachingUpstreamSource); ok {
+			if err := cachingDynamicUpstreams.ResetCache(r); err != nil {
+				if c := h.logger.Check(zapcore.ErrorLevel, "failed clearing dynamic upstream source's cache"); c != nil {
+					c.Write(zap.Error(err))
+				}
+			}
+		}
+	}
+	dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r)
+	if err != nil {
+		if c := h.logger.Check(zapcore.ErrorLevel, "failed getting dynamic upstreams; falling back to static upstreams"); c != nil {
+			c.Write(zap.Error(err))
+		}
+		return upstreams
+	}
+	for _, dUp := range dUpstreams {
+		h.provisionUpstream(dUp, true)
+	}
+	if c := h.logger.Check(zapcore.DebugLevel, "provisioned dynamic upstreams"); c != nil {
+		c.Write(zap.Int("count", len(dUpstreams)))
+	}
+	return dUpstreams
+}
+
+// setUpstreamReplacerVars populates the {http.reverse_proxy.upstream.*}
+// placeholders describing the selected upstream.
+func setUpstreamReplacerVars(repl *caddy.Replacer, upstream *Upstream, di DialInfo) {
+	repl.Set("http.reverse_proxy.upstream.address", di.String())
+	repl.Set("http.reverse_proxy.upstream.hostport", di.Address)
+	repl.Set("http.reverse_proxy.upstream.host", di.Host)
+	repl.Set("http.reverse_proxy.upstream.port", di.Port)
+	repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests())
+	repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests)
+	repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails())
+}
+
 func init() {
 	caddy.RegisterModule(Handler{})
 }
@@ -592,34 +642,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
 	repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler,
 ) (bool, error) {
 	// get the updated list of upstreams
-	upstreams := h.Upstreams
-	if h.DynamicUpstreams != nil {
-		if retries > 0 {
-			// after a failure (and thus during a retry), give dynamic upstream modules an opportunity
-			// to purge their relevant cache entries so we don't keep retrying bad upstreams
-			if cachingDynamicUpstreams, ok := h.DynamicUpstreams.(CachingUpstreamSource); ok {
-				if err := cachingDynamicUpstreams.ResetCache(r); err != nil {
-					if c := h.logger.Check(zapcore.ErrorLevel, "failed clearing dynamic upstream source's cache"); c != nil {
-						c.Write(zap.Error(err))
-					}
-				}
-			}
-		}
-		dUpstreams, err := h.DynamicUpstreams.GetUpstreams(r)
-		if err != nil {
-			if c := h.logger.Check(zapcore.ErrorLevel, "failed getting dynamic upstreams; falling back to static upstreams"); c != nil {
-				c.Write(zap.Error(err))
-			}
-		} else {
-			upstreams = dUpstreams
-			for _, dUp := range dUpstreams {
-				h.provisionUpstream(dUp, true)
-			}
-			if c := h.logger.Check(zapcore.DebugLevel, "provisioned dynamic upstreams"); c != nil {
-				c.Write(zap.Int("count", len(dUpstreams)))
-			}
-		}
-	}
+	upstreams := h.resolveUpstreams(r, retries)
 
 	// choose an available upstream
 	upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
@@ -654,13 +677,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
 	caddyhttp.SetVar(r.Context(), dialInfoVarKey, dialInfo)
 
 	// set placeholders with information about this upstream
-	repl.Set("http.reverse_proxy.upstream.address", dialInfo.String())
-	repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address)
-	repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host)
-	repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port)
-	repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests())
-	repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests)
-	repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails())
+	setUpstreamReplacerVars(repl, upstream, dialInfo)
 
 	// mutate request headers according to this upstream;
 	// because we're in a retry loop, we have to copy headers

@@ -82,20 +82,7 @@ func (h *Handler) serveWebTransport(w http.ResponseWriter, r *http.Request) erro
 	// Resolve the candidate upstream set (static or dynamic) and select
 	// one. WT sessions are long-lived and not idempotent, so there are no
 	// retries; picking once matches how operators expect WT to behave.
-	upstreams := h.Upstreams
-	if h.DynamicUpstreams != nil {
-		dynUpstreams, err := h.DynamicUpstreams.GetUpstreams(r)
-		if err != nil {
-			if c := h.logger.Check(zapcore.WarnLevel, "webtransport: dynamic upstreams failed; falling back to static"); c != nil {
-				c.Write(zap.Error(err))
-			}
-		} else {
-			upstreams = dynUpstreams
-			for _, dUp := range dynUpstreams {
-				h.provisionUpstream(dUp, true)
-			}
-		}
-	}
+	upstreams := h.resolveUpstreams(r, 0)
 	upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
 	if upstream == nil {
 		return caddyhttp.Error(http.StatusBadGateway, errNoUpstream)
@@ -110,13 +97,7 @@ func (h *Handler) serveWebTransport(w http.ResponseWriter, r *http.Request) erro
 		return caddyhttp.Error(http.StatusInternalServerError,
 			fmt.Errorf("webtransport: making dial info: %w", err))
 	}
-	repl.Set("http.reverse_proxy.upstream.address", dialInfo.String())
-	repl.Set("http.reverse_proxy.upstream.hostport", dialInfo.Address)
-	repl.Set("http.reverse_proxy.upstream.host", dialInfo.Host)
-	repl.Set("http.reverse_proxy.upstream.port", dialInfo.Port)
-	repl.Set("http.reverse_proxy.upstream.requests", upstream.Host.NumRequests())
-	repl.Set("http.reverse_proxy.upstream.max_requests", upstream.MaxRequests)
-	repl.Set("http.reverse_proxy.upstream.fails", upstream.Host.Fails())
+	setUpstreamReplacerVars(repl, upstream, dialInfo)
 
 	// Prepare the outgoing request the same way normal proxying does —
 	// Rewrite, hop-by-hop stripping, X-Forwarded-*, Via, etc. — then apply