Skip to content

Commit 06fb7d4

Browse files
committed
WIP: Client updates. Long polling rewritten
1 parent ca6904f commit 06fb7d4

File tree

2 files changed

+105
-40
lines changed

2 files changed

+105
-40
lines changed

app.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"log"
66
"os"
7+
"sync"
78

89
"github.com/gin-gonic/gin"
910
"tailscale.com/tailcfg"
@@ -30,6 +31,9 @@ type Headscale struct {
3031
dbString string
3132
publicKey *wgcfg.Key
3233
privateKey *wgcfg.PrivateKey
34+
35+
pollMu sync.Mutex
36+
clientsPolling map[uint64]chan []byte // this is by all means a hackity hack
3337
}
3438

3539
// NewHeadscale returns the Headscale app
@@ -54,6 +58,7 @@ func NewHeadscale(cfg Config) (*Headscale, error) {
5458
if err != nil {
5559
return nil, err
5660
}
61+
h.clientsPolling = make(map[uint64]chan []byte)
5762
return &h, nil
5863
}
5964

@@ -64,9 +69,6 @@ func (h *Headscale) Serve() error {
6469
r.GET("/register", h.RegisterWebAPI)
6570
r.POST("/machine/:id/map", h.PollNetMapHandler)
6671
r.POST("/machine/:id", h.RegistrationHandler)
67-
68-
// r.LoadHTMLFiles("./frontend/build/index.html")
69-
// r.Use(static.Serve("/", static.LocalFile("./frontend/build", true)))
7072
err := r.Run(h.cfg.Addr)
7173
return err
7274
}

handlers.go

