Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
root = true

[*]
charset = utf-8
end_of_line = lf
indent_size = 2
indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true
max_line_length = 120

[*.go]
indent_style = tab

[Makefile]
indent_style = tab
2 changes: 2 additions & 0 deletions .github/workflows/test-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ jobs:
- TestACLAutogroupTagged
- TestACLAutogroupSelf
- TestACLPolicyPropagationOverTime
- TestACLTagPropagation
- TestACLTagPropagationPortSpecific
- TestAPIAuthenticationBypass
- TestAPIAuthenticationBypassCurl
- TestGRPCAuthenticationBypass
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.integration
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ RUN CGO_ENABLED=0 GOOS=linux go build -gcflags="all=-N -l" -o /go/bin/headscale
FROM debian:trixie-slim

RUN apt-get --update install --no-install-recommends --yes \
less jq sqlite3 dnsutils ca-certificates procps bash findutils curl traceroute python3 \
bash ca-certificates curl dnsutils findutils iproute2 jq less procps python3 sqlite3 \
&& apt-get dist-clean

RUN mkdir -p /var/run/headscale
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.integration-ci
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
FROM debian:trixie-slim

RUN apt-get --update install --no-install-recommends --yes \
less jq sqlite3 dnsutils ca-certificates procps bash findutils curl traceroute \
bash ca-certificates curl dnsutils findutils iproute2 jq less procps python3 sqlite3 \
&& apt-get dist-clean

RUN mkdir -p /var/run/headscale
Expand Down
4 changes: 3 additions & 1 deletion Dockerfile.tailscale-HEAD
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ RUN GOARCH=$TARGETARCH go install -tags="${BUILD_TAGS}" -ldflags="\
-v ./cmd/tailscale ./cmd/tailscaled ./cmd/containerboot

FROM alpine:3.22
RUN apk add --no-cache ca-certificates iptables iproute2 ip6tables curl traceroute
# Upstream: ca-certificates ip6tables iptables iproute2
# Tests: curl python3 (traceroute via BusyBox)
RUN apk add --no-cache ca-certificates curl ip6tables iptables iproute2 python3

COPY --from=build-env /go/bin/* /usr/local/bin/
# For compat with the previous run.sh, although ideally you should be
Expand Down
3 changes: 0 additions & 3 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,6 @@
buf
clang-tools # clang-format
protobuf-language-server

# Add hi to make it even easier to use the CI runner.
hi
]
++ lib.optional pkgs.stdenv.isLinux [ traceroute ];

Expand Down
8 changes: 4 additions & 4 deletions hscontrol/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (h *Headscale) scheduledTasks(ctx context.Context) {
return

case <-expireTicker.C:
var expiredNodeChanges []change.ChangeSet
var expiredNodeChanges []change.Change
var changed bool

lastExpiryCheck, expiredNodeChanges, changed = h.state.ExpireExpiredNodes(lastExpiryCheck)
Expand Down Expand Up @@ -305,15 +305,15 @@ func (h *Headscale) scheduledTasks(ctx context.Context) {
}
h.state.SetDERPMap(derpMap)

h.Change(change.DERPSet)
h.Change(change.DERPMap())

case records, ok := <-extraRecordsUpdate:
if !ok {
continue
}
h.cfg.TailcfgDNSConfig.ExtraRecords = records

h.Change(change.ExtraRecordsSet)
h.Change(change.ExtraRecords())
}
}
}
Expand Down Expand Up @@ -988,7 +988,7 @@ func readOrCreatePrivateKey(path string) (*key.MachinePrivate, error) {
// Change is used to send changes to nodes.
// All changes should be enqueued here; empty changes are automatically
// ignored.
func (h *Headscale) Change(cs ...change.ChangeSet) {
func (h *Headscale) Change(cs ...change.Change) {
h.mapBatcher.AddWork(cs...)
}

Expand Down
8 changes: 2 additions & 6 deletions hscontrol/assets/style.css
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@
--md-primary-fg-color: #4051b5;
--md-accent-fg-color: #526cfe;
--md-typeset-a-color: var(--md-primary-fg-color);
--md-text-font:
"Roboto", -apple-system, BlinkMacSystemFont, "Segoe UI", "Helvetica Neue",
Arial, sans-serif;
--md-code-font:
"Roboto Mono", "SF Mono", Monaco, "Cascadia Code", Consolas, "Courier New",
monospace;
--md-text-font: "Roboto", -apple-system, BlinkMacSystemFont, "Segoe UI", "Helvetica Neue", Arial, sans-serif;
--md-code-font: "Roboto Mono", "SF Mono", Monaco, "Cascadia Code", Consolas, "Courier New", monospace;
}

/* Base Typography */
Expand Down
14 changes: 5 additions & 9 deletions hscontrol/grpcv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,9 @@ func (api headscaleV1APIServer) CreateUser(
return nil, status.Errorf(codes.Internal, "failed to create user: %s", err)
}

c := change.UserAdded(types.UserID(user.ID))

