rpc: Create the Client/Server
This commit is contained in:
parent
338a61d135
commit
0281c1108e
|
@ -0,0 +1,83 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/rpc"
|
||||||
|
|
||||||
|
"github.com/hashicorp/terraform/terraform"
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client connects to a Server in order to request plugin implementations
|
||||||
|
// for Terraform.
|
||||||
|
type Client struct {
|
||||||
|
broker *muxBroker
|
||||||
|
control *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dial opens a connection to a Terraform RPC server and returns a client.
|
||||||
|
func Dial(network, address string) (*Client, error) {
|
||||||
|
conn, err := net.Dial(network, address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewClient(conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClient creates a client from an already-open connection-like value.
|
||||||
|
// Dial is typically used instead.
|
||||||
|
func NewClient(conn io.ReadWriteCloser) (*Client, error) {
|
||||||
|
// Create the yamux client so we can multiplex
|
||||||
|
mux, err := yamux.Client(conn, nil)
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to the control stream.
|
||||||
|
control, err := mux.Open()
|
||||||
|
if err != nil {
|
||||||
|
mux.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the broker and start it up
|
||||||
|
broker := newMuxBroker(mux)
|
||||||
|
go broker.Run()
|
||||||
|
|
||||||
|
// Build the client using our broker and control channel.
|
||||||
|
return &Client{
|
||||||
|
broker: broker,
|
||||||
|
control: rpc.NewClient(control),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the connection. The client is no longer usable after this
|
||||||
|
// is called.
|
||||||
|
func (c *Client) Close() error {
|
||||||
|
if err := c.control.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.broker.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) ResourceProvider() (terraform.ResourceProvider, error) {
|
||||||
|
var id uint32
|
||||||
|
if err := c.control.Call(
|
||||||
|
"Dispenser.ResourceProvider", new(interface{}), &id); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := c.broker.Dial(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ResourceProvider{
|
||||||
|
Client: rpc.NewClient(conn),
|
||||||
|
Name: "ResourceProvider",
|
||||||
|
}, nil
|
||||||
|
}
|
|
@ -0,0 +1,42 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/terraform/terraform"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestClient_ResourceProvider(t *testing.T) {
|
||||||
|
clientConn, serverConn := testConn(t)
|
||||||
|
|
||||||
|
p := new(terraform.MockResourceProvider)
|
||||||
|
server := &Server{ProviderFunc: testProviderFixed(p)}
|
||||||
|
go server.ServeConn(serverConn)
|
||||||
|
|
||||||
|
client, err := NewClient(clientConn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
provider, err := client.ResourceProvider()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure
|
||||||
|
config := &terraform.ResourceConfig{
|
||||||
|
Raw: map[string]interface{}{"foo": "bar"},
|
||||||
|
}
|
||||||
|
e := provider.Configure(config)
|
||||||
|
if !p.ConfigureCalled {
|
||||||
|
t.Fatal("configure should be called")
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(p.ConfigureConfig, config) {
|
||||||
|
t.Fatalf("bad: %#v", p.ConfigureConfig)
|
||||||
|
}
|
||||||
|
if e != nil {
|
||||||
|
t.Fatalf("bad: %#v", e)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,172 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
|
)
|
||||||
|
|
||||||
|
// muxBroker is responsible for brokering multiplexed connections by unique ID.
|
||||||
|
//
|
||||||
|
// This allows a plugin to request a channel with a specific ID to connect to
|
||||||
|
// or accept a connection from, and the broker handles the details of
|
||||||
|
// holding these channels open while they're being negotiated.
|
||||||
|
type muxBroker struct {
|
||||||
|
nextId uint32
|
||||||
|
session *yamux.Session
|
||||||
|
streams map[uint32]*muxBrokerPending
|
||||||
|
|
||||||
|
sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type muxBrokerPending struct {
|
||||||
|
ch chan net.Conn
|
||||||
|
doneCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMuxBroker(s *yamux.Session) *muxBroker {
|
||||||
|
return &muxBroker{
|
||||||
|
session: s,
|
||||||
|
streams: make(map[uint32]*muxBrokerPending),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept accepts a connection by ID.
|
||||||
|
//
|
||||||
|
// This should not be called multiple times with the same ID at one time.
|
||||||
|
func (m *muxBroker) Accept(id uint32) (net.Conn, error) {
|
||||||
|
var c net.Conn
|
||||||
|
p := m.getStream(id)
|
||||||
|
select {
|
||||||
|
case c = <-p.ch:
|
||||||
|
close(p.doneCh)
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
delete(m.streams, id)
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("timeout waiting for accept")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ack our connection
|
||||||
|
if err := binary.Write(c, binary.LittleEndian, id); err != nil {
|
||||||
|
c.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the connection and all sub-connections.
|
||||||
|
func (m *muxBroker) Close() error {
|
||||||
|
return m.session.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dial opens a connection by ID.
|
||||||
|
func (m *muxBroker) Dial(id uint32) (net.Conn, error) {
|
||||||
|
// Open the stream
|
||||||
|
stream, err := m.session.OpenStream()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the stream ID onto the wire.
|
||||||
|
if err := binary.Write(stream, binary.LittleEndian, id); err != nil {
|
||||||
|
stream.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the ack that we connected. Then we're off!
|
||||||
|
var ack uint32
|
||||||
|
if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil {
|
||||||
|
stream.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if ack != id {
|
||||||
|
stream.Close()
|
||||||
|
return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextId returns a unique ID to use next.
|
||||||
|
func (m *muxBroker) NextId() uint32 {
|
||||||
|
return atomic.AddUint32(&m.nextId, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the brokering and should be executed in a goroutine, since it
|
||||||
|
// blocks forever, or until the session closes.
|
||||||
|
func (m *muxBroker) Run() {
|
||||||
|
for {
|
||||||
|
stream, err := m.session.AcceptStream()
|
||||||
|
if err != nil {
|
||||||
|
// Once we receive an error, just exit
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the stream ID from the stream
|
||||||
|
var id uint32
|
||||||
|
if err := binary.Read(stream, binary.LittleEndian, &id); err != nil {
|
||||||
|
stream.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the waiter
|
||||||
|
p := m.getStream(id)
|
||||||
|
select {
|
||||||
|
case p.ch <- stream:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for a timeout
|
||||||
|
go m.timeoutWait(id, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *muxBroker) getStream(id uint32) *muxBrokerPending {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
|
p, ok := m.streams[id]
|
||||||
|
if ok {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
m.streams[id] = &muxBrokerPending{
|
||||||
|
ch: make(chan net.Conn, 1),
|
||||||
|
doneCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
return m.streams[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *muxBroker) timeoutWait(id uint32, p *muxBrokerPending) {
|
||||||
|
// Wait for the stream to either be picked up and connected, or
|
||||||
|
// for a timeout.
|
||||||
|
timeout := false
|
||||||
|
select {
|
||||||
|
case <-p.doneCh:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
timeout = true
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
|
// Delete the stream so no one else can grab it
|
||||||
|
delete(m.streams, id)
|
||||||
|
|
||||||
|
// If we timed out, then check if we have a channel in the buffer,
|
||||||
|
// and if so, close it.
|
||||||
|
if timeout {
|
||||||
|
select {
|
||||||
|
case s := <-p.ch:
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"net/rpc"
|
"net/rpc"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/terraform/terraform"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testConn(t *testing.T) (net.Conn, net.Conn) {
|
func testConn(t *testing.T) (net.Conn, net.Conn) {
|
||||||
|
@ -43,3 +45,9 @@ func testClientServer(t *testing.T) (*rpc.Client, *rpc.Server) {
|
||||||
|
|
||||||
return client, server
|
return client, server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testProviderFixed(p terraform.ResourceProvider) ProviderFunc {
|
||||||
|
return func() terraform.ResourceProvider {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"net/rpc"
|
||||||
|
|
||||||
|
"github.com/hashicorp/terraform/terraform"
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Server listens for network connections and then dispenses interface
|
||||||
|
// implementations for Terraform over net/rpc.
|
||||||
|
type Server struct {
|
||||||
|
ProviderFunc ProviderFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProviderFunc creates terraform.ResourceProviders when they're requested
|
||||||
|
// from the server.
|
||||||
|
type ProviderFunc func() terraform.ResourceProvider
|
||||||
|
|
||||||
|
// Accept accepts connections on a listener and serves requests for
|
||||||
|
// each incoming connection. Accept blocks; the caller typically invokes
|
||||||
|
// it in a go statement.
|
||||||
|
func (s *Server) Accept(lis net.Listener) {
|
||||||
|
for {
|
||||||
|
conn, err := lis.Accept()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERR] plugin server: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go s.ServeConn(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeConn runs a single connection.
|
||||||
|
//
|
||||||
|
// ServeConn blocks, serving the connection until the client hangs up.
|
||||||
|
func (s *Server) ServeConn(conn io.ReadWriteCloser) {
|
||||||
|
// First create the yamux server to wrap this connection
|
||||||
|
mux, err := yamux.Server(conn, nil)
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
log.Printf("[ERR] plugin: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept the control connection
|
||||||
|
control, err := mux.Accept()
|
||||||
|
if err != nil {
|
||||||
|
mux.Close()
|
||||||
|
log.Printf("[ERR] plugin: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the broker and start it up
|
||||||
|
broker := newMuxBroker(mux)
|
||||||
|
go broker.Run()
|
||||||
|
|
||||||
|
// Use the control connection to build the dispenser and serve the
|
||||||
|
// connection.
|
||||||
|
server := rpc.NewServer()
|
||||||
|
server.RegisterName("Dispenser", &dispenseServer{
|
||||||
|
ProviderFunc: s.ProviderFunc,
|
||||||
|
|
||||||
|
broker: broker,
|
||||||
|
})
|
||||||
|
server.ServeConn(control)
|
||||||
|
}
|
||||||
|
|
||||||
|
// dispenseServer dispenses variousinterface implementations for Terraform.
|
||||||
|
type dispenseServer struct {
|
||||||
|
ProviderFunc ProviderFunc
|
||||||
|
|
||||||
|
broker *muxBroker
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dispenseServer) ResourceProvider(
|
||||||
|
args interface{}, response *uint32) error {
|
||||||
|
id := d.broker.NextId()
|
||||||
|
*response = id
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
conn, err := d.broker.Accept(id)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERR] Plugin dispense: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
d.serve(conn, "ResourceProvider", &ResourceProviderServer{
|
||||||
|
Provider: d.ProviderFunc(),
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dispenseServer) serve(conn io.ReadWriteCloser, name string, v interface{}) {
|
||||||
|
server := rpc.NewServer()
|
||||||
|
if err := server.RegisterName(name, v); err != nil {
|
||||||
|
log.Printf("[ERR] Plugin dispense: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
server.ServeConn(conn)
|
||||||
|
}
|
Loading…
Reference in New Issue