Skip to content

Commit eae5976

Browse files
committed
response
Signed-off-by: Kristoffer Dalby <[email protected]>
1 parent e52bc37 commit eae5976

File tree

12 files changed

+1144
-675
lines changed

12 files changed

+1144
-675
lines changed

hscontrol/app.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (h *Headscale) scheduledTasks(ctx context.Context) {
271271
return
272272

273273
case <-expireTicker.C:
274-
var expiredNodeChanges []change.ChangeSet
274+
var expiredNodeChanges []change.Response
275275
var changed bool
276276

277277
lastExpiryCheck, expiredNodeChanges, changed = h.state.ExpireExpiredNodes(lastExpiryCheck)
@@ -305,15 +305,15 @@ func (h *Headscale) scheduledTasks(ctx context.Context) {
305305
}
306306
h.state.SetDERPMap(derpMap)
307307

308-
h.Change(change.DERPSet)
308+
h.Change(change.DERPMapResponse())
309309

310310
case records, ok := <-extraRecordsUpdate:
311311
if !ok {
312312
continue
313313
}
314314
h.cfg.TailcfgDNSConfig.ExtraRecords = records
315315

316-
h.Change(change.ExtraRecordsSet)
316+
h.Change(change.ExtraRecordsResponse())
317317
}
318318
}
319319
}
@@ -988,7 +988,7 @@ func readOrCreatePrivateKey(path string) (*key.MachinePrivate, error) {
988988
// Change is used to send changes to nodes.
989989
// All change should be enqueued here and empty will be automatically
990990
// ignored.
991-
func (h *Headscale) Change(cs ...change.ChangeSet) {
991+
func (h *Headscale) Change(cs ...change.Response) {
992992
h.mapBatcher.AddWork(cs...)
993993
}
994994

hscontrol/grpcv1.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,9 @@ func (api headscaleV1APIServer) CreateUser(
5858
return nil, status.Errorf(codes.Internal, "failed to create user: %s", err)
5959
}
6060

61-
c := change.UserAdded(types.UserID(user.ID))
62-
63-
// TODO(kradalby): Both of these might be policy changes, find a better way to merge.
64-
if !policyChanged.Empty() {
65-
c.Change = change.Policy
66-
}
67-
68-
api.h.Change(c)
61+
// CreateUser returns a policy change response if the user creation affected policy.
62+
// This triggers a full policy re-evaluation for all connected nodes.
63+
api.h.Change(policyChanged)
6964

7065
return &v1.CreateUserResponse{User: user.Proto()}, nil
7166
}
@@ -109,7 +104,8 @@ func (api headscaleV1APIServer) DeleteUser(
109104
return nil, err
110105
}
111106

112-
api.h.Change(change.UserRemoved(types.UserID(user.ID)))
107+
// User deletion may affect policy, trigger a full policy re-evaluation.
108+
api.h.Change(change.UserRemovedResponse())
113109

114110
return &v1.DeleteUserResponse{}, nil
115111
}

hscontrol/mapper/batcher.go

Lines changed: 32 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,13 @@ import (
1313
"github.com/puzpuzpuz/xsync/v4"
1414
"github.com/rs/zerolog/log"
1515
"tailscale.com/tailcfg"
16-
"tailscale.com/types/ptr"
1716
)
1817

19-
var (
20-
mapResponseGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
21-
Namespace: "headscale",
22-
Name: "mapresponse_generated_total",
23-
Help: "total count of mapresponses generated by response type and change type",
24-
}, []string{"response_type", "change_type"})
25-
26-
errNodeNotFoundInNodeStore = errors.New("node not found in NodeStore")
27-
)
18+
var mapResponseGenerated = promauto.NewCounterVec(prometheus.CounterOpts{
19+
Namespace: "headscale",
20+
Name: "mapresponse_generated_total",
21+
Help: "total count of mapresponses generated by response type",
22+
}, []string{"response_type"})
2823

2924
type batcherFunc func(cfg *types.Config, state *state.State) Batcher
3025

@@ -36,8 +31,8 @@ type Batcher interface {
3631
RemoveNode(id types.NodeID, c chan<- *tailcfg.MapResponse) bool
3732
IsConnected(id types.NodeID) bool
3833
ConnectedMap() *xsync.Map[types.NodeID, bool]
39-
AddWork(c ...change.ChangeSet)
40-
MapResponseFromChange(id types.NodeID, c change.ChangeSet) (*tailcfg.MapResponse, error)
34+
AddWork(r ...change.Response)
35+
MapResponseFromChange(id types.NodeID, r change.Response) (*tailcfg.MapResponse, error)
4136
DebugMapResponses() (map[types.NodeID][]tailcfg.MapResponse, error)
4237
}
4338