// TODO(kradalby): Both of these might be policy changes, find a better way to merge.
if !policyChanged.Empty() {
c.Change = change.Policy
}

api.h.Change(c)
// CreateUser returns a policy change response if the user creation affected policy.
// This triggers a full policy re-evaluation for all connected nodes.
api.h.Change(policyChanged)

return &v1.CreateUserResponse{User: user.Proto()}, nil
}
Expand Down Expand Up @@ -109,7 +104,8 @@ func (api headscaleV1APIServer) DeleteUser(
return nil, err
}

api.h.Change(change.UserRemoved(types.UserID(user.ID)))
// User deletion may affect policy, so trigger a full policy re-evaluation.
api.h.Change(change.UserRemoved())

return &v1.DeleteUserResponse{}, nil
}
Expand Down
173 changes: 47 additions & 126 deletions hscontrol/mapper/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,13 @@ import (
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog/log"
"tailscale.com/tailcfg"
"tailscale.com/types/ptr"
)

var (
mapResponseGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "headscale",
Name: "mapresponse_generated_total",
Help: "total count of mapresponses generated by response type and change type",
}, []string{"response_type", "change_type"})

errNodeNotFoundInNodeStore = errors.New("node not found in NodeStore")
)
var mapResponseGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "headscale",
Name: "mapresponse_generated_total",
Help: "total count of mapresponses generated by response type",
}, []string{"response_type"})

type batcherFunc func(cfg *types.Config, state *state.State) Batcher

Expand All @@ -36,8 +31,8 @@ type Batcher interface {
RemoveNode(id types.NodeID, c chan<- *tailcfg.MapResponse) bool
IsConnected(id types.NodeID) bool
ConnectedMap() *xsync.Map[types.NodeID, bool]
AddWork(c ...change.ChangeSet)
MapResponseFromChange(id types.NodeID, c change.ChangeSet) (*tailcfg.MapResponse, error)
AddWork(r ...change.Change)
MapResponseFromChange(id types.NodeID, r change.Change) (*tailcfg.MapResponse, error)
DebugMapResponses() (map[types.NodeID][]tailcfg.MapResponse, error)
}

Expand All @@ -51,7 +46,7 @@ func NewBatcher(batchTime time.Duration, workers int, mapper *mapper) *LockFreeB
workCh: make(chan work, workers*200),
nodes: xsync.NewMap[types.NodeID, *multiChannelNodeConn](),
connected: xsync.NewMap[types.NodeID, *time.Time](),
pendingChanges: xsync.NewMap[types.NodeID, []change.ChangeSet](),
pendingChanges: xsync.NewMap[types.NodeID, []change.Change](),
}
}

Expand All @@ -69,15 +64,21 @@ type nodeConnection interface {
nodeID() types.NodeID
version() tailcfg.CapabilityVersion
send(data *tailcfg.MapResponse) error
// computePeerDiff returns peers that were previously sent but are no longer in the current list.
computePeerDiff(currentPeers []tailcfg.NodeID) (removed []tailcfg.NodeID)
// updateSentPeers updates the tracking of which peers have been sent to this node.
updateSentPeers(resp *tailcfg.MapResponse)
}

// generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID that is based on the provided [change.ChangeSet].
func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion, mapper *mapper, c change.ChangeSet) (*tailcfg.MapResponse, error) {
if c.Empty() {
return nil, nil
// generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID based on the provided [change.Change].
func generateMapResponse(nc nodeConnection, mapper *mapper, r change.Change) (*tailcfg.MapResponse, error) {
nodeID := nc.nodeID()
version := nc.version()

if r.IsEmpty() {
return nil, nil //nolint:nilnil // Empty response means nothing to send
}

// Validate inputs before processing
if nodeID == 0 {
return nil, fmt.Errorf("invalid nodeID: %d", nodeID)
}
Expand All @@ -86,141 +87,58 @@ func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion,
return nil, fmt.Errorf("mapper is nil for nodeID %d", nodeID)
}

// Handle self-only responses
if r.IsSelfOnly() && r.TargetNode != nodeID {
return nil, nil //nolint:nilnil // No response needed for other nodes when self-only
}

var (
mapResp *tailcfg.MapResponse
err error
responseType string
mapResp *tailcfg.MapResponse
err error
)

// Record metric when function exits
defer func() {
if err == nil && mapResp != nil && responseType != "" {
mapResponseGenerated.WithLabelValues(responseType, c.Change.String()).Inc()
}
}()

switch c.Change {
case change.DERP:
responseType = "derp"
mapResp, err = mapper.derpMapResponse(nodeID)

case change.NodeCameOnline, change.NodeWentOffline:
if c.IsSubnetRouter {
// TODO(kradalby): This can potentially be a peer update of the old and new subnet router.
responseType = "full"
mapResp, err = mapper.fullMapResponse(nodeID, version)
} else {
// Trust the change type for online/offline status to avoid race conditions
// between NodeStore updates and change processing
responseType = string(patchResponseDebug)
onlineStatus := c.Change == change.NodeCameOnline

mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
{
NodeID: c.NodeID.NodeID(),
Online: ptr.To(onlineStatus),
},
})
}

case change.NodeNewOrUpdate:
// If the node is the one being updated, we send a self update that preserves peer information
// to ensure the node sees changes to its own properties (e.g., hostname/DNS name changes)
// without losing its view of peer status during rapid reconnection cycles
if c.IsSelfUpdate(nodeID) {
responseType = "self"
mapResp, err = mapper.selfMapResponse(nodeID, version)
} else {
responseType = "change"
mapResp, err = mapper.peerChangeResponse(nodeID, version, c.NodeID)
}

case change.NodeRemove:
responseType = "remove"
mapResp, err = mapper.peerRemovedResponse(nodeID, c.NodeID)

case change.NodeKeyExpiry:
// If the node is the one whose key is expiring, we send a "full" self update
// as nodes will ignore patch updates about themselves (?).
if c.IsSelfUpdate(nodeID) {
responseType = "self"
mapResp, err = mapper.selfMapResponse(nodeID, version)
// mapResp, err = mapper.fullMapResponse(nodeID, version)
} else {
responseType = "patch"
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
{
NodeID: c.NodeID.NodeID(),
KeyExpiry: c.NodeExpiry,
},
})
}

