Skip to content

Commit a7f6c5c

Browse files
committed
change: smarter change notifications
This commit replaces the ChangeSet with a simpler bool-based change model that can be directly used in the map builder to build the appropriate map response based on the change that has occurred. Previously, we fell back to sending full maps for a lot of changes as that was considered "the safe" thing to do to ensure no updates were missed. This was slightly problematic as a node that already has a list of peers will only do full replacement of the peers if the list is non-empty, meaning that it was not possible to remove all nodes (if for example policy changed). Now we will keep track of last seen nodes, so we can send remove ids, but also we are much smarter about how we send smaller, partial maps when needed. Fixes #2389 Signed-off-by: Kristoffer Dalby <[email protected]>
1 parent ee5d82a commit a7f6c5c

File tree

13 files changed

+1327
-657
lines changed

13 files changed

+1327
-657
lines changed

hscontrol/app.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (h *Headscale) scheduledTasks(ctx context.Context) {
271271
return
272272

273273
case <-expireTicker.C:
274-
var expiredNodeChanges []change.ChangeSet
274+
var expiredNodeChanges []change.Change
275275
var changed bool
276276

277277
lastExpiryCheck, expiredNodeChanges, changed = h.state.ExpireExpiredNodes(lastExpiryCheck)
@@ -305,15 +305,15 @@ func (h *Headscale) scheduledTasks(ctx context.Context) {
305305
}
306306
h.state.SetDERPMap(derpMap)
307307

308-
h.Change(change.DERPSet)
308+
h.Change(change.DERPMap())
309309

310310
case records, ok := <-extraRecordsUpdate:
311311
if !ok {
312312
continue
313313
}
314314
h.cfg.TailcfgDNSConfig.ExtraRecords = records
315315

316-
h.Change(change.ExtraRecordsSet)
316+
h.Change(change.ExtraRecords())
317317
}
318318
}
319319
}
@@ -988,7 +988,7 @@ func readOrCreatePrivateKey(path string) (*key.MachinePrivate, error) {
988988
// Change is used to send changes to nodes.
989989
// All change should be enqueued here and empty will be automatically
990990
// ignored.
991-
func (h *Headscale) Change(cs ...change.ChangeSet) {
991+
func (h *Headscale) Change(cs ...change.Change) {
992992
h.mapBatcher.AddWork(cs...)
993993
}
994994

hscontrol/grpcv1.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,9 @@ func (api headscaleV1APIServer) CreateUser(
5858
return nil, status.Errorf(codes.Internal, "failed to create user: %s", err)
5959
}
6060

61-
c := change.UserAdded(types.UserID(user.ID))
62-
63-
// TODO(kradalby): Both of these might be policy changes, find a better way to merge.
64-
if !policyChanged.Empty() {
65-
c.Change = change.Policy
66-
}
67-
68-
api.h.Change(c)
61+
// CreateUser returns a policy change response if the user creation affected policy.
62+
// This triggers a full policy re-evaluation for all connected nodes.
63+
api.h.Change(policyChanged)
6964

7065
return &v1.CreateUserResponse{User: user.Proto()}, nil
7166
}
@@ -109,7 +104,8 @@ func (api headscaleV1APIServer) DeleteUser(
109104
return nil, err
110105
}
111106

112-
api.h.Change(change.UserRemoved(types.UserID(user.ID)))
107+
// User deletion may affect policy, trigger a full policy re-evaluation.
108+
api.h.Change(change.UserRemoved())
113109

114110
return &v1.DeleteUserResponse{}, nil
115111
}

hscontrol/mapper/batcher.go

Lines changed: 47 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,13 @@ import (
1313
"github.com/puzpuzpuz/xsync/v4"
1414
"github.com/rs/zerolog/log"
1515
"tailscale.com/tailcfg"
16-
"tailscale.com/types/ptr"
1716
)
1817

19-
var (
20-
mapResponseGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
21-
Namespace: "headscale",
22-
Name: "mapresponse_generated_total",
23-
Help: "total count of mapresponses generated by response type and change type",
24-
}, []string{"response_type", "change_type"})
25-
26-
errNodeNotFoundInNodeStore = errors.New("node not found in NodeStore")
27-
)
18+
var mapResponseGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
19+
Namespace: "headscale",
20+
Name: "mapresponse_generated_total",
21+
Help: "total count of mapresponses generated by response type",
22+
}, []string{"response_type"})
2823

2924
type batcherFunc func(cfg *types.Config, state *state.State) Batcher
3025

@@ -36,8 +31,8 @@ type Batcher interface {
3631
RemoveNode(id types.NodeID, c chan<- *tailcfg.MapResponse) bool
3732
IsConnected(id types.NodeID) bool
3833
ConnectedMap() *xsync.Map[types.NodeID, bool]
39-
AddWork(c ...change.ChangeSet)
40-
MapResponseFromChange(id types.NodeID, c change.ChangeSet) (*tailcfg.MapResponse, error)
34+
AddWork(r ...change.Change)
35+
MapResponseFromChange(id types.NodeID, r change.Change) (*tailcfg.MapResponse, error)
4136
DebugMapResponses() (map[types.NodeID][]tailcfg.MapResponse, error)
4237
}
4338