Lines changed: 100 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (h *Headscale) RegistrationHandler(c *gin.Context) {
5757
// We do have the updated key!
5858
if m.NodeKey == wgcfg.Key(req.NodeKey).HexString() {
5959
if m.Registered {
60-
log.Println("Registered and we have the updated key! Lets move to map")
60+
log.Println("Client is registered and we have the current key. All clear to /map")
6161
resp.AuthURL = ""
6262
respBody, err := encode(resp, &mKey, h.privateKey)
6363
if err != nil {
@@ -102,85 +102,147 @@ func (h *Headscale) RegistrationHandler(c *gin.Context) {
102102
log.Println("We dont know anything about the new key. WTF")
103103
}
104104

105+
// PollNetMapHandler takes care of /machine/:id/map
106+
//
107+
// This is the busiest endpoint, as it keeps the HTTP long poll that updates
108+
// the clients when something in the network changes.
109+
//
110+
// The clients POST stuff like HostInfo and their Endpoints here, but
111+
// only after their first request (marked with the ReadOnly field).
112+
//
113+
// At this moment the updates are sent in a quite horrendous way, but they kinda work.
105114
func (h *Headscale) PollNetMapHandler(c *gin.Context) {
106115
body, _ := io.ReadAll(c.Request.Body)
107116
mKeyStr := c.Param("id")
108117
mKey, err := wgcfg.ParseHexKey(mKeyStr)
109118
if err != nil {
110119
log.Printf("Cannot parse client key: %s", err)
111-
c.String(http.StatusOK, "Sad!")
112120
return
113121
}
114122
req := tailcfg.MapRequest{}
115123
err = decode(body, &req, &mKey, h.privateKey)
116124
if err != nil {
117125
log.Printf("Cannot decode message: %s", err)
118-
c.String(http.StatusOK, "Very sad!")
119-
// return
126+
return
120127
}
121128

122129
db, err := h.db()
123130
if err != nil {
124131
log.Printf("Cannot open DB: %s", err)
125-
c.String(http.StatusInternalServerError, ":(")
126132
return
127133
}
128134
defer db.Close()
129135
var m Machine
130136
if db.First(&m, "machine_key = ?", mKey.HexString()).RecordNotFound() {
131-
log.Printf("Cannot encode message: %s", err)
132-
c.String(http.StatusOK, "Extremely sad!")
137+
log.Printf("Cannot find machine: %s", err)
133138
return
134139
}
135140

136-
endpoints, _ := json.Marshal(req.Endpoints)
137141
hostinfo, _ := json.Marshal(req.Hostinfo)
138-
m.Endpoints = postgres.Jsonb{RawMessage: json.RawMessage(endpoints)}
142+
m.Name = req.Hostinfo.Hostname
139143
m.HostInfo = postgres.Jsonb{RawMessage: json.RawMessage(hostinfo)}
140144
m.DiscoKey = wgcfg.Key(req.DiscoKey).HexString()
141145
now := time.Now().UTC()
142-
m.LastSeen = &now
146+
147+
// From Tailscale client:
148+
//
149+
// ReadOnly is whether the client just wants to fetch the MapResponse,
150+
// without updating their Endpoints. The Endpoints field will be ignored and
151+
// LastSeen will not be updated and peers will not be notified of changes.
152+
//
153+
// The intended use is for clients to discover the DERP map at start-up
154+
// before their first real endpoint update.
155+
if !req.ReadOnly {
156+
endpoints, _ := json.Marshal(req.Endpoints)
157+
m.Endpoints = postgres.Jsonb{RawMessage: json.RawMessage(endpoints)}
158+
m.LastSeen = &now
159+
}
143160
db.Save(&m)
144161
db.Close()
145162

146-
chanStream := make(chan []byte, 1)
147-
go func() {
148-
defer close(chanStream)
163+
pollData := make(chan []byte, 1)
164+
update := make(chan []byte, 1)
165+
cancelKeepAlive := make(chan []byte, 1)
166+
defer close(pollData)
167+
defer close(update)
168+
defer close(cancelKeepAlive)
169+
h.pollMu.Lock()
170+
h.clientsPolling[m.ID] = update
171+
h.pollMu.Unlock()
149172

150-
data, err := h.getMapResponse(mKey, req, m)
151-
if err != nil {
152-
c.String(http.StatusInternalServerError, ":(")
153-
return
154-
}
173+
data, err := h.getMapResponse(mKey, req, m)
174+
if err != nil {
175+
c.String(http.StatusInternalServerError, ":(")
176+
return
177+
}
155178

156-
//send initial dump
157-
chanStream <- *data
158-
for {
179+
log.Printf("[%s] sending initial map", m.Name)
180+
pollData <- *data
159181

160-
data, err := h.getMapKeepAliveResponse(mKey, req, m)
161-
if err != nil {
162-
c.String(http.StatusInternalServerError, ":(")
163-
return
182+
// We update our peers if the client is not sending ReadOnly in the MapRequest
183+
// so we don't distribute its initial request (it comes with
184+
// empty endpoints to peers)
185+
if !req.ReadOnly {
186+
peers, _ := h.getPeers(m)
187+
h.pollMu.Lock()
188+
for _, p := range *peers {
189+
log.Printf("[%s] notifying peer %s (%s)", m.Name, p.Name, p.Addresses[0])
190+
if pUp, ok := h.clientsPolling[uint64(p.ID)]; ok {
191+
pUp <- []byte{}
192+
} else {
193+
log.Printf("[%s] Peer %s does not appear to be polling", m.Name, p.Name)
164194
}
165-
chanStream <- *data
166-
// keep the node entertained
167-
time.Sleep(time.Second * 180)
168-
break
169195
}
196+
h.pollMu.Unlock()
197+
}
198+
199+
go h.keepAlive(cancelKeepAlive, pollData, mKey, req, m)
170200

171-
}()
172201
c.Stream(func(w io.Writer) bool {
173-
if msg, ok := <-chanStream; ok {
174-
log.Printf("🦀 Sending data to %s: %d bytes", c.Request.RemoteAddr, len(msg))
175-
w.Write(msg)
202+
select {
203+
case data := <-pollData:
204+
log.Printf("[%s] Sending data (%d bytes)", m.Name, len(data))
205+
w.Write(data)
176206
return true
177-
} else {
178-
log.Printf("🦄 Closing connection to %s", c.Request.RemoteAddr)
179-
c.AbortWithStatus(200)
207+
208+
case <-update:
209+
log.Printf("[%s] Received a request for update", m.Name)
210+
data, err := h.getMapResponse(mKey, req, m)
211+
if err != nil {
212+
fmt.Printf("[%s] 🤮 Cannot get the poll response: %s", m.Name, err)
213+
}
214+
w.Write(*data)
215+
return true
216+
217+
case <-c.Request.Context().Done():
218+
log.Printf("[%s] 😥 The client has closed the connection", m.Name)
219+
h.pollMu.Lock()
220+
cancelKeepAlive <- []byte{}
221+
delete(h.clientsPolling, m.ID)
222+
h.pollMu.Unlock()
223+
180224
return false
225+
181226
}
182227
})
228+
}
183229

230+
func (h *Headscale) keepAlive(cancel chan []byte, pollData chan []byte, mKey wgcfg.Key, req tailcfg.MapRequest, m Machine) {
231+
for {
232+
select {
233+
case <-cancel:
234+
return
235+
236+
default:
237+
data, err := h.getMapKeepAliveResponse(mKey, req, m)
238+
if err != nil {
239+
log.Printf("Error generating the keep alive msg: %s", err)
240+
return
241+
}
242+
pollData <- *data
243+
time.Sleep(60 * time.Second)
244+
}
245+
}
184246
}
185247

186248
func (h *Headscale) getMapResponse(mKey wgcfg.Key, req tailcfg.MapRequest, m Machine) (*[]byte, error) {
@@ -221,7 +283,7 @@ func (h *Headscale) getMapResponse(mKey wgcfg.Key, req tailcfg.MapRequest, m Mac
221283
return nil, err
222284
}
223285
}
224-
286+
// spew.Dump(resp)
225287
// declare the incoming size on the first 4 bytes
226288
data := make([]byte, 4)
227289
binary.LittleEndian.PutUint32(data, uint32(len(respBody)))
@@ -289,6 +351,7 @@ func (h *Headscale) handleNewServer(c *gin.Context, db *gorm.DB, idKey wgcfg.Key
289351
MachineKey: idKey.HexString(),
290352
NodeKey: wgcfg.Key(req.NodeKey).HexString(),
291353
Expiry: &req.Expiry,
354+
Name: req.Hostinfo.Hostname,
292355
}
293356
if err := db.Create(&mNew).Error; err != nil {
294357
log.Printf("Could not create row: %s", err)

0 commit comments

Comments
 (0)