case change.NodeEndpoint, change.NodeDERP:
// Endpoint or DERP changes can be sent as lightweight patches.
// Query the NodeStore for the current peer state to construct the PeerChange.
// Even if only endpoint or only DERP changed, we include both in the patch
// since they're often updated together and it's minimal overhead.
responseType = "patch"

peer, found := mapper.state.GetNodeByID(c.NodeID)
if !found {
return nil, fmt.Errorf("%w: %d", errNodeNotFoundInNodeStore, c.NodeID)
}
// Track metric using categorized type, not free-form reason
mapResponseGenerated.WithLabelValues(r.Type()).Inc()

peerChange := &tailcfg.PeerChange{
NodeID: c.NodeID.NodeID(),
Endpoints: peer.Endpoints().AsSlice(),
DERPRegion: 0, // Will be set below if available
}
// Check if this requires runtime peer visibility computation (e.g., policy changes)
if r.RequiresRuntimePeerComputation {
currentPeers := mapper.state.ListPeers(nodeID)

// Extract DERP region from Hostinfo if available
if hi := peer.AsStruct().Hostinfo; hi != nil && hi.NetInfo != nil {
peerChange.DERPRegion = hi.NetInfo.PreferredDERP
currentPeerIDs := make([]tailcfg.NodeID, 0, currentPeers.Len())
for _, peer := range currentPeers.All() {
currentPeerIDs = append(currentPeerIDs, peer.ID().NodeID())
}

mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{peerChange})

default:
// The following will always hit this:
// change.Full, change.Policy
responseType = "full"
mapResp, err = mapper.fullMapResponse(nodeID, version)
removedPeers := nc.computePeerDiff(currentPeerIDs)
mapResp, err = mapper.policyChangeResponse(nodeID, version, removedPeers, currentPeers)
} else {
mapResp, err = mapper.buildFromChange(nodeID, version, &r)
}

if err != nil {
return nil, fmt.Errorf("generating map response for nodeID %d: %w", nodeID, err)
}

// TODO(kradalby): Is this necessary?
// Validate the generated map response - only check for nil response
// Note: mapResp.Node can be nil for peer updates, which is valid
if mapResp == nil && c.Change != change.DERP && c.Change != change.NodeRemove {
return nil, fmt.Errorf("generated nil map response for nodeID %d change %s", nodeID, c.Change.String())
}

return mapResp, nil
}

// handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.ChangeSet].
func handleNodeChange(nc nodeConnection, mapper *mapper, c change.ChangeSet) error {
// handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.Change].
func handleNodeChange(nc nodeConnection, mapper *mapper, r change.Change) error {
if nc == nil {
return errors.New("nodeConnection is nil")
}

nodeID := nc.nodeID()

log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("change.type", c.Change.String()).Msg("Node change processing started because change notification received")
log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("reason", r.Reason).Msg("Node change processing started because change notification received")

var data *tailcfg.MapResponse
var err error
data, err = generateMapResponse(nodeID, nc.version(), mapper, c)
data, err := generateMapResponse(nc, mapper, r)
if err != nil {
return fmt.Errorf("generating map response for node %d: %w", nodeID, err)
}

if data == nil {
// No data to send is valid for some change types
// No data to send is valid for some response types
return nil
}

Expand All @@ -230,6 +148,9 @@ func handleNodeChange(nc nodeConnection, mapper *mapper, c change.ChangeSet) err
return fmt.Errorf("sending map response to node %d: %w", nodeID, err)
}

// Update peer tracking after successful send
nc.updateSentPeers(data)

return nil
}

Expand All @@ -241,7 +162,7 @@ type workResult struct {

// work represents a unit of work to be processed by workers.
type work struct {
c change.ChangeSet
r change.Change
nodeID types.NodeID
resultCh chan<- workResult // optional channel for synchronous operations
}
Loading
Loading