reverseproxy: collapse WebTransport handler into main proxy loop

The WT path duplicated upstream resolution, LB selection, header ops,
replacer vars, and in-flight counters. Route WT through the shared
ServeHTTP -> proxyLoopIteration -> reverseProxy flow and swap RoundTrip
for a small webTransportHijack that only does WT-specific work (writer
unwrap, upstream dial, client upgrade, pump).
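
The dispatch point itself is small; roughly, condensed from the
reverseProxy hunk below (surrounding setup elided):

  // reverseProxy has already resolved the upstream, filled DialInfo,
  // published placeholders, applied header ops, and bumped in-flight
  // counters by the time this check runs.
  if isWebTransportExtendedConnect(origReq) {
      return h.webTransportHijack(rw, req, repl, di, server)
  }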

Rename roundtripSucceededError -> terminalError. The existing name
described when it was emitted (after a successful round-trip); the
new name describes its contract with the retry loop (stop looping,
propagate error unchanged). The WebTransport upgrade case is a second
natural caller for that same signal.
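
Condensed from the hunks below, the type and the one check the loop
makes against it (the wrapped error may be nil, signalling terminal
success):

  // terminalError: stop the proxy loop and propagate the wrapped
  // error unchanged; the handler has committed client-visible state.
  type terminalError struct{ error }

  func (e terminalError) Unwrap() error { return e.error }

  // in proxyLoopIteration: don't retry, don't ding upstream health
  if term, ok := proxyErr.(terminalError); ok {
      return true, term.error
  }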

Comes with two behavior improvements that fall out of the collapse
(the first is sketched below the list):
  - WT upstream dial failures now surface as DialError, so the loop
    can fail over across upstreams like normal proxies (today: 502).
  - Passive health checks apply to WT dials (dial-failure countFailure
    and UnhealthyLatency on dial duration) via the shared path.
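
The first improvement, condensed from the hijack hunk below; the
countFailure / UnhealthyLatency wiring lives in the shared loop, which
this diff doesn't touch, so the comments only summarize it:

  // in webTransportHijack: no client-visible bytes have been written
  // yet, so a failed upstream dial is safe to retry elsewhere
  upstreamURL := buildWebTransportUpstreamURL(di.Address, req)
  upstreamResp, upstreamSess, err := dialUpstreamWebTransport(
      req.Context(), ht.h3Transport.TLSClientConfig, upstreamURL, req.Header)
  if err != nil {
      // DialError lets the shared loop count the failure for passive
      // health checks and fail over to another upstream if allowed
      return DialError{fmt.Errorf("webtransport upstream dial: %w", err)}
  }
  // upstreamResp and upstreamSess feed the response-header ops, client
  // upgrade, and session pump that follow (see the hunk)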

Addresses reviewer feedback that the duplicated setup was a
maintenance risk.
tomholford 2026-04-23 22:46:32 -07:00
parent 901ec0e751
commit a33a4f8dfc
2 changed files with 75 additions and 140 deletions

@@ -502,18 +502,6 @@ func (h *Handler) Cleanup() error {
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyhttp.Handler) error {
repl := r.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
// WebTransport: HTTP/3 Extended CONNECT with :protocol=webtransport
// can't flow through the normal HTTP round-trip — the session hosts
// many QUIC streams and datagrams that need bidirectional pumping.
// Detect it here the same way the handler detects a WebSocket
// upgrade: by request shape, not by a per-handler config flag. The
// underlying *webtransport.Server only exists when the parent
// server has enable_webtransport set, so serveWebTransport fails
// fast and clearly if a WT request reaches a non-WT server.
if isWebTransportExtendedConnect(r) {
return h.serveWebTransport(w, r)
}
// prepare the request for proxying; this is needed only once
clonedReq, err := h.prepareRequest(r, repl)
if err != nil {
@@ -702,12 +690,13 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
return true, nil
}
// if the roundtrip was successful, don't retry the request or
// ding the health status of the upstream (an error can still
// occur after the roundtrip if, for example, a response handler
// after the roundtrip returns an error)
if succ, ok := proxyErr.(roundtripSucceededError); ok {
return true, succ.error
// if the handler has already committed a client-visible response
// (e.g. a successful roundtrip whose handle_response route errored,
// or a WebTransport upgrade that flushed 200 OK and hijacked the
// stream), don't retry against another upstream and don't ding the
// upstream's health status
if term, ok := proxyErr.(terminalError); ok {
return true, term.error
}
// remember this failure (if enabled); response-based retries
@@ -1003,6 +992,18 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
server := req.Context().Value(caddyhttp.ServerCtxKey).(*caddyhttp.Server)
shouldLogCredentials := server.Logs != nil && server.Logs.ShouldLogCredentials
// WebTransport: Extended CONNECT :protocol=webtransport can't flow
// through the normal HTTP round-trip — the session hosts many QUIC
// streams and datagrams that need bidirectional pumping. Swap
// RoundTrip for a small hijack helper. Upstream dial failures
// surface as DialError so the loop can retry across upstreams;
// pre-upgrade misconfig and post-upgrade failures return
// terminalError because the response is already committed to the
// client or no upstream can fix the condition.
if isWebTransportExtendedConnect(origReq) {
return h.webTransportHijack(rw, req, repl, di, server)
}
// Forward 1xx status codes, backported from https://github.com/golang/go/pull/53164
var (
roundTripMutex sync.Mutex
@@ -1188,10 +1189,10 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origRe
res.Body.Close()
}
// wrap any route error in roundtripSucceededError so caller knows that
// the roundtrip was successful and to not retry
// wrap any route error in terminalError so the outer loop knows
// the response is committed and must not be retried
if routeErr != nil {
return roundtripSucceededError{routeErr}
return terminalError{routeErr}
}
// we're done handling the response, and we don't want to
@@ -1788,9 +1789,16 @@ func matcherSetHasExpressionMatcher(matcherSet caddyhttp.MatcherSet) bool {
return false
}
// roundtripSucceededError is an error type that is returned if the
// roundtrip succeeded, but an error occurred after-the-fact.
type roundtripSucceededError struct{ error }
// terminalError signals that the proxy loop must stop and the wrapped
// error must be propagated unchanged. It is emitted in any situation
// where the handler has committed enough client-visible state that
// retrying against another upstream would be unsafe — for example, a
// handle_response route that ran after a successful round-trip, or a
// WebTransport upgrade that already flushed 200 OK and hijacked the
// stream. The inner error may be nil to signal terminal success.
type terminalError struct{ error }
func (e terminalError) Unwrap() error { return e.error }
// retryableResponseError is returned when the upstream response matched
// a retry_match entry, indicating the request should be retried with the

@@ -20,13 +20,10 @@ import (
"errors"
"fmt"
"net/http"
"time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/quic-go/webtransport-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
@@ -54,157 +51,87 @@ func isWebTransportExtendedConnect(r *http.Request) bool {
return r.ProtoMajor == 3 && r.Method == http.MethodConnect && r.Proto == webtransportProtocol
}
// serveWebTransport handles a WebTransport Extended CONNECT: selects an
// upstream, dials the upstream-side session, upgrades the client-side
// session, and runs the session pump until both sides close.
// webTransportHijack runs inside reverseProxy in place of RoundTrip when
// the request is a WebTransport Extended CONNECT. The outer proxy loop
// has already resolved the upstream set, selected an upstream, filled
// DialInfo, published reverse_proxy.upstream.* placeholders, applied
// transport and user request-header ops, cloned the request, directed
// the request URL at the upstream, and bumped in-flight counters — so
// this function only does WT-specific plumbing: upstream WT dial,
// client upgrade, and session pumping.
//
// The upstream is dialed *before* the client is upgraded so that a refused
// or unreachable upstream surfaces as a real 5xx on the client's Dial —
// not as a bare post-upgrade session close. There are no retries: WT
// sessions are long-lived and not idempotent.
// Error semantics match the outer loop's retry contract:
// - Pre-dial misconfiguration (WT not enabled on the server, writer
// stack unsupported, handler transport is not HTTP/3) returns
// terminalError — no upstream can fix these conditions.
// - Upstream dial failure returns DialError — safe to retry across
// upstreams because no client-visible bytes have been written.
// - Post-upgrade failures return terminalError because the 200 OK
// has been flushed and the stream is hijacked.
//
// The outgoing CONNECT is prepared with the same Rewrite, hop-by-hop
// stripping, X-Forwarded-*/Via, transport- and user-configured header ops
// as the normal proxy path. Response-header ops (gated by `Require`, if
// configured) apply to the headers the client sees on the 200 OK.
// Requests that reach this function are already known to be WebTransport;
// callers should gate with isWebTransportExtendedConnect.
func (h *Handler) serveWebTransport(w http.ResponseWriter, r *http.Request) error {
repl := r.Context().Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
start := time.Now()
defer func() {
d := time.Since(start)
repl.Set("http.reverse_proxy.duration", d)
repl.Set("http.reverse_proxy.duration_ms", d.Seconds()*1e3)
}()
srv, ok := r.Context().Value(caddyhttp.ServerCtxKey).(*caddyhttp.Server)
if !ok || srv == nil {
return caddyhttp.Error(http.StatusInternalServerError,
errors.New("webtransport: no caddyhttp.Server in request context"))
}
wtServer, ok := srv.WebTransportServer().(*webtransport.Server)
func (h *Handler) webTransportHijack(rw http.ResponseWriter, req *http.Request, repl *caddy.Replacer, di DialInfo, server *caddyhttp.Server) error {
wtServer, ok := server.WebTransportServer().(*webtransport.Server)
if !ok || wtServer == nil {
return caddyhttp.Error(http.StatusInternalServerError,
errors.New("webtransport: server has enable_webtransport=false or HTTP/3 is not enabled"))
}
if h.LoadBalancing == nil || h.LoadBalancing.SelectionPolicy == nil {
return caddyhttp.Error(http.StatusInternalServerError,
errors.New("webtransport: load balancer is not configured"))
}
// Resolve the candidate upstream set (static or dynamic) and select
// one. WT sessions are long-lived and not idempotent, so there are no
// retries; picking once matches how operators expect WT to behave.
upstreams := h.resolveUpstreams(r, 0)
upstream := h.LoadBalancing.SelectionPolicy.Select(upstreams, r, w)
if upstream == nil {
return caddyhttp.Error(http.StatusBadGateway, errNoUpstream)
}
// Resolve per-upstream placeholders (addresses may include them) and
// publish the {http.reverse_proxy.upstream.*} replacer values before
// we commit to upgrading — so any client-side failure logs downstream
// see the selected upstream too.
dialInfo, err := upstream.fillDialInfo(repl)
if err != nil {
return caddyhttp.Error(http.StatusInternalServerError,
fmt.Errorf("webtransport: making dial info: %w", err))
}
setUpstreamReplacerVars(repl, upstream, dialInfo)
// Prepare the outgoing request the same way normal proxying does —
// Rewrite, hop-by-hop stripping, X-Forwarded-*, Via, etc. — then apply
// transport and user header ops. prepareRequest's body-buffering and
// Early-Data paths are no-ops for a CONNECT request (empty body).
clonedReq, err := h.prepareRequest(r, repl)
if err != nil {
return caddyhttp.Error(http.StatusInternalServerError,
fmt.Errorf("webtransport: preparing request: %w", err))
}
if h.transportHeaderOps != nil {
h.transportHeaderOps.ApplyToRequest(clonedReq)
}
if h.Headers != nil && h.Headers.Request != nil {
h.Headers.Request.ApplyToRequest(clonedReq)
return terminalError{caddyhttp.Error(http.StatusInternalServerError,
errors.New("webtransport: server has enable_webtransport=false or HTTP/3 is not enabled"))}
}
// Reach the naked http3 response writer so Upgrade's type assertions
// succeed through Caddy's wrapper chain. Done before dialing so we
// fail fast if the writer stack is unexpectedly incompatible.
naked, ok := caddyhttp.UnwrapResponseWriterAs[webtransportWriter](w)
naked, ok := caddyhttp.UnwrapResponseWriterAs[webtransportWriter](rw)
if !ok {
return caddyhttp.Error(http.StatusInternalServerError,
errors.New("webtransport: response writer does not support WebTransport upgrade"))
return terminalError{caddyhttp.Error(http.StatusInternalServerError,
errors.New("webtransport: response writer does not support WebTransport upgrade"))}
}
// A WT CONNECT reached this handler because the parent server has
// enable_webtransport=true. But the handler's transport still has to
// speak HTTP/3 to dial WT upstream. Fail fast and clearly if it
// doesn't, the same way we'd fail on an unreachable upstream.
// speak HTTP/3 to dial the WT upstream.
ht, ok := h.Transport.(*HTTPTransport)
if !ok {
return caddyhttp.Error(http.StatusBadGateway,
errors.New("webtransport: requires the 'http' transport with versions [\"3\"]"))
return terminalError{caddyhttp.Error(http.StatusBadGateway,
errors.New("webtransport: requires the 'http' transport with versions [\"3\"]"))}
}
if ht.h3Transport == nil {
return caddyhttp.Error(http.StatusBadGateway,
errors.New("webtransport: transport does not include HTTP/3; set versions to [\"3\"]"))
return terminalError{caddyhttp.Error(http.StatusBadGateway,
errors.New("webtransport: transport does not include HTTP/3; set versions to [\"3\"]"))}
}
// Dial the upstream BEFORE upgrading the client. If the upstream is
// unreachable or refuses the CONNECT, a proper 5xx goes back over the
// H3 stream and the client's Dial sees the real status — instead of
// an already-upgraded session closing immediately.
upstreamURL := buildWebTransportUpstreamURL(dialInfo.Address, clonedReq)
upstreamResp, upstreamSess, err := dialUpstreamWebTransport(r.Context(), ht.h3Transport.TLSClientConfig, upstreamURL, clonedReq.Header)
// an already-upgraded session closing immediately. DialError so the
// outer proxy loop can fail over to another upstream, same as any
// other dial failure.
upstreamURL := buildWebTransportUpstreamURL(di.Address, req)
upstreamResp, upstreamSess, err := dialUpstreamWebTransport(req.Context(), ht.h3Transport.TLSClientConfig, upstreamURL, req.Header)
if err != nil {
h.countFailure(upstream)
if c := h.logger.Check(zapcore.ErrorLevel, "webtransport upstream dial failed"); c != nil {
c.Write(
zap.String("upstream", upstreamURL),
zap.Error(err),
)
}
return caddyhttp.Error(http.StatusBadGateway,
fmt.Errorf("webtransport upstream dial: %w", err))
return DialError{fmt.Errorf("webtransport upstream dial: %w", err)}
}
defer upstreamResp.Body.Close()
// Response-header ops (gated by Require, if configured) are applied to
// the 200 OK the client will see. webtransport.Server.Upgrade flushes
// w.Header() along with the status, so setting these before Upgrade is
// sufficient. Matching against the upstream response mirrors the normal
// proxy path where upstream response == client response.
// Response-header ops (gated by Require, if configured) apply to the
// 200 OK the client will see. webtransport.Server.Upgrade flushes
// w.Header() along with the status, so setting these before Upgrade
// is sufficient. Matching against the upstream response mirrors the
// normal proxy path where upstream response == client response.
if h.Headers != nil && h.Headers.Response != nil {
if h.Headers.Response.Require == nil ||
h.Headers.Response.Require.Match(upstreamResp.StatusCode, upstreamResp.Header) {
h.Headers.Response.ApplyTo(w.Header(), repl)
h.Headers.Response.ApplyTo(rw.Header(), repl)
}
}
clientSess, err := wtServer.Upgrade(naked, r)
clientSess, err := wtServer.Upgrade(naked, req)
if err != nil {
_ = upstreamSess.CloseWithError(0, "client upgrade failed")
if c := h.logger.Check(zapcore.DebugLevel, "webtransport client upgrade failed"); c != nil {
c.Write(zap.Error(err))
}
return caddyhttp.Error(http.StatusBadRequest,
fmt.Errorf("webtransport upgrade: %w", err))
return terminalError{caddyhttp.Error(http.StatusBadRequest,
fmt.Errorf("webtransport upgrade: %w", err))}
}
// Track the session in the same upstream counters the normal proxy
// path maintains: Host.NumRequests drives MaxRequests gating and
// least-connections selection; the per-address in-flight counter
// feeds the admin API's upstream stats.
_ = dialInfo.Upstream.Host.countRequest(1)
incInFlightRequest(dialInfo.Address)
defer func() {
_ = dialInfo.Upstream.Host.countRequest(-1)
decInFlightRequest(dialInfo.Address)
}()
runWebTransportPump(clientSess, upstreamSess, h.logger)
return nil
}