@@ -51,7 +46,7 @@ func NewBatcher(batchTime time.Duration, workers int, mapper *mapper) *LockFreeB
5146
workCh: make(chan work, workers*200),
5247
nodes: xsync.NewMap[types.NodeID, *multiChannelNodeConn](),
5348
connected: xsync.NewMap[types.NodeID, *time.Time](),
54-
pendingChanges: xsync.NewMap[types.NodeID, []change.ChangeSet](),
49+
pendingChanges: xsync.NewMap[types.NodeID, []change.Change](),
5550
}
5651
}
5752

@@ -69,15 +64,21 @@ type nodeConnection interface {
6964
nodeID() types.NodeID
7065
version() tailcfg.CapabilityVersion
7166
send(data *tailcfg.MapResponse) error
67+
// computePeerDiff returns peers that were previously sent but are no longer in the current list.
68+
computePeerDiff(currentPeers []tailcfg.NodeID) (removed []tailcfg.NodeID)
69+
// updateSentPeers updates the tracking of which peers have been sent to this node.
70+
updateSentPeers(resp *tailcfg.MapResponse)
7271
}
7372

74-
// generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID that is based on the provided [change.ChangeSet].
75-
func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion, mapper *mapper, c change.ChangeSet) (*tailcfg.MapResponse, error) {
76-
if c.Empty() {
77-
return nil, nil
73+
// generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID based on the provided [change.Change].
74+
func generateMapResponse(nc nodeConnection, mapper *mapper, r change.Change) (*tailcfg.MapResponse, error) {
75+
nodeID := nc.nodeID()
76+
version := nc.version()
77+
78+
if r.IsEmpty() {
79+
return nil, nil //nolint:nilnil // Empty response means nothing to send
7880
}
7981

80-
// Validate inputs before processing
8182
if nodeID == 0 {
8283
return nil, fmt.Errorf("invalid nodeID: %d", nodeID)
8384
}
@@ -86,141 +87,58 @@ func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion,
8687
return nil, fmt.Errorf("mapper is nil for nodeID %d", nodeID)
8788
}
8889

90+
// Handle self-only responses
91+
if r.IsSelfOnly() && r.TargetNode != nodeID {
92+
return nil, nil //nolint:nilnil // No response needed for other nodes when self-only
93+
}
94+
8995
var (
90-
mapResp *tailcfg.MapResponse
91-
err error
92-
responseType string
96+
mapResp *tailcfg.MapResponse
97+
err error
9398
)
9499

95-
// Record metric when function exits
96-
defer func() {
97-
if err == nil && mapResp != nil && responseType != "" {
98-
mapResponseGenerated.WithLabelValues(responseType, c.Change.String()).Inc()
99-
}
100-
}()
101-
102-
switch c.Change {
103-
case change.DERP:
104-
responseType = "derp"
105-
mapResp, err = mapper.derpMapResponse(nodeID)
106-
107-
case change.NodeCameOnline, change.NodeWentOffline:
108-
if c.IsSubnetRouter {
109-
// TODO(kradalby): This can potentially be a peer update of the old and new subnet router.
110-
responseType = "full"
111-
mapResp, err = mapper.fullMapResponse(nodeID, version)
112-
} else {
113-
// Trust the change type for online/offline status to avoid race conditions
114-
// between NodeStore updates and change processing
115-
responseType = string(patchResponseDebug)
116-
onlineStatus := c.Change == change.NodeCameOnline
117-
118-
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
119-
{
120-
NodeID: c.NodeID.NodeID(),
121-
Online: ptr.To(onlineStatus),
122-
},
123-
})
124-
}
125-
126-
case change.NodeNewOrUpdate:
127-
// If the node is the one being updated, we send a self update that preserves peer information
128-
// to ensure the node sees changes to its own properties (e.g., hostname/DNS name changes)
129-
// without losing its view of peer status during rapid reconnection cycles
130-
if c.IsSelfUpdate(nodeID) {
131-
responseType = "self"
132-
mapResp, err = mapper.selfMapResponse(nodeID, version)
133-
} else {
134-
responseType = "change"
135-
mapResp, err = mapper.peerChangeResponse(nodeID, version, c.NodeID)
136-
}
137-
138-
case change.NodeRemove:
139-
responseType = "remove"
140-
mapResp, err = mapper.peerRemovedResponse(nodeID, c.NodeID)
141-
142-
case change.NodeKeyExpiry:
143-
// If the node is the one whose key is expiring, we send a "full" self update
144-
// as nodes will ignore patch updates about themselves (?).
145-
if c.IsSelfUpdate(nodeID) {
146-
responseType = "self"
147-
mapResp, err = mapper.selfMapResponse(nodeID, version)
148-
// mapResp, err = mapper.fullMapResponse(nodeID, version)
149-
} else {
150-
responseType = "patch"
151-
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
152-
{
153-
NodeID: c.NodeID.NodeID(),
154-
KeyExpiry: c.NodeExpiry,
155-
},
156-
})
157-
}
158-
159-
case change.NodeEndpoint, change.NodeDERP:
160-
// Endpoint or DERP changes can be sent as lightweight patches.
161-
// Query the NodeStore for the current peer state to construct the PeerChange.
162-
// Even if only endpoint or only DERP changed, we include both in the patch
163-
// since they're often updated together and it's minimal overhead.
164-
responseType = "patch"
165-
166-
peer, found := mapper.state.GetNodeByID(c.NodeID)
167-
if !found {
168-
return nil, fmt.Errorf("%w: %d", errNodeNotFoundInNodeStore, c.NodeID)
169-
}
100+
// Track metric using categorized type, not free-form reason
101+
mapResponseGenerated.WithLabelValues(r.Type()).Inc()
170102

