Implement networks routed through wesher

This adds an option for specifying a routed network. Every node route
that belongs to that routed network will be announced to the cluster and
every other node will direct traffic to that node for said route.
This commit is contained in:
kaiyou 2020-05-18 10:42:32 +02:00
parent e9aae4dc3b
commit 8637377cec
6 changed files with 104 additions and 8 deletions

View File

@ -11,6 +11,7 @@ import (
// nodeMeta holds metadata sent over the cluster // nodeMeta holds metadata sent over the cluster
type nodeMeta struct { type nodeMeta struct {
OverlayAddr net.IPNet OverlayAddr net.IPNet
Routes []net.IPNet
PubKey string PubKey string
} }

32
common/routes.go Normal file
View File

@ -0,0 +1,32 @@
package common
import (
"net"
"github.com/vishvananda/netlink"
)
// Routes pushes list of local routes to a channel, after filtering using the provided network
// The full list is pushed after every routing change
func Routes(filter *net.IPNet) <-chan []net.IPNet {
routesc := make(chan []net.IPNet)
updatec := make(chan netlink.RouteUpdate)
netlink.RouteSubscribe(updatec, make(chan struct{}))
go func() {
for {
<-updatec
routes, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
if err != nil {
continue
}
result := make([]net.IPNet, 0)
for _, route := range routes {
if route.Dst != nil && filter.Contains(route.Dst.IP) {
result = append(result, *route.Dst)
}
}
routesc <- result
}
}()
return routesc
}

View File

@ -19,6 +19,7 @@ type config struct {
ClusterPort int `id:"cluster-port" desc:"port used for membership gossip traffic (both TCP and UDP); must be the same across cluster" default:"7946"` ClusterPort int `id:"cluster-port" desc:"port used for membership gossip traffic (both TCP and UDP); must be the same across cluster" default:"7946"`
WireguardPort int `id:"wireguard-port" desc:"port used for wireguard traffic (UDP); must be the same across cluster" default:"51820"` WireguardPort int `id:"wireguard-port" desc:"port used for wireguard traffic (UDP); must be the same across cluster" default:"51820"`
OverlayNet *network `id:"overlay-net" desc:"the network in which to allocate addresses for the overlay mesh network (CIDR format); smaller networks increase the chance of IP collision" default:"10.0.0.0/8"` OverlayNet *network `id:"overlay-net" desc:"the network in which to allocate addresses for the overlay mesh network (CIDR format); smaller networks increase the chance of IP collision" default:"10.0.0.0/8"`
RoutedNet *network `id:"routed-net" desc:"network used to filter routes that nodes are allowed to announce (CIDR format)" default:"0.0.0.0/32"`
Interface string `desc:"name of the wireguard interface to create and manage" default:"wgoverlay"` Interface string `desc:"name of the wireguard interface to create and manage" default:"wgoverlay"`
NoEtcHosts bool `id:"no-etc-hosts" desc:"disable writing of entries to /etc/hosts"` NoEtcHosts bool `id:"no-etc-hosts" desc:"disable writing of entries to /etc/hosts"`
LogLevel string `id:"log-level" desc:"set the verbosity (debug/info/warn/error)" default:"warn"` LogLevel string `id:"log-level" desc:"set the verbosity (debug/info/warn/error)" default:"warn"`

View File

@ -64,6 +64,7 @@ func main() {
} }
// Main loop // Main loop
routesc := common.Routes((*net.IPNet)(config.RoutedNet))
incomingSigs := make(chan os.Signal, 1) incomingSigs := make(chan os.Signal, 1)
signal.Notify(incomingSigs, syscall.SIGTERM, os.Interrupt) signal.Notify(incomingSigs, syscall.SIGTERM, os.Interrupt)
logrus.Debug("waiting for cluster events") logrus.Debug("waiting for cluster events")
@ -82,7 +83,7 @@ func main() {
nodes = append(nodes, node) nodes = append(nodes, node)
hosts[node.OverlayAddr.IP.String()] = []string{node.Name} hosts[node.OverlayAddr.IP.String()] = []string{node.Name}
} }
if err := wgstate.SetUpInterface(nodes); err != nil { if err := wgstate.SetUpInterface(nodes, (*net.IPNet)(config.RoutedNet)); err != nil {
logrus.WithError(err).Error("could not up interface") logrus.WithError(err).Error("could not up interface")
wgstate.DownInterface() wgstate.DownInterface()
} }
@ -91,6 +92,10 @@ func main() {
logrus.WithError(err).Error("could not write hosts entries") logrus.WithError(err).Error("could not write hosts entries")
} }
} }
case routes := <-routesc:
logrus.Info("announcing new routes...")
localNode.Routes = routes
cluster.Update(localNode)
case <-incomingSigs: case <-incomingSigs:
logrus.Info("terminating...") logrus.Info("terminating...")
cluster.Leave() cluster.Leave()