@@ -51,7 +46,7 @@ func NewBatcher(batchTime time.Duration, workers int, mapper *mapper) *LockFreeB
5146
workCh: make(chan work, workers*200),
5247
nodes: xsync.NewMap[types.NodeID, *multiChannelNodeConn](),
5348
connected: xsync.NewMap[types.NodeID, *time.Time](),
54-
pendingChanges: xsync.NewMap[types.NodeID, []change.ChangeSet](),
49+
pendingChanges: xsync.NewMap[types.NodeID, []change.Response](),
5550
}
5651
}
5752

@@ -75,15 +70,15 @@ type nodeConnection interface {
7570
updateSentPeers(resp *tailcfg.MapResponse)
7671
}
7772

78-
// generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID that is based on the provided [change.ChangeSet].
79-
func generateMapResponse(nc nodeConnection, mapper *mapper, c change.ChangeSet) (*tailcfg.MapResponse, error) {
73+
// generateMapResponse generates a [tailcfg.MapResponse] for the given NodeID based on the provided [change.Response].
74+
func generateMapResponse(nc nodeConnection, mapper *mapper, r change.Response) (*tailcfg.MapResponse, error) {
8075
nodeID := nc.nodeID()
8176
version := nc.version()
82-
if c.Empty() {
83-
return nil, nil
77+
78+
if r.IsEmpty() {
79+
return nil, nil //nolint:nilnil // Empty response means nothing to send
8480
}
8581

86-
// Validate inputs before processing
8782
if nodeID == 0 {
8883
return nil, fmt.Errorf("invalid nodeID: %d", nodeID)
8984
}
@@ -92,165 +87,58 @@ func generateMapResponse(nc nodeConnection, mapper *mapper, c change.ChangeSet)
9287
return nil, fmt.Errorf("mapper is nil for nodeID %d", nodeID)
9388
}
9489

90+
// Handle self-only responses
91+
if r.IsSelfOnly() && r.TargetNode != nodeID {
92+
return nil, nil //nolint:nilnil // No response needed for other nodes when self-only
93+
}
94+
9595
var (
96-
mapResp *tailcfg.MapResponse
97-
err error
98-
responseType string
96+
mapResp *tailcfg.MapResponse
97+
err error
9998
)
10099

101-
// Record metric when function exits
102-
defer func() {
103-
if err == nil && mapResp != nil && responseType != "" {
104-
mapResponseGenerated.WithLabelValues(responseType, c.Change.String()).Inc()
105-
}
106-
}()
107-
108-
switch c.Change {
109-
case change.DERP:
110-
responseType = "derp"
111-
mapResp, err = mapper.derpMapResponse(nodeID)
112-
113-
case change.NodeCameOnline, change.NodeWentOffline:
114-
if c.IsSubnetRouter {
115-
// TODO(kradalby): This can potentially be a peer update of the old and new subnet router.
116-
responseType = "full"
117-
mapResp, err = mapper.fullMapResponse(nodeID, version)
118-
} else {
119-
// Trust the change type for online/offline status to avoid race conditions
120-
// between NodeStore updates and change processing
121-
responseType = string(patchResponseDebug)
122-
onlineStatus := c.Change == change.NodeCameOnline
123-
124-
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
125-
{
126-
NodeID: c.NodeID.NodeID(),
127-
Online: ptr.To(onlineStatus),
128-
},
129-
})
130-
}
131-
132-
case change.NodeNewOrUpdate:
133-
// If the node is the one being updated, we send a self update that preserves peer information
134-
// to ensure the node sees changes to its own properties (e.g., hostname/DNS name changes)
135-
// without losing its view of peer status during rapid reconnection cycles
136-
if c.IsSelfUpdate(nodeID) {
137-
responseType = "self"
138-
mapResp, err = mapper.selfMapResponse(nodeID, version)
139-
} else {
140-
responseType = "change"
141-
mapResp, err = mapper.peerChangeResponse(nodeID, version, c.NodeID)
142-
}
143-
144-
case change.NodeRemove:
145-
responseType = "remove"
146-
mapResp, err = mapper.peerRemovedResponse(nodeID, c.NodeID)
147-
148-
case change.NodeKeyExpiry:
149-
// If the node is the one whose key is expiring, we send a "full" self update
150-
// as nodes will ignore patch updates about themselves (?).
151-
if c.IsSelfUpdate(nodeID) {
152-
responseType = "self"
153-
mapResp, err = mapper.selfMapResponse(nodeID, version)
154-
// mapResp, err = mapper.fullMapResponse(nodeID, version)
155-
} else {
156-
responseType = "patch"
157-
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{
158-
{
159-
NodeID: c.NodeID.NodeID(),
160-
KeyExpiry: c.NodeExpiry,
161-
},
162-
})
163-
}
164-
165-
case change.NodeEndpoint, change.NodeDERP:
166-
// Endpoint or DERP changes can be sent as lightweight patches.
167-
// Query the NodeStore for the current peer state to construct the PeerChange.
168-
// Even if only endpoint or only DERP changed, we include both in the patch
169-
// since they're often updated together and it's minimal overhead.
170-
responseType = "patch"
171-
172-
peer, found := mapper.state.GetNodeByID(c.NodeID)
173-
if !found {
174-
return nil, fmt.Errorf("%w: %d", errNodeNotFoundInNodeStore, c.NodeID)
175-
}
176-
177-
peerChange := &tailcfg.PeerChange{
178-
NodeID: c.NodeID.NodeID(),
179-
Endpoints: peer.Endpoints().AsSlice(),
180-
DERPRegion: 0, // Will be set below if available
181-
}
182-
183-
// Extract DERP region from Hostinfo if available
184-
if hi := peer.AsStruct().Hostinfo; hi != nil && hi.NetInfo != nil {
185-
peerChange.DERPRegion = hi.NetInfo.PreferredDERP
186-
}
187-
188-
mapResp, err = mapper.peerChangedPatchResponse(nodeID, []*tailcfg.PeerChange{peerChange})
189-
190-
case change.Policy:
191-
// For policy changes, we need to:
192-
// 1. Send PeersRemoved for peers no longer visible
193-
// 2. Send PeersChanged for remaining peers (their AllowedIPs may have changed)
194-
// 3. Send updated PacketFilters
195-
// This is critical because Tailscale clients interpret an empty Peers slice
196-
// as "no change" rather than "no peers".
197-
responseType = "policy"
100+
// Check if this requires runtime peer visibility computation (e.g., policy changes)
101+
if r.RequiresRuntimePeerComputation {
102+
mapResponseGenerated.WithLabelValues("policy").Inc()
198103

199-
// Get current peers from the state
200104
currentPeers := mapper.state.ListPeers(nodeID)
201105

202106
currentPeerIDs := make([]tailcfg.NodeID, 0, currentPeers.Len())
203107
for _, peer := range currentPeers.All() {
204108
currentPeerIDs = append(currentPeerIDs, peer.ID().NodeID())
205109
}
206110

207-
// Compute which peers were removed
208111
removedPeers := nc.computePeerDiff(currentPeerIDs)
209-
210112
mapResp, err = mapper.policyChangeResponse(nodeID, version, removedPeers, currentPeers)
211-
212-
default:
213-
// The following will always hit this:
214-
// change.Full
215-
responseType = "full"
216-
mapResp, err = mapper.fullMapResponse(nodeID, version)
113+
} else {
114+
mapResponseGenerated.WithLabelValues(r.Reason).Inc()
115+
mapResp, err = mapper.buildFromResponse(nodeID, version, &r)
217116
}
218117

