diff --git a/firewall.go b/firewall.go
index 81f3377..5716894 100644
--- a/firewall.go
+++ b/firewall.go
@@ -68,9 +68,20 @@ type Firewall struct {
 	rules        string
 	rulesVersion uint16
 
-	trackTCPRTT  bool
-	metricTCPRTT metrics.Histogram
-	l            *logrus.Logger
+	trackTCPRTT     bool
+	metricTCPRTT    metrics.Histogram
+	incomingMetrics firewallMetrics
+	outgoingMetrics firewallMetrics
+
+	l *logrus.Logger
+}
+
+// firewallMetrics groups the drop counters for one traffic direction: one each for
+// the invalid-remote-IP, invalid-local-IP and no-matching-rule rejections in Firewall.Drop.
+type firewallMetrics struct {
+	droppedLocalIP  metrics.Counter
+	droppedRemoteIP metrics.Counter
+	droppedNoRule   metrics.Counter
 }
 
 type FirewallConntrack struct {
@@ -195,8 +206,19 @@ func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.D
 		UDPTimeout:     UDPTimeout,
 		DefaultTimeout: defaultTimeout,
 		localIps:       localIps,
-		metricTCPRTT:   metrics.GetOrRegisterHistogram("network.tcp.rtt", nil, metrics.NewExpDecaySample(1028, 0.015)),
 		l:              l,
+
+		metricTCPRTT: metrics.GetOrRegisterHistogram("network.tcp.rtt", nil, metrics.NewExpDecaySample(1028, 0.015)),
+		incomingMetrics: firewallMetrics{
+			droppedLocalIP:  metrics.GetOrRegisterCounter("firewall.incoming.dropped.local_ip", nil),
+			droppedRemoteIP: metrics.GetOrRegisterCounter("firewall.incoming.dropped.remote_ip", nil),
+			droppedNoRule:   metrics.GetOrRegisterCounter("firewall.incoming.dropped.no_rule", nil),
+		},
+		outgoingMetrics: firewallMetrics{
+			droppedLocalIP:  metrics.GetOrRegisterCounter("firewall.outgoing.dropped.local_ip", nil),
+			droppedRemoteIP: metrics.GetOrRegisterCounter("firewall.outgoing.dropped.remote_ip", nil),
+			droppedNoRule:   metrics.GetOrRegisterCounter("firewall.outgoing.dropped.no_rule", nil),
+		},
 	}
 }
 