171-
peerChange := &tailcfg.PeerChange{
172-
NodeID: c.NodeID.NodeID(),
173-
Endpoints: peer.Endpoints().AsSlice(),
174-
DERPRegion: 0, // Will be set below if available
175-
}
103+
// Check if this requires runtime peer visibility computation (e.g., policy changes)
104+
if r.RequiresRuntimePeerComputation {
105+
currentPeers := mapper.state.ListPeers(nodeID)
176106

177-
// Extract DERP region from Hostinfo if available
178-
if hi := peer.AsStruct().Hostinfo; hi != nil && hi.NetInfo != nil {
179-
peerChange.DERPRegion = hi.NetInfo.PreferredDERP
107+
currentPeerIDs := make([]tailcfg.NodeID, 0, currentPeers.Len())
108+
for _, peer := range currentPeers.All() {
109+
currentPeerIDs = append(currentPeerIDs, peer.ID().NodeID())
180110
}
181111

182-
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{peerChange})
183-
184-
default:
185-
// The following will always hit this:
186-
// change.Full, change.Policy
187-
responseType = "full"
188-
mapResp, err = mapper.fullMapResponse(nodeID, version)
112+
removedPeers := nc.computePeerDiff(currentPeerIDs)
113+
mapResp, err = mapper.policyChangeResponse(nodeID, version, removedPeers, currentPeers)
114+
} else {
115+
mapResp, err = mapper.buildFromChange(nodeID, version, &r)
189116
}
190117

191118
if err != nil {
192119
return nil, fmt.Errorf("generating map response for nodeID %d: %w", nodeID, err)
193120
}
194121

195-
// TODO(kradalby): Is this necessary?
196-
// Validate the generated map response - only check for nil response
197-
// Note: mapResp.Node can be nil for peer updates, which is valid
198-
if mapResp == nil && c.Change != change.DERP && c.Change != change.NodeRemove {
199-
return nil, fmt.Errorf("generated nil map response for nodeID %d change %s", nodeID, c.Change.String())
200-
}
201-
202122
return mapResp, nil
203123
}
204124

205-
// handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.ChangeSet].
206-
func handleNodeChange(nc nodeConnection, mapper *mapper, c change.ChangeSet) error {
125+
// handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.Change].
126+
func handleNodeChange(nc nodeConnection, mapper *mapper, r change.Change) error {
207127
if nc == nil {
208128
return errors.New("nodeConnection is nil")
209129
}
210130

211131
nodeID := nc.nodeID()
212132

213-
log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("change.type", c.Change.String()).Msg("Node change processing started because change notification received")
133+
log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("reason", r.Reason).Msg("Node change processing started because change notification received")
214134

215-
var data *tailcfg.MapResponse
216-
var err error
217-
data, err = generateMapResponse(nodeID, nc.version(), mapper, c)
135+
data, err := generateMapResponse(nc, mapper, r)
218136
if err != nil {
219137
return fmt.Errorf("generating map response for node %d: %w", nodeID, err)
220138
}
221139

222140
if data == nil {
223-
// No data to send is valid for some change types
141+
// No data to send is valid for some response types
224142
return nil
225143
}
226144

@@ -230,6 +148,9 @@ func handleNodeChange(nc nodeConnection, mapper *mapper, c change.ChangeSet) err
230148
return fmt.Errorf("sending map response to node %d: %w", nodeID, err)
231149
}
232150

151+
// Update peer tracking after successful send
152+
nc.updateSentPeers(data)
153+
233154
return nil
234155
}
235156

@@ -241,7 +162,7 @@ type workResult struct {
241162

242163
// work represents a unit of work to be processed by workers.
243164
type work struct {
244-
c change.ChangeSet
165+
r change.Change
245166
nodeID types.NodeID
246167
resultCh chan<- workResult // optional channel for synchronous operations
247168
}

0 commit comments

Comments
 (0)