Reinstante cluster.LocalNode

This makes for more straightforward calls. Also, generate
the localNode directly in wg.New.
This commit is contained in:
kaiyou 2020-05-12 19:10:25 +02:00 committed by Leo Antunes
parent a0aa0bcc0e
commit e9b9239eae
3 changed files with 22 additions and 34 deletions

View File

@ -22,6 +22,7 @@ type Cluster struct {
ml *memberlist.Memberlist ml *memberlist.Memberlist
mlConfig *memberlist.Config mlConfig *memberlist.Config
localNode *common.Node localNode *common.Node
LocalName string
state *state state *state
events chan memberlist.NodeEvent events chan memberlist.NodeEvent
} }
@ -55,14 +56,14 @@ func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsNam
} }
cluster := Cluster{ cluster := Cluster{
ml: ml, ml: ml,
mlConfig: mlConfig, mlConfig: mlConfig,
LocalName: ml.LocalNode().Name,
// The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23 // The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23
// More than this many simultaneous events will deadlock cluster.members() // More than this many simultaneous events will deadlock cluster.members()
events: make(chan memberlist.NodeEvent, 100), events: make(chan memberlist.NodeEvent, 100),
state: state, state: state,
} }
cluster.SetLocalNode(&common.Node{})
return &cluster, nil return &cluster, nil
} }
@ -99,7 +100,13 @@ func (c *Cluster) Leave() {
} }
// Update gossips the local node configuration, propagating any change // Update gossips the local node configuration, propagating any change
func (c *Cluster) Update() { func (c *Cluster) Update(localNode *common.Node) {
c.localNode = localNode
// wrap in a delegateNode instance for memberlist.Delegate implementation
delegate := &delegateNode{c.localNode}
c.mlConfig.Conflict = delegate
c.mlConfig.Delegate = delegate
c.mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: c.events}
c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation
} }
@ -111,7 +118,7 @@ func (c *Cluster) Members() <-chan []common.Node {
go func() { go func() {
for { for {
event := <-c.events event := <-c.events
if event.Node.Name == c.localNode.Name { if event.Node.Name == c.LocalName {
// ignore events about ourselves // ignore events about ourselves
continue continue
} }
@ -126,7 +133,7 @@ func (c *Cluster) Members() <-chan []common.Node {
nodes := make([]common.Node, 0) nodes := make([]common.Node, 0)
for _, n := range c.ml.Members() { for _, n := range c.ml.Members() {
if n.Name == c.localNode.Name { if n.Name == c.LocalName {
continue continue
} }
nodes = append(nodes, common.Node{ nodes = append(nodes, common.Node{
@ -143,18 +150,6 @@ func (c *Cluster) Members() <-chan []common.Node {
return changes return changes
} }
// SetLocalNode takes a new local node configuration into account
// It also sets this node as the memberlist.Delegate implementation
func (c *Cluster) SetLocalNode(localNode *common.Node) {
c.localNode = localNode
c.localNode.Name = c.ml.LocalNode().Name
// wrap in a delegateNode instance for memberlist.Delegate implementation
delegate := &delegateNode{c.localNode}
c.mlConfig.Conflict = delegate
c.mlConfig.Delegate = delegate
c.mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: c.events}
}
func computeClusterKey(state *state, clusterKey []byte) ([]byte, error) { func computeClusterKey(state *state, clusterKey []byte) ([]byte, error) {
if len(clusterKey) == 0 { if len(clusterKey) == 0 {
clusterKey = state.ClusterKey clusterKey = state.ClusterKey

View File

@ -39,7 +39,7 @@ func main() {
if err != nil { if err != nil {
logrus.WithError(err).Fatal("could not create cluster") logrus.WithError(err).Fatal("could not create cluster")
} }
wgstate, err := wg.New(config.Interface, config.WireguardPort, (*net.IPNet)(config.OverlayNet), cluster.Name()) wgstate, localNode, err := wg.New(config.Interface, config.WireguardPort, (*net.IPNet)(config.OverlayNet), cluster.LocalName)
if err != nil { if err != nil {
logrus.WithError(err).Fatal("could not instantiate wireguard controller") logrus.WithError(err).Fatal("could not instantiate wireguard controller")
} }
@ -49,13 +49,8 @@ func main() {
Logger: logrus.StandardLogger(), Logger: logrus.StandardLogger(),
} }
// Assign a local node address and propagate it to the cluster
localNode := &common.Node{}
wgstate.UpdateNode(localNode)
cluster.SetLocalNode(localNode)
cluster.Update()
// Join the cluster // Join the cluster
cluster.Update(localNode)
nodec := cluster.Members() // avoid deadlocks by starting before join nodec := cluster.Members() // avoid deadlocks by starting before join
if err := backoff.RetryNotify( if err := backoff.RetryNotify(
func() error { return cluster.Join(config.Join) }, func() error { return cluster.Join(config.Join) },

View File

@ -25,15 +25,15 @@ type State struct {
// New creates a new Wesher Wireguard state // New creates a new Wesher Wireguard state
// The Wireguard keys are generated for every new interface // The Wireguard keys are generated for every new interface
// The interface must later be setup using SetUpInterface // The interface must later be setup using SetUpInterface
func New(iface string, port int, ipnet *net.IPNet, name string) (*State, error) { func New(iface string, port int, ipnet *net.IPNet, name string) (*State, *common.Node, error) {
client, err := wgctrl.New() client, err := wgctrl.New()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "could not instantiate wireguard client") return nil, nil, errors.Wrap(err, "could not instantiate wireguard client")
} }
privKey, err := wgtypes.GeneratePrivateKey() privKey, err := wgtypes.GeneratePrivateKey()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
pubKey := privKey.PublicKey() pubKey := privKey.PublicKey()
@ -46,13 +46,11 @@ func New(iface string, port int, ipnet *net.IPNet, name string) (*State, error)
} }
state.assignOverlayAddr(ipnet, name) state.assignOverlayAddr(ipnet, name)
return &state, nil node := &common.Node{}
} node.OverlayAddr = state.OverlayAddr
node.PubKey = state.PubKey.String()
// UpdateNode populates a node instance with wireguard specific fields return &state, node, nil
func (s *State) UpdateNode(node *common.Node) {
node.OverlayAddr = s.OverlayAddr
node.PubKey = s.PubKey.String()
} }
// assignOverlayAddr assigns a new address to the interface // assignOverlayAddr assigns a new address to the interface