@@ -385,17 +407,20 @@ func (f *Firewall) Drop(packet []byte, fp FirewallPacket, incoming bool, h *Host
 	// Make sure remote address matches nebula certificate
 	if remoteCidr := h.remoteCidr; remoteCidr != nil {
 		if remoteCidr.Contains(fp.RemoteIP) == nil {
+			f.metrics(incoming).droppedRemoteIP.Inc(1)
 			return ErrInvalidRemoteIP
 		}
 	} else {
 		// Simple case: Certificate has one IP and no subnets
 		if fp.RemoteIP != h.hostId {
+			f.metrics(incoming).droppedRemoteIP.Inc(1)
 			return ErrInvalidRemoteIP
 		}
 	}
 
 	// Make sure we are supposed to be handling this local ip address
 	if f.localIps.Contains(fp.LocalIP) == nil {
+		f.metrics(incoming).droppedLocalIP.Inc(1)
 		return ErrInvalidLocalIP
 	}
 
@@ -406,6 +431,7 @@ func (f *Firewall) Drop(packet []byte, fp FirewallPacket, incoming bool, h *Host
 
 	// We now know which firewall table to check against
 	if !table.match(fp, incoming, h.ConnectionState.peerCert, caPool) {
+		f.metrics(incoming).droppedNoRule.Inc(1)
 		return ErrNoMatchingRule
 	}
 
@@ -415,6 +441,14 @@ func (f *Firewall) Drop(packet []byte, fp FirewallPacket, incoming bool, h *Host
 	return nil
 }
 
+// metrics returns the drop counters for the given traffic direction.
+func (f *Firewall) metrics(incoming bool) firewallMetrics {
+	if incoming {
+		return f.incomingMetrics
+	}
+	return f.outgoingMetrics
+}
+
 // Destroy cleans up any known cyclical references so the object can be free'd my GC. This should be called if a new
 // firewall object is created
 func (f *Firewall) Destroy() {
diff --git a/handshake_ix.go b/handshake_ix.go
index 6dec998..5fc5825 100644
--- a/handshake_ix.go
+++ b/handshake_ix.go
@@ -268,10 +268,11 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, packet []byte, h *Header) {
 			WithField("fingerprint", fingerprint).
 			WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
 			WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
+			WithField("sentCachedPackets", len(hostinfo.packetStore)).
 			Info("Handshake message sent")
 	}
 
-	hostinfo.handshakeComplete(f.l)
+	hostinfo.handshakeComplete(f.l, f.cachedPacketMetrics)
 
 	return
 }
@@ -391,6 +392,7 @@ func ixHandshakeStage2(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [
 		WithField("initiatorIndex", hs.Details.InitiatorIndex).WithField("responderIndex", hs.Details.ResponderIndex).
 		WithField("remoteIndex", h.RemoteIndex).WithField("handshake", m{"stage": 2, "style": "ix_psk0"}).
 		WithField("durationNs", duration).
+		WithField("sentCachedPackets", len(hostinfo.packetStore)).
 		Info("Handshake message received")
 
 	hostinfo.remoteIndexId = hs.Details.ResponderIndex
@@ -410,7 +412,7 @@ func ixHandshakeStage2(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [
 	// Complete our handshake and update metrics, this will replace any existing tunnels for this vpnIp
 	//TODO: Complete here does not do a race avoidance, it will just take the new tunnel. Is this ok?
 	f.handshakeManager.Complete(hostinfo, f)
-	hostinfo.handshakeComplete(f.l)
+	hostinfo.handshakeComplete(f.l, f.cachedPacketMetrics)
 	f.metricHandshakes.Update(duration)
 
 	return false
diff --git a/handshake_manager.go b/handshake_manager.go
index 90afc62..313f568 100644
--- a/handshake_manager.go
+++ b/handshake_manager.go
@@ -8,6 +8,7 @@ import (
 	"net"
 	"time"
 
+	"github.com/rcrowley/go-metrics"
 	"github.com/sirupsen/logrus"
 )
 
@@ -41,6 +42,8 @@ type HandshakeManager struct {
 	config                 HandshakeConfig
 	OutboundHandshakeTimer *SystemTimerWheel
 	messageMetrics         *MessageMetrics
+	metricInitiated        metrics.Counter
+	metricTimedOut         metrics.Counter
 	l                      *logrus.Logger
 
 	// can be used to trigger outbound handshake for the given vpnIP
@@ -57,6 +60,8 @@ func NewHandshakeManager(l *logrus.Logger, tunCidr *net.IPNet, preferredRanges [
 		trigger:                make(chan uint32, config.triggerBuffer),
 		OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
 		messageMetrics:         config.messageMetrics,
+		metricInitiated:        metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
+		metricTimedOut:         metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
 		l:                      l,
 	}
 }
@@ -117,7 +122,7 @@ func (c *HandshakeManager) handleOutbound(vpnIP uint32, f EncWriter, lighthouseT
 			WithField("handshake", m{"stage": 1, "style": "ix_psk0"}).
 			WithField("durationNs", time.Since(hostinfo.handshakeStart).Nanoseconds()).
Info("Handshake timed out") - //TODO: emit metrics + c.metricTimedOut.Inc(1) c.pendingHostMap.DeleteHostInfo(hostinfo) return } @@ -181,6 +186,7 @@ func (c *HandshakeManager) AddVpnIP(vpnIP uint32) *HostInfo { // main receive thread for very long by waiting to add items to the pending map //TODO: what lock? c.OutboundHandshakeTimer.Add(vpnIP, c.config.tryInterval) + c.metricInitiated.Inc(1) return hostinfo } diff --git a/hostmap.go b/hostmap.go index 4e988bf..c764c20 100644 --- a/hostmap.go +++ b/hostmap.go @@ -77,6 +77,11 @@ type cachedPacket struct { type packetCallback func(t NebulaMessageType, st NebulaMessageSubType, h *HostInfo, p, nb, out []byte) +type cachedPacketMetrics struct { + sent metrics.Counter + dropped metrics.Counter +} + func NewHostMap(l *logrus.Logger, name string, vpnCIDR *net.IPNet, preferredRanges []*net.IPNet) *HostMap { h := map[uint32]*HostInfo{} i := map[uint32]*HostInfo{} @@ -435,7 +440,7 @@ func (i *HostInfo) TryPromoteBest(preferredRanges []*net.IPNet, ifce *Interface) } } -func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaMessageSubType, packet []byte, f packetCallback) { +func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaMessageSubType, packet []byte, f packetCallback, m *cachedPacketMetrics) { //TODO: return the error so we can log with more context if len(i.packetStore) < 100 { tempPacket := make([]byte, len(packet)) @@ -450,6 +455,7 @@ func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaM } } else if l.Level >= logrus.DebugLevel { + m.dropped.Inc(1) i.logger(l). WithField("length", len(i.packetStore)). WithField("stored", false). 
@@ -458,7 +464,7 @@ func (i *HostInfo) cachePacket(l *logrus.Logger, t NebulaMessageType, st NebulaM } // handshakeComplete will set the connection as ready to communicate, as well as flush any stored packets -func (i *HostInfo) handshakeComplete(l *logrus.Logger) { +func (i *HostInfo) handshakeComplete(l *logrus.Logger, m *cachedPacketMetrics) { //TODO: I'm not certain the distinction between handshake complete and ConnectionState being ready matters because: //TODO: HandshakeComplete means send stored packets and ConnectionState.ready means we are ready to send //TODO: if the transition from HandhsakeComplete to ConnectionState.ready happens all within this function they are identical @@ -479,6 +485,7 @@ func (i *HostInfo) handshakeComplete(l *logrus.Logger) { for _, cp := range i.packetStore { cp.callback(cp.messageType, cp.messageSubType, i, cp.packet, nb, out) } + m.sent.Inc(int64(len(i.packetStore))) } i.remotes.ResetBlockedRemotes() diff --git a/inside.go b/inside.go index 46371bd..d2ca06a 100644 --- a/inside.go +++ b/inside.go @@ -45,7 +45,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket, // the packet queue. ci.queueLock.Lock() if !ci.ready { - hostinfo.cachePacket(f.l, message, 0, packet, f.sendMessageNow) + hostinfo.cachePacket(f.l, message, 0, packet, f.sendMessageNow, f.cachedPacketMetrics) ci.queueLock.Unlock() return } @@ -164,7 +164,7 @@ func (f *Interface) SendMessageToVpnIp(t NebulaMessageType, st NebulaMessageSubT // the packet queue. 
hostInfo.ConnectionState.queueLock.Lock() if !hostInfo.ConnectionState.ready { - hostInfo.cachePacket(f.l, t, st, p, f.sendMessageToVpnIp) + hostInfo.cachePacket(f.l, t, st, p, f.sendMessageToVpnIp, f.cachedPacketMetrics) hostInfo.ConnectionState.queueLock.Unlock() return } diff --git a/interface.go b/interface.go index 6ad2d84..108ca05 100644 --- a/interface.go +++ b/interface.go @@ -77,9 +77,11 @@ type Interface struct { writers []*udpConn readers []io.ReadWriteCloser - metricHandshakes metrics.Histogram - messageMetrics *MessageMetrics - l *logrus.Logger + metricHandshakes metrics.Histogram + messageMetrics *MessageMetrics + cachedPacketMetrics *cachedPacketMetrics + + l *logrus.Logger } func NewInterface(c *InterfaceConfig) (*Interface, error) { @@ -122,7 +124,12 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) { metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)), messageMetrics: c.MessageMetrics, - l: c.l, + cachedPacketMetrics: &cachedPacketMetrics{ + sent: metrics.GetOrRegisterCounter("hostinfo.cached_packets.sent", nil), + dropped: metrics.GetOrRegisterCounter("hostinfo.cached_packets.dropped", nil), + }, + + l: c.l, } ifce.connectionManager = newConnectionManager(c.l, ifce, c.checkInterval, c.pendingDeletionInterval)