View File

@ -127,6 +127,20 @@ test_multiple_clusters_restart() {
stop_test_container test1-orig stop_test_container test1-orig
} }
test_routed_network() {
run_test_container test1-orig test1 --init --routed-net 10.15.0.0/16
run_test_container test2-orig test2 --join test1-orig --routed-net 10.15.0.0/16
docker exec test2-orig bash -c "ip l a test type bridge; ip l s up test; ip a a 10.15.0.1/24 dev test"
sleep 3
docker exec test1-orig ping -c1 -W1 test2 || (docker logs test1-orig; docker logs test2-orig; false)
docker exec test1-orig ping -c1 -W1 10.15.0.1 || (docker logs test1-orig; docker logs test2-orig; false)
stop_test_container test2-orig
stop_test_container test1-orig
}
for test_func in $(declare -F | grep -Eo '\<test_.*$'); do for test_func in $(declare -F | grep -Eo '\<test_.*$'); do
echo "--- Running $test_func:" echo "--- Running $test_func:"
$test_func $test_func

View File

@ -94,7 +94,7 @@ func (s *State) DownInterface() error {
} }
// SetUpInterface creates and sets up the associated network interface // SetUpInterface creates and sets up the associated network interface
func (s *State) SetUpInterface(nodes []common.Node) error { func (s *State) SetUpInterface(nodes []common.Node, routedNet *net.IPNet) error {
if err := netlink.LinkAdd(&wireguard{LinkAttrs: netlink.LinkAttrs{Name: s.iface}}); err != nil && !os.IsExist(err) { if err := netlink.LinkAdd(&wireguard{LinkAttrs: netlink.LinkAttrs{Name: s.iface}}); err != nil && !os.IsExist(err) {
return errors.Wrapf(err, "could not create interface %s", s.iface) return errors.Wrapf(err, "could not create interface %s", s.iface)
} }
@ -128,12 +128,45 @@ func (s *State) SetUpInterface(nodes []common.Node) error {
if err := netlink.LinkSetUp(link); err != nil { if err := netlink.LinkSetUp(link); err != nil {
return errors.Wrapf(err, "could not enable interface %s", s.iface) return errors.Wrapf(err, "could not enable interface %s", s.iface)
} }
for _, node := range nodes {
netlink.RouteAdd(&netlink.Route{ // first compute routes
currentRoutes, err := netlink.RouteList(link, netlink.FAMILY_ALL)
if err != nil {
return errors.Wrapf(err, "could not update the routing table for %s", s.iface)
}
routes := make([]netlink.Route, 0)
for index, node := range nodes {
// dev route
routes = append(routes, netlink.Route{
LinkIndex: link.Attrs().Index, LinkIndex: link.Attrs().Index,
Dst: &node.OverlayAddr, Dst: &nodes[index].OverlayAddr,
Scope: netlink.SCOPE_LINK, Scope: netlink.SCOPE_LINK,
}) })
// via routes
for _, route := range node.Routes {
routes = append(routes, netlink.Route{
LinkIndex: link.Attrs().Index,
Dst: &route,
Gw: node.OverlayAddr.IP,
Scope: netlink.SCOPE_SITE,
})
}
}
// then actually update the routing table
for _, route := range routes {
match := matchRoute(currentRoutes, route)
if match == nil {
netlink.RouteAdd(&route)
} else if match.Gw.String() != route.Gw.String() {
netlink.RouteReplace(&route)
}
}
for _, route := range routes {
// only delete a reoute if it is a site scope route that belongs to the routed net, mainly to
// avoid deleting otherwise manually set routes
if matchRoute(currentRoutes, route) == nil && route.Scope == netlink.SCOPE_LINK && routedNet.Contains(route.Dst.IP) {
netlink.RouteDel(&route)
}
} }
return nil return nil
@ -153,10 +186,20 @@ func (s *State) nodesToPeerConfigs(nodes []common.Node) ([]wgtypes.PeerConfig, e
IP: node.Addr, IP: node.Addr,
Port: s.Port, Port: s.Port,
}, },
AllowedIPs: []net.IPNet{ AllowedIPs: append([]net.IPNet{node.OverlayAddr}, node.Routes...),
node.OverlayAddr,
},
} }
} }
return peerCfgs, nil return peerCfgs, nil
} }
func matchRoute(set []netlink.Route, needle netlink.Route) *netlink.Route {
// routes are considered equal if they overlap and have the same prefix length
prefixn, _ := needle.Dst.Mask.Size()
for _, route := range set {
prefixr, _ := route.Dst.Mask.Size()
if prefixn == prefixr && route.Dst.Contains(needle.Dst.IP) {
return &route
}
}
return nil
}