diff --git a/cluster/cluster.go b/cluster/cluster.go index 75c8d0e..873a847 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -19,10 +19,9 @@ const KeyLen = 32 // Cluster represents a running cluster configuration type Cluster struct { - LocalName string // used to avoid LocalNode(); should not change ml *memberlist.Memberlist mlConfig *memberlist.Config - localNode common.Node + localNode *common.Node state *state events chan memberlist.NodeEvent } @@ -56,18 +55,22 @@ func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsNam } cluster := Cluster{ - LocalName: ml.LocalNode().Name, - ml: ml, - mlConfig: mlConfig, + ml: ml, + mlConfig: mlConfig, // The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23 // More than this many simultaneous events will deadlock cluster.members() events: make(chan memberlist.NodeEvent, 100), state: state, } - cluster.setupDelegate() + cluster.SetLocalNode(&common.Node{}) return &cluster, nil } +// Name provides the current cluster name +func (c *Cluster) Name() string { + return c.localNode.Name +} + // Join tries to join the cluster by contacting provided addresses // Provided addresses are passed as is, if no address is provided, known // cluster nodes are contacted instead. @@ -95,12 +98,8 @@ func (c *Cluster) Leave() { c.ml.Shutdown() //nolint: errcheck } -// Update takes a new local node configuration into account -// If the node is already joined, update also gossips the new local node -// configuration -func (c *Cluster) Update(localNode common.Node) { - c.localNode = localNode - c.setupDelegate() +// Update gossips the local node configuration, propagating any change +func (c *Cluster) Update() { c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation } @@ -112,7 +111,7 @@ func (c *Cluster) Members() <-chan []common.Node { go func() { for { event := <-c.events - if event.Node.Name == c.LocalName { + if event.Node.Name == c.localNode.Name { // ignore events about ourselves continue } @@ -127,7 +126,7 @@ func (c *Cluster) Members() <-chan []common.Node { nodes := make([]common.Node, 0) for _, n := range c.ml.Members() { - if n.Name == c.LocalName { + if n.Name == c.localNode.Name { continue } nodes = append(nodes, common.Node{ @@ -144,10 +143,15 @@ func (c *Cluster) Members() <-chan []common.Node { return changes } -func (c *Cluster) setupDelegate() { - delegate := delegateNode{&c.localNode} - c.mlConfig.Conflict = &delegate - c.mlConfig.Delegate = &delegate +// SetLocalNode takes a new local node configuration into account +// It also sets this node as the memberlist.Delegate implementation +func (c *Cluster) SetLocalNode(localNode *common.Node) { + c.localNode = localNode + c.localNode.Name = c.ml.LocalNode().Name + // wrap in a delegateNode instance for memberlist.Delegate implementation + delegate := &delegateNode{c.localNode} + c.mlConfig.Conflict = delegate + c.mlConfig.Delegate = delegate c.mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: c.events} } diff --git a/main.go b/main.go index 7f3b2c8..2a1655a 100644 --- a/main.go +++ b/main.go @@ -50,11 +50,12 @@ func main() { } // Assign a local node address and propagate it to the cluster - wgstate.AssignOverlayAddr((*net.IPNet)(config.OverlayNet), cluster.LocalName) - localNode := common.Node{} + wgstate.AssignOverlayAddr((*net.IPNet)(config.OverlayNet), cluster.Name()) + localNode := &common.Node{} localNode.OverlayAddr = wgstate.OverlayAddr localNode.PubKey = wgstate.PubKey.String() - cluster.Update(localNode) + cluster.SetLocalNode(localNode) + cluster.Update() // Join the cluster nodec := cluster.Members() // avoid deadlocks by starting before join