@@ -13,18 +13,13 @@ import (
1313 "github.com/puzpuzpuz/xsync/v4"
1414 "github.com/rs/zerolog/log"
1515 "tailscale.com/tailcfg"
16- "tailscale.com/types/ptr"
1716)
1817
19- var (
20- mapResponseGenerated = promauto .NewCounterVec (prometheus.CounterOpts {
21- Namespace : "headscale" ,
22- Name : "mapresponse_generated_total" ,
23- Help : "total count of mapresponses generated by response type and change type" ,
24- }, []string {"response_type" , "change_type" })
25-
26- errNodeNotFoundInNodeStore = errors .New ("node not found in NodeStore" )
27- )
18+ var mapResponseGenerated = promauto .NewCounterVec (prometheus.CounterOpts {
19+ Namespace : "headscale" ,
20+ Name : "mapresponse_generated_total" ,
21+ Help : "total count of mapresponses generated by response type" ,
22+ }, []string {"response_type" })
2823
2924type batcherFunc func (cfg * types.Config , state * state.State ) Batcher
3025
@@ -36,8 +31,8 @@ type Batcher interface {
3631 RemoveNode (id types.NodeID , c chan <- * tailcfg.MapResponse ) bool
3732 IsConnected (id types.NodeID ) bool
3833 ConnectedMap () * xsync.Map [types.NodeID , bool ]
39- AddWork (c ... change.ChangeSet )
40- MapResponseFromChange (id types.NodeID , c change.ChangeSet ) (* tailcfg.MapResponse , error )
34+ AddWork (r ... change.Change )
35+ MapResponseFromChange (id types.NodeID , r change.Change ) (* tailcfg.MapResponse , error )
4136 DebugMapResponses () (map [types.NodeID ][]tailcfg.MapResponse , error )
4237}
4338
@@ -51,7 +46,7 @@ func NewBatcher(batchTime time.Duration, workers int, mapper *mapper) *LockFreeB
5146 workCh : make (chan work , workers * 200 ),
5247 nodes : xsync .NewMap [types.NodeID , * multiChannelNodeConn ](),
5348 connected : xsync .NewMap [types.NodeID , * time.Time ](),
54- pendingChanges : xsync .NewMap [types.NodeID , []change.ChangeSet ](),
49+ pendingChanges : xsync .NewMap [types.NodeID , []change.Change ](),
5550 }
5651}
5752
@@ -69,15 +64,21 @@ type nodeConnection interface {
6964 nodeID () types.NodeID
7065 version () tailcfg.CapabilityVersion
7166 send (data * tailcfg.MapResponse ) error
67+ // computePeerDiff returns peers that were previously sent but are no longer in the current list.
68+ computePeerDiff (currentPeers []tailcfg.NodeID ) (removed []tailcfg.NodeID )
69+ // updateSentPeers updates the tracking of which peers have been sent to this node.
70+ updateSentPeers (resp * tailcfg.MapResponse )
7271}
7372
74- // generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID that is based on the provided [change.ChangeSet].
75- func generateMapResponse (nodeID types.NodeID , version tailcfg.CapabilityVersion , mapper * mapper , c change.ChangeSet ) (* tailcfg.MapResponse , error ) {
76- if c .Empty () {
77- return nil , nil
73+ // generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID based on the provided [change.Change].
74+ func generateMapResponse (nc nodeConnection , mapper * mapper , r change.Change ) (* tailcfg.MapResponse , error ) {
75+ nodeID := nc .nodeID ()
76+ version := nc .version ()
77+
78+ if r .IsEmpty () {
79+ return nil , nil //nolint:nilnil // Empty response means nothing to send
7880 }
7981
80- // Validate inputs before processing
8182 if nodeID == 0 {
8283 return nil , fmt .Errorf ("invalid nodeID: %d" , nodeID )
8384 }
@@ -86,141 +87,58 @@ func generateMapResponse(nodeID types.NodeID, version tailcfg.CapabilityVersion,
8687 return nil , fmt .Errorf ("mapper is nil for nodeID %d" , nodeID )
8788 }
8889
90+ // Handle self-only responses
91+ if r .IsSelfOnly () && r .TargetNode != nodeID {
92+ return nil , nil //nolint:nilnil // No response needed for other nodes when self-only
93+ }
94+
8995 var (
90- mapResp * tailcfg.MapResponse
91- err error
92- responseType string
96+ mapResp * tailcfg.MapResponse
97+ err error
9398 )
9499
95- // Record metric when function exits
96- defer func () {
97- if err == nil && mapResp != nil && responseType != "" {
98- mapResponseGenerated .WithLabelValues (responseType , c .Change .String ()).Inc ()
99- }
100- }()
101-
102- switch c .Change {
103- case change .DERP :
104- responseType = "derp"
105- mapResp , err = mapper .derpMapResponse (nodeID )
106-
107- case change .NodeCameOnline , change .NodeWentOffline :
108- if c .IsSubnetRouter {
109- // TODO(kradalby): This can potentially be a peer update of the old and new subnet router.
110- responseType = "full"
111- mapResp , err = mapper .fullMapResponse (nodeID , version )
112- } else {
113- // Trust the change type for online/offline status to avoid race conditions
114- // between NodeStore updates and change processing
115- responseType = string (patchResponseDebug )
116- onlineStatus := c .Change == change .NodeCameOnline
117-
118- mapResp , err = mapper .peerChangedPatchResponse (nodeID , []* tailcfg.PeerChange {
119- {
120- NodeID : c .NodeID .NodeID (),
121- Online : ptr .To (onlineStatus ),
122- },
123- })
124- }
125-
126- case change .NodeNewOrUpdate :
127- // If the node is the one being updated, we send a self update that preserves peer information
128- // to ensure the node sees changes to its own properties (e.g., hostname/DNS name changes)
129- // without losing its view of peer status during rapid reconnection cycles
130- if c .IsSelfUpdate (nodeID ) {
131- responseType = "self"
132- mapResp , err = mapper .selfMapResponse (nodeID , version )
133- } else {
134- responseType = "change"
135- mapResp , err = mapper .peerChangeResponse (nodeID , version , c .NodeID )
136- }
137-
138- case change .NodeRemove :
139- responseType = "remove"
140- mapResp , err = mapper .peerRemovedResponse (nodeID , c .NodeID )
141-
142- case change .NodeKeyExpiry :
143- // If the node is the one whose key is expiring, we send a "full" self update
144- // as nodes will ignore patch updates about themselves (?).
145- if c .IsSelfUpdate (nodeID ) {
146- responseType = "self"
147- mapResp , err = mapper .selfMapResponse (nodeID , version )
148- // mapResp, err = mapper.fullMapResponse(nodeID, version)
149- } else {
150- responseType = "patch"
151- mapResp , err = mapper .peerChangedPatchResponse (nodeID , []* tailcfg.PeerChange {
152- {
153- NodeID : c .NodeID .NodeID (),
154- KeyExpiry : c .NodeExpiry ,
155- },
156- })
157- }
158-
159- case change .NodeEndpoint , change .NodeDERP :
160- // Endpoint or DERP changes can be sent as lightweight patches.
161- // Query the NodeStore for the current peer state to construct the PeerChange.
162- // Even if only endpoint or only DERP changed, we include both in the patch
163- // since they're often updated together and it's minimal overhead.
164- responseType = "patch"
165-
166- peer , found := mapper .state .GetNodeByID (c .NodeID )
167- if ! found {
168- return nil , fmt .Errorf ("%w: %d" , errNodeNotFoundInNodeStore , c .NodeID )
169- }
100+ // Track metric using categorized type, not free-form reason
101+ mapResponseGenerated .WithLabelValues (r .Type ()).Inc ()
170102
171- peerChange := & tailcfg.PeerChange {
172- NodeID : c .NodeID .NodeID (),
173- Endpoints : peer .Endpoints ().AsSlice (),
174- DERPRegion : 0 , // Will be set below if available
175- }
103+ // Check if this requires runtime peer visibility computation (e.g., policy changes)
104+ if r .RequiresRuntimePeerComputation {
105+ currentPeers := mapper .state .ListPeers (nodeID )
176106
177- // Extract DERP region from Hostinfo if available
178- if hi := peer . AsStruct (). Hostinfo ; hi != nil && hi . NetInfo != nil {
179- peerChange . DERPRegion = hi . NetInfo . PreferredDERP
107+ currentPeerIDs := make ([]tailcfg. NodeID , 0 , currentPeers . Len ())
108+ for _ , peer := range currentPeers . All () {
109+ currentPeerIDs = append ( currentPeerIDs , peer . ID (). NodeID ())
180110 }
181111
182- mapResp , err = mapper .peerChangedPatchResponse (nodeID , []* tailcfg.PeerChange {peerChange })
183-
184- default :
185- // The following will always hit this:
186- // change.Full, change.Policy
187- responseType = "full"
188- mapResp , err = mapper .fullMapResponse (nodeID , version )
112+ removedPeers := nc .computePeerDiff (currentPeerIDs )
113+ mapResp , err = mapper .policyChangeResponse (nodeID , version , removedPeers , currentPeers )
114+ } else {
115+ mapResp , err = mapper .buildFromChange (nodeID , version , & r )
189116 }
190117
191118 if err != nil {
192119 return nil , fmt .Errorf ("generating map response for nodeID %d: %w" , nodeID , err )
193120 }
194121
195- // TODO(kradalby): Is this necessary?
196- // Validate the generated map response - only check for nil response
197- // Note: mapResp.Node can be nil for peer updates, which is valid
198- if mapResp == nil && c .Change != change .DERP && c .Change != change .NodeRemove {
199- return nil , fmt .Errorf ("generated nil map response for nodeID %d change %s" , nodeID , c .Change .String ())
200- }
201-
202122 return mapResp , nil
203123}
204124
205- // handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.ChangeSet ].
206- func handleNodeChange (nc nodeConnection , mapper * mapper , c change.ChangeSet ) error {
125+ // handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.Change ].
126+ func handleNodeChange (nc nodeConnection , mapper * mapper , r change.Change ) error {
207127 if nc == nil {
208128 return errors .New ("nodeConnection is nil" )
209129 }
210130
211131 nodeID := nc .nodeID ()
212132
213- log .Debug ().Caller ().Uint64 ("node.id" , nodeID .Uint64 ()).Str ("change.type " , c . Change . String () ).Msg ("Node change processing started because change notification received" )
133+ log .Debug ().Caller ().Uint64 ("node.id" , nodeID .Uint64 ()).Str ("reason " , r . Reason ).Msg ("Node change processing started because change notification received" )
214134
215- var data * tailcfg.MapResponse
216- var err error
217- data , err = generateMapResponse (nodeID , nc .version (), mapper , c )
135+ data , err := generateMapResponse (nc , mapper , r )
218136 if err != nil {
219137 return fmt .Errorf ("generating map response for node %d: %w" , nodeID , err )
220138 }
221139
222140 if data == nil {
223- // No data to send is valid for some change types
141+ // No data to send is valid for some response types
224142 return nil
225143 }
226144
@@ -230,6 +148,9 @@ func handleNodeChange(nc nodeConnection, mapper *mapper, c change.ChangeSet) err
230148 return fmt .Errorf ("sending map response to node %d: %w" , nodeID , err )
231149 }
232150
151+ // Update peer tracking after successful send
152+ nc .updateSentPeers (data )
153+
233154 return nil
234155}
235156
@@ -241,7 +162,7 @@ type workResult struct {
241162
242163// work represents a unit of work to be processed by workers.
243164type work struct {
244- c change.ChangeSet
165+ r change.Change
245166 nodeID types.NodeID
246167 resultCh chan <- workResult // optional channel for synchronous operations
247168}
0 commit comments