219118
if err != nil {
220119
return nil, fmt.Errorf("generating map response for nodeID %d: %w", nodeID, err)
221120
}
222121

223-
// TODO(kradalby): Is this necessary?
224-
// Validate the generated map response - only check for nil response
225-
// Note: mapResp.Node can be nil for peer updates, which is valid
226-
if mapResp == nil && c.Change != change.DERP && c.Change != change.NodeRemove {
227-
return nil, fmt.Errorf("generated nil map response for nodeID %d change %s", nodeID, c.Change.String())
228-
}
229-
230122
return mapResp, nil
231123
}
232124

233-
// handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.ChangeSet].
234-
func handleNodeChange(nc nodeConnection, mapper *mapper, c change.ChangeSet) error {
125+
// handleNodeChange generates and sends a [tailcfg.MapResponse] for a given node and [change.Response].
126+
func handleNodeChange(nc nodeConnection, mapper *mapper, r change.Response) error {
235127
if nc == nil {
236128
return errors.New("nodeConnection is nil")
237129
}
238130

239131
nodeID := nc.nodeID()
240132

241-
log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("change.type", c.Change.String()).Msg("Node change processing started because change notification received")
242-
243-
var data *tailcfg.MapResponse
244-
245-
var err error
133+
log.Debug().Caller().Uint64("node.id", nodeID.Uint64()).Str("reason", r.Reason).Msg("Node change processing started because change notification received")
246134

247-
data, err = generateMapResponse(nc, mapper, c)
135+
data, err := generateMapResponse(nc, mapper, r)
248136
if err != nil {
249137
return fmt.Errorf("generating map response for node %d: %w", nodeID, err)
250138
}
251139

252140
if data == nil {
253-
// No data to send is valid for some change types
141+
// No data to send is valid for some response types
254142
return nil
255143
}
256144

@@ -274,7 +162,7 @@ type workResult struct {
274162

275163
// work represents a unit of work to be processed by workers.
276164
type work struct {
277-
c change.ChangeSet
165+
r change.Response
278166
nodeID types.NodeID
279167
resultCh chan<- workResult // optional channel for synchronous operations
280168
}

0 commit comments

Comments
 (0)