Use localNode to store the local node name

Also rename setupDelegate to SetLocalNode, so that
main can call SetLocalNode then Update().
This commit is contained in:
kaiyou 2020-05-10 17:37:06 +02:00 committed by Leo Antunes
parent 28a31efc1f
commit 078946dbdd
2 changed files with 26 additions and 21 deletions

View File

@ -19,10 +19,9 @@ const KeyLen = 32
// Cluster represents a running cluster configuration // Cluster represents a running cluster configuration
type Cluster struct { type Cluster struct {
LocalName string // used to avoid LocalNode(); should not change
ml *memberlist.Memberlist ml *memberlist.Memberlist
mlConfig *memberlist.Config mlConfig *memberlist.Config
localNode common.Node localNode *common.Node
state *state state *state
events chan memberlist.NodeEvent events chan memberlist.NodeEvent
} }
@ -56,18 +55,22 @@ func New(init bool, clusterKey []byte, bindAddr string, bindPort int, useIPAsNam
} }
cluster := Cluster{ cluster := Cluster{
LocalName: ml.LocalNode().Name, ml: ml,
ml: ml, mlConfig: mlConfig,
mlConfig: mlConfig,
// The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23 // The big channel buffer is a work-around for https://github.com/hashicorp/memberlist/issues/23
// More than this many simultaneous events will deadlock cluster.members() // More than this many simultaneous events will deadlock cluster.members()
events: make(chan memberlist.NodeEvent, 100), events: make(chan memberlist.NodeEvent, 100),
state: state, state: state,
} }
cluster.setupDelegate() cluster.SetLocalNode(&common.Node{})
return &cluster, nil return &cluster, nil
} }
// Name provides the current cluster name
func (c *Cluster) Name() string {
return c.localNode.Name
}
// Join tries to join the cluster by contacting provided addresses // Join tries to join the cluster by contacting provided addresses
// Provided addresses are passed as is, if no address is provided, known // Provided addresses are passed as is, if no address is provided, known
// cluster nodes are contacted instead. // cluster nodes are contacted instead.
@ -95,12 +98,8 @@ func (c *Cluster) Leave() {
c.ml.Shutdown() //nolint: errcheck c.ml.Shutdown() //nolint: errcheck
} }
// Update takes a new local node configuration into account // Update gossips the local node configuration, propagating any change
// If the node is already joined, update also gossips the new local node func (c *Cluster) Update() {
// configuration
func (c *Cluster) Update(localNode common.Node) {
c.localNode = localNode
c.setupDelegate()
c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation c.ml.UpdateNode(1 * time.Second) // we currently do not update after creation
} }
@ -112,7 +111,7 @@ func (c *Cluster) Members() <-chan []common.Node {
go func() { go func() {
for { for {
event := <-c.events event := <-c.events
if event.Node.Name == c.LocalName { if event.Node.Name == c.localNode.Name {
// ignore events about ourselves // ignore events about ourselves
continue continue
} }
@ -127,7 +126,7 @@ func (c *Cluster) Members() <-chan []common.Node {
nodes := make([]common.Node, 0) nodes := make([]common.Node, 0)
for _, n := range c.ml.Members() { for _, n := range c.ml.Members() {
if n.Name == c.LocalName { if n.Name == c.localNode.Name {
continue continue
} }
nodes = append(nodes, common.Node{ nodes = append(nodes, common.Node{
@ -144,10 +143,15 @@ func (c *Cluster) Members() <-chan []common.Node {
return changes return changes
} }
func (c *Cluster) setupDelegate() { // SetLocalNode takes a new local node configuration into account
delegate := delegateNode{&c.localNode} // It also sets this node as the memberlist.Delegate implementation
c.mlConfig.Conflict = &delegate func (c *Cluster) SetLocalNode(localNode *common.Node) {
c.mlConfig.Delegate = &delegate c.localNode = localNode
c.localNode.Name = c.ml.LocalNode().Name
// wrap in a delegateNode instance for memberlist.Delegate implementation
delegate := &delegateNode{c.localNode}
c.mlConfig.Conflict = delegate
c.mlConfig.Delegate = delegate
c.mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: c.events} c.mlConfig.Events = &memberlist.ChannelEventDelegate{Ch: c.events}
} }

View File

@ -50,11 +50,12 @@ func main() {
} }
// Assign a local node address and propagate it to the cluster // Assign a local node address and propagate it to the cluster
wgstate.AssignOverlayAddr((*net.IPNet)(config.OverlayNet), cluster.LocalName) wgstate.AssignOverlayAddr((*net.IPNet)(config.OverlayNet), cluster.Name())
localNode := common.Node{} localNode := &common.Node{}
localNode.OverlayAddr = wgstate.OverlayAddr localNode.OverlayAddr = wgstate.OverlayAddr
localNode.PubKey = wgstate.PubKey.String() localNode.PubKey = wgstate.PubKey.String()
cluster.Update(localNode) cluster.SetLocalNode(localNode)
cluster.Update()
// Join the cluster // Join the cluster
nodec := cluster.Members() // avoid deadlocks by starting before join nodec := cluster.Members() // avoid deadlocks by starting before join