From 0281c1108e1303f69723859faa08b26eae0c12f2 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 28 Sep 2014 10:49:27 -0700 Subject: [PATCH] rpc: Create the Client/Server --- plugin/{server.go => server_old.go} | 0 rpc/client.go | 83 ++++++++++++++ rpc/client_test.go | 42 +++++++ rpc/mux_broker.go | 172 ++++++++++++++++++++++++++++ rpc/rpc_test.go | 8 ++ rpc/server.go | 108 +++++++++++++++++ 6 files changed, 413 insertions(+) rename plugin/{server.go => server_old.go} (100%) create mode 100644 rpc/client.go create mode 100644 rpc/client_test.go create mode 100644 rpc/mux_broker.go create mode 100644 rpc/server.go diff --git a/plugin/server.go b/plugin/server_old.go similarity index 100% rename from plugin/server.go rename to plugin/server_old.go diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 000000000..259be4d76 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,83 @@ +package rpc + +import ( + "io" + "net" + "net/rpc" + + "github.com/hashicorp/terraform/terraform" + "github.com/hashicorp/yamux" +) + +// Client connects to a Server in order to request plugin implementations +// for Terraform. +type Client struct { + broker *muxBroker + control *rpc.Client +} + +// Dial opens a connection to a Terraform RPC server and returns a client. +func Dial(network, address string) (*Client, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + + return NewClient(conn) +} + +// NewClient creates a client from an already-open connection-like value. +// Dial is typically used instead. +func NewClient(conn io.ReadWriteCloser) (*Client, error) { + // Create the yamux client so we can multiplex + mux, err := yamux.Client(conn, nil) + if err != nil { + conn.Close() + return nil, err + } + + // Connect to the control stream. + control, err := mux.Open() + if err != nil { + mux.Close() + return nil, err + } + + // Create the broker and start it up + broker := newMuxBroker(mux) + go broker.Run() + + // Build the client using our broker and control channel. + return &Client{ + broker: broker, + control: rpc.NewClient(control), + }, nil +} + +// Close closes the connection. The client is no longer usable after this +// is called. +func (c *Client) Close() error { + if err := c.control.Close(); err != nil { + return err + } + + return c.broker.Close() +} + +func (c *Client) ResourceProvider() (terraform.ResourceProvider, error) { + var id uint32 + if err := c.control.Call( + "Dispenser.ResourceProvider", new(interface{}), &id); err != nil { + return nil, err + } + + conn, err := c.broker.Dial(id) + if err != nil { + return nil, err + } + + return &ResourceProvider{ + Client: rpc.NewClient(conn), + Name: "ResourceProvider", + }, nil +} diff --git a/rpc/client_test.go b/rpc/client_test.go new file mode 100644 index 000000000..5bb9bc3b2 --- /dev/null +++ b/rpc/client_test.go @@ -0,0 +1,42 @@ +package rpc + +import ( + "reflect" + "testing" + + "github.com/hashicorp/terraform/terraform" +) + +func TestClient_ResourceProvider(t *testing.T) { + clientConn, serverConn := testConn(t) + + p := new(terraform.MockResourceProvider) + server := &Server{ProviderFunc: testProviderFixed(p)} + go server.ServeConn(serverConn) + + client, err := NewClient(clientConn) + if err != nil { + t.Fatalf("err: %s", err) + } + defer client.Close() + + provider, err := client.ResourceProvider() + if err != nil { + t.Fatalf("err: %s", err) + } + + // Configure + config := &terraform.ResourceConfig{ + Raw: map[string]interface{}{"foo": "bar"}, + } + e := provider.Configure(config) + if !p.ConfigureCalled { + t.Fatal("configure should be called") + } + if !reflect.DeepEqual(p.ConfigureConfig, config) { + t.Fatalf("bad: %#v", p.ConfigureConfig) + } + if e != nil { + t.Fatalf("bad: %#v", e) + } +} diff --git a/rpc/mux_broker.go b/rpc/mux_broker.go new file mode 100644 index 000000000..639902a82 --- /dev/null +++ b/rpc/mux_broker.go @@ -0,0 +1,172 @@ +package rpc + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/yamux" +) + +// muxBroker is responsible for brokering multiplexed connections by unique ID. +// +// This allows a plugin to request a channel with a specific ID to connect to +// or accept a connection from, and the broker handles the details of +// holding these channels open while they're being negotiated. +type muxBroker struct { + nextId uint32 + session *yamux.Session + streams map[uint32]*muxBrokerPending + + sync.Mutex +} + +type muxBrokerPending struct { + ch chan net.Conn + doneCh chan struct{} +} + +func newMuxBroker(s *yamux.Session) *muxBroker { + return &muxBroker{ + session: s, + streams: make(map[uint32]*muxBrokerPending), + } +} + +// Accept accepts a connection by ID. +// +// This should not be called multiple times with the same ID at one time. +func (m *muxBroker) Accept(id uint32) (net.Conn, error) { + var c net.Conn + p := m.getStream(id) + select { + case c = <-p.ch: + close(p.doneCh) + case <-time.After(5 * time.Second): + m.Lock() + defer m.Unlock() + delete(m.streams, id) + + return nil, fmt.Errorf("timeout waiting for accept") + } + + // Ack our connection + if err := binary.Write(c, binary.LittleEndian, id); err != nil { + c.Close() + return nil, err + } + + return c, nil +} + +// Close closes the connection and all sub-connections. +func (m *muxBroker) Close() error { + return m.session.Close() +} + +// Dial opens a connection by ID. +func (m *muxBroker) Dial(id uint32) (net.Conn, error) { + // Open the stream + stream, err := m.session.OpenStream() + if err != nil { + return nil, err + } + + // Write the stream ID onto the wire. + if err := binary.Write(stream, binary.LittleEndian, id); err != nil { + stream.Close() + return nil, err + } + + // Read the ack that we connected. Then we're off! + var ack uint32 + if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil { + stream.Close() + return nil, err + } + if ack != id { + stream.Close() + return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id) + } + + return stream, nil +} + +// NextId returns a unique ID to use next. +func (m *muxBroker) NextId() uint32 { + return atomic.AddUint32(&m.nextId, 1) +} + +// Run starts the brokering and should be executed in a goroutine, since it +// blocks forever, or until the session closes. +func (m *muxBroker) Run() { + for { + stream, err := m.session.AcceptStream() + if err != nil { + // Once we receive an error, just exit + break + } + + // Read the stream ID from the stream + var id uint32 + if err := binary.Read(stream, binary.LittleEndian, &id); err != nil { + stream.Close() + continue + } + + // Initialize the waiter + p := m.getStream(id) + select { + case p.ch <- stream: + default: + } + + // Wait for a timeout + go m.timeoutWait(id, p) + } +} + +func (m *muxBroker) getStream(id uint32) *muxBrokerPending { + m.Lock() + defer m.Unlock() + + p, ok := m.streams[id] + if ok { + return p + } + + m.streams[id] = &muxBrokerPending{ + ch: make(chan net.Conn, 1), + doneCh: make(chan struct{}), + } + return m.streams[id] +} + +func (m *muxBroker) timeoutWait(id uint32, p *muxBrokerPending) { + // Wait for the stream to either be picked up and connected, or + // for a timeout. + timeout := false + select { + case <-p.doneCh: + case <-time.After(5 * time.Second): + timeout = true + } + + m.Lock() + defer m.Unlock() + + // Delete the stream so no one else can grab it + delete(m.streams, id) + + // If we timed out, then check if we have a channel in the buffer, + // and if so, close it. + if timeout { + select { + case s := <-p.ch: + s.Close() + } + } +} diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 80f3aeaef..3006dfbec 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -4,6 +4,8 @@ import ( "net" "net/rpc" "testing" + + "github.com/hashicorp/terraform/terraform" ) func testConn(t *testing.T) (net.Conn, net.Conn) { @@ -43,3 +45,9 @@ func testClientServer(t *testing.T) (*rpc.Client, *rpc.Server) { return client, server } + +func testProviderFixed(p terraform.ResourceProvider) ProviderFunc { + return func() terraform.ResourceProvider { + return p + } +} diff --git a/rpc/server.go b/rpc/server.go new file mode 100644 index 000000000..705e6e0fa --- /dev/null +++ b/rpc/server.go @@ -0,0 +1,108 @@ +package rpc + +import ( + "io" + "log" + "net" + "net/rpc" + + "github.com/hashicorp/terraform/terraform" + "github.com/hashicorp/yamux" +) + +// Server listens for network connections and then dispenses interface +// implementations for Terraform over net/rpc. +type Server struct { + ProviderFunc ProviderFunc +} + +// ProviderFunc creates terraform.ResourceProviders when they're requested +// from the server. +type ProviderFunc func() terraform.ResourceProvider + +// Accept accepts connections on a listener and serves requests for +// each incoming connection. Accept blocks; the caller typically invokes +// it in a go statement. +func (s *Server) Accept(lis net.Listener) { + for { + conn, err := lis.Accept() + if err != nil { + log.Printf("[ERR] plugin server: %s", err) + return + } + + go s.ServeConn(conn) + } +} + +// ServeConn runs a single connection. +// +// ServeConn blocks, serving the connection until the client hangs up. +func (s *Server) ServeConn(conn io.ReadWriteCloser) { + // First create the yamux server to wrap this connection + mux, err := yamux.Server(conn, nil) + if err != nil { + conn.Close() + log.Printf("[ERR] plugin: %s", err) + return + } + + // Accept the control connection + control, err := mux.Accept() + if err != nil { + mux.Close() + log.Printf("[ERR] plugin: %s", err) + return + } + + // Create the broker and start it up + broker := newMuxBroker(mux) + go broker.Run() + + // Use the control connection to build the dispenser and serve the + // connection. + server := rpc.NewServer() + server.RegisterName("Dispenser", &dispenseServer{ + ProviderFunc: s.ProviderFunc, + + broker: broker, + }) + server.ServeConn(control) +} + +// dispenseServer dispenses variousinterface implementations for Terraform. +type dispenseServer struct { + ProviderFunc ProviderFunc + + broker *muxBroker +} + +func (d *dispenseServer) ResourceProvider( + args interface{}, response *uint32) error { + id := d.broker.NextId() + *response = id + + go func() { + conn, err := d.broker.Accept(id) + if err != nil { + log.Printf("[ERR] Plugin dispense: %s", err) + return + } + + d.serve(conn, "ResourceProvider", &ResourceProviderServer{ + Provider: d.ProviderFunc(), + }) + }() + + return nil +} + +func (d *dispenseServer) serve(conn io.ReadWriteCloser, name string, v interface{}) { + server := rpc.NewServer() + if err := server.RegisterName(name, v); err != nil { + log.Printf("[ERR] Plugin dispense: %s", err) + return + } + + server.ServeConn(conn) +}