diff --git a/connection_manager_test.go b/connection_manager_test.go index a59747d..b1b79de 100644 --- a/connection_manager_test.go +++ b/connection_manager_test.go @@ -28,7 +28,7 @@ func Test_NewConnectionManagerTest(t *testing.T) { rawCertificateNoKey: []byte{}, } - lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1) + lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1, false) ifce := &Interface{ hostMap: hostMap, inside: &Tun{}, @@ -91,7 +91,7 @@ func Test_NewConnectionManagerTest2(t *testing.T) { rawCertificateNoKey: []byte{}, } - lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1) + lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1, false) ifce := &Interface{ hostMap: hostMap, inside: &Tun{}, diff --git a/examples/config.yml b/examples/config.yml index b2d168e..9c43bf6 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -177,6 +177,15 @@ logging: #subsystem: nebula #interval: 10s + # enables counter metrics for meta packets + # e.g.: `messages.tx.handshake` + # NOTE: `message.{tx,rx}.recv_error` is always emitted + #message_metrics: false + + # enables detailed counter metrics for lighthouse packets + # e.g.: `lighthouse.rx.HostQuery` + #lighthouse_metrics: false + # Handshake Manger Settings #handshakes: # Total time to try a handshake = sequence of `try_interval * retries` diff --git a/handshake_ix.go b/handshake_ix.go index 32fbe57..e44df11 100644 --- a/handshake_ix.go +++ b/handshake_ix.go @@ -98,6 +98,7 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [ hostinfo, _ := f.handshakeManager.pendingHostMap.QueryReverseIndex(hs.Details.InitiatorIndex) if hostinfo != nil && bytes.Equal(hostinfo.HandshakePacket[0], packet[HeaderLen:]) { if msg, ok := hostinfo.HandshakePacket[2]; ok { + f.messageMetrics.Tx(handshake, NebulaMessageSubType(msg[1]), 1) err := f.outside.WriteTo(msg, addr) if err != nil { l.WithField("vpnIp", IntIp(hostinfo.hostId)).WithField("udpAddr", addr). @@ -191,6 +192,7 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [ hostinfo.HandshakePacket[2] = make([]byte, len(msg)) copy(hostinfo.HandshakePacket[2], msg) + f.messageMetrics.Tx(handshake, NebulaMessageSubType(msg[1]), 1) err := f.outside.WriteTo(msg, addr) if err != nil { l.WithField("vpnIp", IntIp(vpnIP)).WithField("udpAddr", addr). diff --git a/handshake_manager.go b/handshake_manager.go index 67e990a..1d23013 100644 --- a/handshake_manager.go +++ b/handshake_manager.go @@ -31,6 +31,8 @@ type HandshakeConfig struct { tryInterval time.Duration retries int waitRotation int + + messageMetrics *MessageMetrics } type HandshakeManager struct { @@ -42,6 +44,8 @@ type HandshakeManager struct { OutboundHandshakeTimer *SystemTimerWheel InboundHandshakeTimer *SystemTimerWheel + + messageMetrics *MessageMetrics } func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainHostMap *HostMap, lightHouse *LightHouse, outside *udpConn, config HandshakeConfig) *HandshakeManager { @@ -55,6 +59,8 @@ func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainH OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)), InboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)), + + messageMetrics: config.messageMetrics, } } @@ -111,6 +117,7 @@ func (c *HandshakeManager) NextOutboundHandshakeTimerTick(now time.Time, f EncWr // Ensure the handshake is ready to avoid a race in timer tick and stage 0 handshake generation if hostinfo.HandshakeReady && hostinfo.remote != nil { + c.messageMetrics.Tx(handshake, NebulaMessageSubType(hostinfo.HandshakePacket[0][1]), 1) err := c.outside.WriteTo(hostinfo.HandshakePacket[0], hostinfo.remote) if err != nil { hostinfo.logger().WithField("udpAddr", hostinfo.remote). diff --git a/hostmap.go b/hostmap.go index a93ffc5..1ecdb96 100644 --- a/hostmap.go +++ b/hostmap.go @@ -30,6 +30,7 @@ type HostMap struct { vpnCIDR *net.IPNet defaultRoute uint32 unsafeRoutes *CIDRTree + metricsEnabled bool } type HostInfo struct { @@ -384,8 +385,16 @@ func (hm *HostMap) PunchList() []*udpAddr { } func (hm *HostMap) Punchy(conn *udpConn) { + var metricsTxPunchy metrics.Counter + if hm.metricsEnabled { + metricsTxPunchy = metrics.GetOrRegisterCounter("messages.tx.punchy", nil) + } else { + metricsTxPunchy = metrics.NilCounter{} + } + for { for _, addr := range hm.PunchList() { + metricsTxPunchy.Inc(1) conn.WriteTo([]byte{1}, addr) } time.Sleep(time.Second * 30) diff --git a/inside.go b/inside.go index 7bba445..6e65559 100644 --- a/inside.go +++ b/inside.go @@ -46,7 +46,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket, dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, trustedCAs) if dropReason == nil { - f.send(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out) + f.sendNoMetrics(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out) if f.lightHouse != nil && *ci.messageCounter%5000 == 0 { f.lightHouse.Query(fwPacket.RemoteIP, f) } @@ -118,7 +118,7 @@ func (f *Interface) sendMessageNow(t NebulaMessageType, st NebulaMessageSubType, return } - f.send(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out) + f.sendNoMetrics(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out) if f.lightHouse != nil && *hostInfo.ConnectionState.messageCounter%5000 == 0 { f.lightHouse.Query(fp.RemoteIP, f) } @@ -175,6 +175,11 @@ func (f *Interface) sendMessageToAll(t NebulaMessageType, st NebulaMessageSubTyp } func (f *Interface) send(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) { + f.messageMetrics.Tx(t, st, 1) + f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out) +} + +func (f *Interface) sendNoMetrics(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) { if ci.eKey == nil { //TODO: log warning return diff --git a/interface.go b/interface.go index 0ecc0e8..9203f09 100644 --- a/interface.go +++ b/interface.go @@ -25,6 +25,7 @@ type InterfaceConfig struct { DropLocalBroadcast bool DropMulticast bool UDPBatchSize int + MessageMetrics *MessageMetrics } type Interface struct { @@ -45,9 +46,8 @@ type Interface struct { udpBatchSize int version string - metricRxRecvError metrics.Counter - metricTxRecvError metrics.Counter - metricHandshakes metrics.Histogram + metricHandshakes metrics.Histogram + messageMetrics *MessageMetrics } func NewInterface(c *InterfaceConfig) (*Interface, error) { @@ -80,9 +80,8 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) { dropMulticast: c.DropMulticast, udpBatchSize: c.UDPBatchSize, - metricRxRecvError: metrics.GetOrRegisterCounter("messages.rx.recv_error", nil), - metricTxRecvError: metrics.GetOrRegisterCounter("messages.tx.recv_error", nil), - metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)), + metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)), + messageMetrics: c.MessageMetrics, } ifce.connectionManager = newConnectionManager(ifce, c.checkInterval, c.pendingDeletionInterval) diff --git a/lighthouse.go b/lighthouse.go index 5ccb4a6..9b5b1c7 100644 --- a/lighthouse.go +++ b/lighthouse.go @@ -7,6 +7,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/rcrowley/go-metrics" "github.com/slackhq/nebula/cert" ) @@ -37,6 +38,9 @@ type LightHouse struct { nebulaPort int punchBack bool punchDelay time.Duration + + metrics *MessageMetrics + metricHolepunchTx metrics.Counter } type EncWriter interface { @@ -44,7 +48,7 @@ type EncWriter interface { SendMessageToAll(t NebulaMessageType, st NebulaMessageSubType, vpnIp uint32, p, nb, out []byte) } -func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort int, pc *udpConn, punchBack bool, punchDelay time.Duration) *LightHouse { +func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort int, pc *udpConn, punchBack bool, punchDelay time.Duration, metricsEnabled bool) *LightHouse { h := LightHouse{ amLighthouse: amLighthouse, myIp: myIp, @@ -58,6 +62,14 @@ func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, n punchDelay: punchDelay, } + if metricsEnabled { + h.metrics = newLighthouseMetrics() + + h.metricHolepunchTx = metrics.GetOrRegisterCounter("messages.tx.holepunch", nil) + } else { + h.metricHolepunchTx = metrics.NilCounter{} + } + for _, ip := range ips { h.lighthouses[ip] = struct{}{} } @@ -111,6 +123,7 @@ func (lh *LightHouse) QueryServer(ip uint32, f EncWriter) { return } + lh.metricTx(NebulaMeta_HostQuery, int64(len(lh.lighthouses))) nb := make([]byte, 12, 12) out := make([]byte, mtu) for n := range lh.lighthouses { @@ -249,6 +262,7 @@ func (lh *LightHouse) LhUpdateWorker(f EncWriter) { }, } + lh.metricTx(NebulaMeta_HostUpdateNotification, int64(len(lh.lighthouses))) nb := make([]byte, 12, 12) out := make([]byte, mtu) for vpnIp := range lh.lighthouses { @@ -281,6 +295,8 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c return } + lh.metricRx(n.Type, 1) + switch n.Type { case NebulaMeta_HostQuery: // Exit if we don't answer queries @@ -308,6 +324,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host query reply") return } + lh.metricTx(NebulaMeta_HostQueryReply, 1) f.SendMessageToVpnIp(lightHouse, 0, vpnIp, reply, make([]byte, 12, 12), make([]byte, mtu)) // This signals the other side to punch some zero byte udp packets @@ -326,6 +343,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c }, } reply, _ := proto.Marshal(answer) + lh.metricTx(NebulaMeta_HostPunchNotification, 1) f.SendMessageToVpnIp(lightHouse, 0, n.Details.VpnIp, reply, make([]byte, 12, 12), make([]byte, mtu)) } //fmt.Println(reply, remoteaddr) @@ -362,6 +380,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c vpnPeer := NewUDPAddr(a.Ip, uint16(a.Port)) go func() { time.Sleep(lh.punchDelay) + lh.metricHolepunchTx.Inc(1) lh.punchConn.WriteTo(empty, vpnPeer) }() @@ -380,6 +399,13 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c } } +func (lh *LightHouse) metricRx(t NebulaMeta_MessageType, i int64) { + lh.metrics.Rx(NebulaMessageType(t), 0, i) +} +func (lh *LightHouse) metricTx(t NebulaMeta_MessageType, i int64) { + lh.metrics.Tx(NebulaMessageType(t), 0, i) +} + /* func (f *Interface) sendPathCheck(ci *ConnectionState, endpoint *net.UDPAddr, counter int) { c := ci.messageCounter diff --git a/lighthouse_test.go b/lighthouse_test.go index 02b8ca7..d5ce880 100644 --- a/lighthouse_test.go +++ b/lighthouse_test.go @@ -52,7 +52,7 @@ func Test_lhStaticMapping(t *testing.T) { udpServer, _ := NewListener("0.0.0.0", 0, true) - meh := NewLightHouse(true, 1, []uint32{ip2int(lh1IP)}, 10, 10003, udpServer, false, 1) + meh := NewLightHouse(true, 1, []uint32{ip2int(lh1IP)}, 10, 10003, udpServer, false, 1, false) meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true) err := meh.ValidateLHStaticEntries() assert.Nil(t, err) @@ -60,7 +60,7 @@ func Test_lhStaticMapping(t *testing.T) { lh2 := "10.128.0.3" lh2IP := net.ParseIP(lh2) - meh = NewLightHouse(true, 1, []uint32{ip2int(lh1IP), ip2int(lh2IP)}, 10, 10003, udpServer, false, 1) + meh = NewLightHouse(true, 1, []uint32{ip2int(lh1IP), ip2int(lh2IP)}, 10, 10003, udpServer, false, 1, false) meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true) err = meh.ValidateLHStaticEntries() assert.EqualError(t, err, "Lighthouse 10.128.0.3 does not have a static_host_map entry") diff --git a/main.go b/main.go index 7088173..9eb6584 100644 --- a/main.go +++ b/main.go @@ -172,6 +172,7 @@ func Main(configPath string, configTest bool, buildVersion string) { hostMap := NewHostMap("main", tunCidr, preferredRanges) hostMap.SetDefaultRoute(ip2int(net.ParseIP(config.GetString("default_route", "0.0.0.0")))) hostMap.addUnsafeRoutes(&unsafeRoutes) + hostMap.metricsEnabled = config.GetBool("stats.message_metrics", false) l.WithField("network", hostMap.vpnCIDR).WithField("preferredRanges", hostMap.preferredRanges).Info("Main HostMap created") @@ -226,6 +227,7 @@ func Main(configPath string, configTest bool, buildVersion string) { udpServer, punchy.Respond, punchy.Delay, + config.GetBool("stats.lighthouse_metrics", false), ) remoteAllowList, err := config.GetAllowList("lighthouse.remote_allow_list", false) @@ -280,10 +282,19 @@ func Main(configPath string, configTest bool, buildVersion string) { l.WithError(err).Error("Lighthouse unreachable") } + var messageMetrics *MessageMetrics + if config.GetBool("stats.message_metrics", false) { + messageMetrics = newMessageMetrics() + } else { + messageMetrics = newMessageMetricsOnlyRecvError() + } + handshakeConfig := HandshakeConfig{ tryInterval: config.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval), retries: config.GetInt("handshakes.retries", DefaultHandshakeRetries), waitRotation: config.GetInt("handshakes.wait_rotation", DefaultHandshakeWaitRotation), + + messageMetrics: messageMetrics, } handshakeManager := NewHandshakeManager(tunCidr, preferredRanges, hostMap, lightHouse, udpServer, handshakeConfig) @@ -310,6 +321,7 @@ func Main(configPath string, configTest bool, buildVersion string) { DropLocalBroadcast: config.GetBool("tun.drop_local_broadcast", false), DropMulticast: config.GetBool("tun.drop_multicast", false), UDPBatchSize: config.GetInt("listen.batch", 64), + MessageMetrics: messageMetrics, } switch ifConfig.Cipher { diff --git a/message_metrics.go b/message_metrics.go new file mode 100644 index 0000000..ccd0207 --- /dev/null +++ b/message_metrics.go @@ -0,0 +1,97 @@ +package nebula + +import ( + "fmt" + + "github.com/rcrowley/go-metrics" +) + +type MessageMetrics struct { + rx [][]metrics.Counter + tx [][]metrics.Counter + + rxUnknown metrics.Counter + txUnknown metrics.Counter +} + +func (m *MessageMetrics) Rx(t NebulaMessageType, s NebulaMessageSubType, i int64) { + if m != nil { + if t >= 0 && int(t) < len(m.rx) && s >= 0 && int(s) < len(m.rx[t]) { + m.rx[t][s].Inc(i) + } else if m.rxUnknown != nil { + m.rxUnknown.Inc(i) + } + } +} +func (m *MessageMetrics) Tx(t NebulaMessageType, s NebulaMessageSubType, i int64) { + if m != nil { + if t >= 0 && int(t) < len(m.tx) && s >= 0 && int(s) < len(m.tx[t]) { + m.tx[t][s].Inc(i) + } else if m.txUnknown != nil { + m.txUnknown.Inc(i) + } + } +} + +func newMessageMetrics() *MessageMetrics { + gen := func(t string) [][]metrics.Counter { + return [][]metrics.Counter{ + { + metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.handshake_ixpsk0", t), nil), + }, + nil, + {metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.recv_error", t), nil)}, + {metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.lighthouse", t), nil)}, + { + metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.test_request", t), nil), + metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.test_response", t), nil), + }, + {metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.close_tunnel", t), nil)}, + } + } + return &MessageMetrics{ + rx: gen("rx"), + tx: gen("tx"), + + rxUnknown: metrics.GetOrRegisterCounter("messages.rx.other", nil), + txUnknown: metrics.GetOrRegisterCounter("messages.tx.other", nil), + } +} + +// Historically we only recorded recv_error, so this is backwards compat +func newMessageMetricsOnlyRecvError() *MessageMetrics { + gen := func(t string) [][]metrics.Counter { + return [][]metrics.Counter{ + nil, + nil, + {metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.recv_error", t), nil)}, + } + } + return &MessageMetrics{ + rx: gen("rx"), + tx: gen("tx"), + } +} + +func newLighthouseMetrics() *MessageMetrics { + gen := func(t string) [][]metrics.Counter { + h := make([][]metrics.Counter, len(NebulaMeta_MessageType_name)) + used := []NebulaMeta_MessageType{ + NebulaMeta_HostQuery, + NebulaMeta_HostQueryReply, + NebulaMeta_HostUpdateNotification, + NebulaMeta_HostPunchNotification, + } + for _, i := range used { + h[i] = []metrics.Counter{metrics.GetOrRegisterCounter(fmt.Sprintf("lighthouse.%s.%s", t, i.String()), nil)} + } + return h + } + return &MessageMetrics{ + rx: gen("rx"), + tx: gen("tx"), + + rxUnknown: metrics.GetOrRegisterCounter("lighthouse.rx.other", nil), + txUnknown: metrics.GetOrRegisterCounter("lighthouse.tx.other", nil), + } +} diff --git a/outside.go b/outside.go index adc91ca..add7c62 100644 --- a/outside.go +++ b/outside.go @@ -54,6 +54,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, // Fallthrough to the bottom to record incoming traffic case lightHouse: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) if !f.handleEncrypted(ci, addr, header) { return } @@ -74,6 +75,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, // Fallthrough to the bottom to record incoming traffic case test: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) if !f.handleEncrypted(ci, addr, header) { return } @@ -102,15 +104,18 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, // are unauthenticated case handshake: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) HandleIncomingHandshake(f, addr, packet, header, hostinfo) return case recvError: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) // TODO: Remove this with recv_error deprecation f.handleRecvError(addr, header) return case closeTunnel: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) if !f.handleEncrypted(ci, addr, header) { return } @@ -122,6 +127,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte, return default: + f.messageMetrics.Rx(header.Type, header.Subtype, 1) hostinfo.logger().Debugf("Unexpected packet received from %s", addr) return } @@ -298,7 +304,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out } func (f *Interface) sendRecvError(endpoint *udpAddr, index uint32) { - f.metricTxRecvError.Inc(1) + f.messageMetrics.Tx(recvError, 0, 1) //TODO: this should be a signed message so we can trust that we should drop the index b := HeaderEncode(make([]byte, HeaderLen), Version, uint8(recvError), 0, index, 0) @@ -311,8 +317,6 @@ func (f *Interface) sendRecvError(endpoint *udpAddr, index uint32) { } func (f *Interface) handleRecvError(addr *udpAddr, h *Header) { - f.metricRxRecvError.Inc(1) - // This flag is to stop caring about recv_error from old versions // This should go away when the old version is gone from prod if l.Level >= logrus.DebugLevel {