diff --git a/vendor/github.com/hashicorp/go-plugin/README.md b/vendor/github.com/hashicorp/go-plugin/README.md index 78d354ed2..e4558dbc5 100644 --- a/vendor/github.com/hashicorp/go-plugin/README.md +++ b/vendor/github.com/hashicorp/go-plugin/README.md @@ -76,7 +76,7 @@ must be properly secured to protect this configuration. ## Architecture The HashiCorp plugin system works by launching subprocesses and communicating -over RPC (using standard `net/rpc` or [gRPC](http://www.grpc.io). A single +over RPC (using standard `net/rpc` or [gRPC](http://www.grpc.io)). A single connection is made between any plugin and the host process. For net/rpc-based plugins, we use a [connection multiplexing](https://github.com/hashicorp/yamux) library to multiplex any other connections on top. For gRPC-based plugins, diff --git a/vendor/github.com/hashicorp/go-plugin/client.go b/vendor/github.com/hashicorp/go-plugin/client.go index b912826b2..b3e3b78ea 100644 --- a/vendor/github.com/hashicorp/go-plugin/client.go +++ b/vendor/github.com/hashicorp/go-plugin/client.go @@ -2,6 +2,7 @@ package plugin import ( "bufio" + "context" "crypto/subtle" "crypto/tls" "errors" @@ -79,6 +80,7 @@ type Client struct { client ClientProtocol protocol Protocol logger hclog.Logger + doneCtx context.Context } // ClientConfig is the configuration used to initialize a new @@ -310,7 +312,7 @@ func (c *Client) Client() (ClientProtocol, error) { c.client, err = newRPCClient(c) case ProtocolGRPC: - c.client, err = newGRPCClient(c) + c.client, err = newGRPCClient(c.doneCtx, c) default: return nil, fmt.Errorf("unknown server protocol: %s", c.protocol) @@ -423,6 +425,9 @@ func (c *Client) Start() (addr net.Addr, err error) { // Create the logging channel for when we kill c.doneLogging = make(chan struct{}) + // Create a context for when we kill + var ctxCancel context.CancelFunc + c.doneCtx, ctxCancel = context.WithCancel(context.Background()) if c.config.Reattach != nil { // Verify the process still exists. If not, then it is an error @@ -457,6 +462,9 @@ func (c *Client) Start() (addr net.Addr, err error) { // Close the logging channel since that doesn't work on reattach close(c.doneLogging) + + // Cancel the context + ctxCancel() }(p.Pid) // Set the address and process @@ -535,6 +543,9 @@ func (c *Client) Start() (addr net.Addr, err error) { // Mark that we exited close(exitCh) + // Cancel the context, marking that we exited + ctxCancel() + // Set that we exited, which takes a lock c.l.Lock() defer c.l.Unlock() @@ -606,7 +617,7 @@ func (c *Client) Start() (addr net.Addr, err error) { if int(coreProtocol) != CoreProtocolVersion { err = fmt.Errorf("Incompatible core API version with plugin. "+ - "Plugin version: %s, Ours: %d\n\n"+ + "Plugin version: %s, Core version: %d\n\n"+ "To fix this, the plugin usually only needs to be recompiled.\n"+ "Please report this to the plugin author.", parts[0], CoreProtocolVersion) return @@ -624,7 +635,7 @@ func (c *Client) Start() (addr net.Addr, err error) { // Test the API version if uint(protocol) != c.config.ProtocolVersion { err = fmt.Errorf("Incompatible API version with plugin. "+ - "Plugin version: %s, Ours: %d", parts[1], c.config.ProtocolVersion) + "Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion) return } @@ -707,18 +718,29 @@ func (c *Client) Protocol() Protocol { return c.protocol } +func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) { + return func(_ string, _ time.Duration) (net.Conn, error) { + // Connect to the client + conn, err := net.Dial(addr.Network(), addr.String()) + if err != nil { + return nil, err + } + if tcpConn, ok := conn.(*net.TCPConn); ok { + // Make sure to set keep alive so that the connection doesn't die + tcpConn.SetKeepAlive(true) + } + + return conn, nil + } +} + // dialer is compatible with grpc.WithDialer and creates the connection // to the plugin. func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) { - // Connect to the client - conn, err := net.Dial(c.address.Network(), c.address.String()) + conn, err := netAddrDialer(c.address)("", timeout) if err != nil { return nil, err } - if tcpConn, ok := conn.(*net.TCPConn); ok { - // Make sure to set keep alive so that the connection doesn't die - tcpConn.SetKeepAlive(true) - } // If we have a TLS config we wrap our connection. We only do this // for net/rpc since gRPC uses its own mechanism for TLS. diff --git a/vendor/github.com/hashicorp/go-plugin/grpc_broker.go b/vendor/github.com/hashicorp/go-plugin/grpc_broker.go new file mode 100644 index 000000000..49fd21c61 --- /dev/null +++ b/vendor/github.com/hashicorp/go-plugin/grpc_broker.go @@ -0,0 +1,455 @@ +package plugin + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "log" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/oklog/run" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +// streamer interface is used in the broker to send/receive connection +// information. +type streamer interface { + Send(*ConnInfo) error + Recv() (*ConnInfo, error) + Close() +} + +// sendErr is used to pass errors back during a send. +type sendErr struct { + i *ConnInfo + ch chan error +} + +// gRPCBrokerServer is used by the plugin to start a stream and to send +// connection information to/from the plugin. Implements GRPCBrokerServer and +// streamer interfaces. +type gRPCBrokerServer struct { + // send is used to send connection info to the gRPC stream. + send chan *sendErr + + // recv is used to receive connection info from the gRPC stream. + recv chan *ConnInfo + + // quit closes down the stream. + quit chan struct{} + + // o is used to ensure we close the quit channel only once. + o sync.Once +} + +func newGRPCBrokerServer() *gRPCBrokerServer { + return &gRPCBrokerServer{ + send: make(chan *sendErr), + recv: make(chan *ConnInfo), + quit: make(chan struct{}), + } +} + +// StartStream implements the GRPCBrokerServer interface and will block until +// the quit channel is closed or the context reports Done. The stream will pass +// connection information to/from the client. +func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error { + doneCh := stream.Context().Done() + defer s.Close() + + // Proccess send stream + go func() { + for { + select { + case <-doneCh: + return + case <-s.quit: + return + case se := <-s.send: + err := stream.Send(se.i) + se.ch <- err + } + } + }() + + // Process receive stream + for { + i, err := stream.Recv() + if err != nil { + return err + } + select { + case <-doneCh: + return nil + case <-s.quit: + return nil + case s.recv <- i: + } + } + + return nil +} + +// Send is used by the GRPCBroker to pass connection information into the stream +// to the client. +func (s *gRPCBrokerServer) Send(i *ConnInfo) error { + ch := make(chan error) + defer close(ch) + + select { + case <-s.quit: + return errors.New("broker closed") + case s.send <- &sendErr{ + i: i, + ch: ch, + }: + } + + return <-ch +} + +// Recv is used by the GRPCBroker to pass connection information that has been +// sent from the client from the stream to the broker. +func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) { + select { + case <-s.quit: + return nil, errors.New("broker closed") + case i := <-s.recv: + return i, nil + } +} + +// Close closes the quit channel, shutting down the stream. +func (s *gRPCBrokerServer) Close() { + s.o.Do(func() { + close(s.quit) + }) +} + +// gRPCBrokerClientImpl is used by the client to start a stream and to send +// connection information to/from the client. Implements GRPCBrokerClient and +// streamer interfaces. +type gRPCBrokerClientImpl struct { + // client is the underlying GRPC client used to make calls to the server. + client GRPCBrokerClient + + // send is used to send connection info to the gRPC stream. + send chan *sendErr + + // recv is used to receive connection info from the gRPC stream. + recv chan *ConnInfo + + // quit closes down the stream. + quit chan struct{} + + // o is used to ensure we close the quit channel only once. + o sync.Once +} + +func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl { + return &gRPCBrokerClientImpl{ + client: NewGRPCBrokerClient(conn), + send: make(chan *sendErr), + recv: make(chan *ConnInfo), + quit: make(chan struct{}), + } +} + +// StartStream implements the GRPCBrokerClient interface and will block until +// the quit channel is closed or the context reports Done. The stream will pass +// connection information to/from the plugin. +func (s *gRPCBrokerClientImpl) StartStream() error { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + defer s.Close() + + stream, err := s.client.StartStream(ctx) + if err != nil { + return err + } + doneCh := stream.Context().Done() + + go func() { + for { + select { + case <-doneCh: + return + case <-s.quit: + return + case se := <-s.send: + err := stream.Send(se.i) + se.ch <- err + } + } + }() + + for { + i, err := stream.Recv() + if err != nil { + return err + } + select { + case <-doneCh: + return nil + case <-s.quit: + return nil + case s.recv <- i: + } + } + + return nil +} + +// Send is used by the GRPCBroker to pass connection information into the stream +// to the plugin. +func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error { + ch := make(chan error) + defer close(ch) + + select { + case <-s.quit: + return errors.New("broker closed") + case s.send <- &sendErr{ + i: i, + ch: ch, + }: + } + + return <-ch +} + +// Recv is used by the GRPCBroker to pass connection information that has been +// sent from the plugin to the broker. +func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) { + select { + case <-s.quit: + return nil, errors.New("broker closed") + case i := <-s.recv: + return i, nil + } +} + +// Close closes the quit channel, shutting down the stream. +func (s *gRPCBrokerClientImpl) Close() { + s.o.Do(func() { + close(s.quit) + }) +} + +// GRPCBroker is responsible for brokering connections by unique ID. +// +// It is used by plugins to create multiple gRPC connections and data +// streams between the plugin process and the host process. +// +// This allows a plugin to request a channel with a specific ID to connect to +// or accept a connection from, and the broker handles the details of +// holding these channels open while they're being negotiated. +// +// The Plugin interface has access to these for both Server and Client. +// The broker can be used by either (optionally) to reserve and connect to +// new streams. This is useful for complex args and return values, +// or anything else you might need a data stream for. +type GRPCBroker struct { + nextId uint32 + streamer streamer + streams map[uint32]*gRPCBrokerPending + tls *tls.Config + doneCh chan struct{} + o sync.Once + + sync.Mutex +} + +type gRPCBrokerPending struct { + ch chan *ConnInfo + doneCh chan struct{} +} + +func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker { + return &GRPCBroker{ + streamer: s, + streams: make(map[uint32]*gRPCBrokerPending), + tls: tls, + doneCh: make(chan struct{}), + } +} + +// Accept accepts a connection by ID. +// +// This should not be called multiple times with the same ID at one time. +func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) { + listener, err := serverListener() + if err != nil { + return nil, err + } + + err = b.streamer.Send(&ConnInfo{ + ServiceId: id, + Network: listener.Addr().Network(), + Address: listener.Addr().String(), + }) + if err != nil { + return nil, err + } + + return listener, nil +} + +// AcceptAndServe is used to accept a specific stream ID and immediately +// serve a gRPC server on that stream ID. This is used to easily serve +// complex arguments. Each AcceptAndServe call opens a new listener socket and +// sends the connection info down the stream to the dialer. Since a new +// connection is opened every call, these calls should be used sparingly. +// Multiple gRPC server implementations can be registered to a single +// AcceptAndServe call. +func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) { + listener, err := b.Accept(id) + if err != nil { + log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err) + return + } + defer listener.Close() + + var opts []grpc.ServerOption + if b.tls != nil { + opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))} + } + + server := s(opts) + + // Here we use a run group to close this goroutine if the server is shutdown + // or the broker is shutdown. + var g run.Group + { + // Serve on the listener, if shutting down call GracefulStop. + g.Add(func() error { + return server.Serve(listener) + }, func(err error) { + server.GracefulStop() + }) + } + { + // block on the closeCh or the doneCh. If we are shutting down close the + // closeCh. + closeCh := make(chan struct{}) + g.Add(func() error { + select { + case <-b.doneCh: + case <-closeCh: + } + return nil + }, func(err error) { + close(closeCh) + }) + } + + // Block until we are done + g.Run() +} + +// Close closes the stream and all servers. +func (b *GRPCBroker) Close() error { + b.streamer.Close() + b.o.Do(func() { + close(b.doneCh) + }) + return nil +} + +// Dial opens a connection by ID. +func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) { + var c *ConnInfo + + // Open the stream + p := b.getStream(id) + select { + case c = <-p.ch: + close(p.doneCh) + case <-time.After(5 * time.Second): + return nil, fmt.Errorf("timeout waiting for connection info") + } + + var addr net.Addr + switch c.Network { + case "tcp": + addr, err = net.ResolveTCPAddr("tcp", c.Address) + case "unix": + addr, err = net.ResolveUnixAddr("unix", c.Address) + default: + err = fmt.Errorf("Unknown address type: %s", c.Address) + } + if err != nil { + return nil, err + } + + return dialGRPCConn(b.tls, netAddrDialer(addr)) +} + +// NextId returns a unique ID to use next. +// +// It is possible for very long-running plugin hosts to wrap this value, +// though it would require a very large amount of calls. In practice +// we've never seen it happen. +func (m *GRPCBroker) NextId() uint32 { + return atomic.AddUint32(&m.nextId, 1) +} + +// Run starts the brokering and should be executed in a goroutine, since it +// blocks forever, or until the session closes. +// +// Uses of GRPCBroker never need to call this. It is called internally by +// the plugin host/client. +func (m *GRPCBroker) Run() { + for { + stream, err := m.streamer.Recv() + if err != nil { + // Once we receive an error, just exit + break + } + + // Initialize the waiter + p := m.getStream(stream.ServiceId) + select { + case p.ch <- stream: + default: + } + + go m.timeoutWait(stream.ServiceId, p) + } +} + +func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending { + m.Lock() + defer m.Unlock() + + p, ok := m.streams[id] + if ok { + return p + } + + m.streams[id] = &gRPCBrokerPending{ + ch: make(chan *ConnInfo, 1), + doneCh: make(chan struct{}), + } + return m.streams[id] +} + +func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) { + // Wait for the stream to either be picked up and connected, or + // for a timeout. + select { + case <-p.doneCh: + case <-time.After(5 * time.Second): + } + + m.Lock() + defer m.Unlock() + + // Delete the stream so no one else can grab it + delete(m.streams, id) +} diff --git a/vendor/github.com/hashicorp/go-plugin/grpc_broker.pb.go b/vendor/github.com/hashicorp/go-plugin/grpc_broker.pb.go new file mode 100644 index 000000000..d490dafba --- /dev/null +++ b/vendor/github.com/hashicorp/go-plugin/grpc_broker.pb.go @@ -0,0 +1,190 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: grpc_broker.proto + +/* +Package plugin is a generated protocol buffer package. + +It is generated from these files: + grpc_broker.proto + +It has these top-level messages: + ConnInfo +*/ +package plugin + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type ConnInfo struct { + ServiceId uint32 `protobuf:"varint,1,opt,name=service_id,json=serviceId" json:"service_id,omitempty"` + Network string `protobuf:"bytes,2,opt,name=network" json:"network,omitempty"` + Address string `protobuf:"bytes,3,opt,name=address" json:"address,omitempty"` +} + +func (m *ConnInfo) Reset() { *m = ConnInfo{} } +func (m *ConnInfo) String() string { return proto.CompactTextString(m) } +func (*ConnInfo) ProtoMessage() {} +func (*ConnInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *ConnInfo) GetServiceId() uint32 { + if m != nil { + return m.ServiceId + } + return 0 +} + +func (m *ConnInfo) GetNetwork() string { + if m != nil { + return m.Network + } + return "" +} + +func (m *ConnInfo) GetAddress() string { + if m != nil { + return m.Address + } + return "" +} + +func init() { + proto.RegisterType((*ConnInfo)(nil), "plugin.ConnInfo") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for GRPCBroker service + +type GRPCBrokerClient interface { + StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error) +} + +type gRPCBrokerClient struct { + cc *grpc.ClientConn +} + +func NewGRPCBrokerClient(cc *grpc.ClientConn) GRPCBrokerClient { + return &gRPCBrokerClient{cc} +} + +func (c *gRPCBrokerClient) StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_GRPCBroker_serviceDesc.Streams[0], c.cc, "/plugin.GRPCBroker/StartStream", opts...) + if err != nil { + return nil, err + } + x := &gRPCBrokerStartStreamClient{stream} + return x, nil +} + +type GRPCBroker_StartStreamClient interface { + Send(*ConnInfo) error + Recv() (*ConnInfo, error) + grpc.ClientStream +} + +type gRPCBrokerStartStreamClient struct { + grpc.ClientStream +} + +func (x *gRPCBrokerStartStreamClient) Send(m *ConnInfo) error { + return x.ClientStream.SendMsg(m) +} + +func (x *gRPCBrokerStartStreamClient) Recv() (*ConnInfo, error) { + m := new(ConnInfo) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for GRPCBroker service + +type GRPCBrokerServer interface { + StartStream(GRPCBroker_StartStreamServer) error +} + +func RegisterGRPCBrokerServer(s *grpc.Server, srv GRPCBrokerServer) { + s.RegisterService(&_GRPCBroker_serviceDesc, srv) +} + +func _GRPCBroker_StartStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(GRPCBrokerServer).StartStream(&gRPCBrokerStartStreamServer{stream}) +} + +type GRPCBroker_StartStreamServer interface { + Send(*ConnInfo) error + Recv() (*ConnInfo, error) + grpc.ServerStream +} + +type gRPCBrokerStartStreamServer struct { + grpc.ServerStream +} + +func (x *gRPCBrokerStartStreamServer) Send(m *ConnInfo) error { + return x.ServerStream.SendMsg(m) +} + +func (x *gRPCBrokerStartStreamServer) Recv() (*ConnInfo, error) { + m := new(ConnInfo) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _GRPCBroker_serviceDesc = grpc.ServiceDesc{ + ServiceName: "plugin.GRPCBroker", + HandlerType: (*GRPCBrokerServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "StartStream", + Handler: _GRPCBroker_StartStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "grpc_broker.proto", +} + +func init() { proto.RegisterFile("grpc_broker.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 170 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2f, 0x2a, 0x48, + 0x8e, 0x4f, 0x2a, 0xca, 0xcf, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2b, + 0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x53, 0x8a, 0xe5, 0xe2, 0x70, 0xce, 0xcf, 0xcb, 0xf3, 0xcc, 0x4b, + 0xcb, 0x17, 0x92, 0xe5, 0xe2, 0x2a, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x8d, 0xcf, 0x4c, 0x91, + 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0d, 0xe2, 0x84, 0x8a, 0x78, 0xa6, 0x08, 0x49, 0x70, 0xb1, 0xe7, + 0xa5, 0x96, 0x94, 0xe7, 0x17, 0x65, 0x4b, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xb8, 0x20, + 0x99, 0xc4, 0x94, 0x94, 0xa2, 0xd4, 0xe2, 0x62, 0x09, 0x66, 0x88, 0x0c, 0x94, 0x6b, 0xe4, 0xcc, + 0xc5, 0xe5, 0x1e, 0x14, 0xe0, 0xec, 0x04, 0xb6, 0x5a, 0xc8, 0x94, 0x8b, 0x3b, 0xb8, 0x24, 0xb1, + 0xa8, 0x24, 0xb8, 0xa4, 0x28, 0x35, 0x31, 0x57, 0x48, 0x40, 0x0f, 0xe2, 0x08, 0x3d, 0x98, 0x0b, + 0xa4, 0x30, 0x44, 0x34, 0x18, 0x0d, 0x18, 0x93, 0xd8, 0xc0, 0x4e, 0x36, 0x06, 0x04, 0x00, 0x00, + 0xff, 0xff, 0x7b, 0x5d, 0xfb, 0xe1, 0xc7, 0x00, 0x00, 0x00, +} diff --git a/vendor/github.com/hashicorp/go-plugin/grpc_broker.proto b/vendor/github.com/hashicorp/go-plugin/grpc_broker.proto new file mode 100644 index 000000000..f57834856 --- /dev/null +++ b/vendor/github.com/hashicorp/go-plugin/grpc_broker.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; +package plugin; + +message ConnInfo { + uint32 service_id = 1; + string network = 2; + string address = 3; +} + +service GRPCBroker { + rpc StartStream(stream ConnInfo) returns (stream ConnInfo); +} + + diff --git a/vendor/github.com/hashicorp/go-plugin/grpc_client.go b/vendor/github.com/hashicorp/go-plugin/grpc_client.go index 3bcf95efc..44294d0d3 100644 --- a/vendor/github.com/hashicorp/go-plugin/grpc_client.go +++ b/vendor/github.com/hashicorp/go-plugin/grpc_client.go @@ -1,7 +1,10 @@ package plugin import ( + "crypto/tls" "fmt" + "net" + "time" "golang.org/x/net/context" "google.golang.org/grpc" @@ -9,14 +12,12 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" ) -// newGRPCClient creates a new GRPCClient. The Client argument is expected -// to be successfully started already with a lock held. -func newGRPCClient(c *Client) (*GRPCClient, error) { +func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { // Build dialing options. opts := make([]grpc.DialOption, 0, 5) // We use a custom dialer so that we can connect over unix domain sockets - opts = append(opts, grpc.WithDialer(c.dialer)) + opts = append(opts, grpc.WithDialer(dialer)) // go-plugin expects to block the connection opts = append(opts, grpc.WithBlock()) @@ -26,11 +27,11 @@ func newGRPCClient(c *Client) (*GRPCClient, error) { // If we have no TLS configuration set, we need to explicitly tell grpc // that we're connecting with an insecure connection. - if c.config.TLSConfig == nil { + if tls == nil { opts = append(opts, grpc.WithInsecure()) } else { opts = append(opts, grpc.WithTransportCredentials( - credentials.NewTLS(c.config.TLSConfig))) + credentials.NewTLS(tls))) } // Connect. Note the first parameter is unused because we use a custom @@ -40,9 +41,28 @@ func newGRPCClient(c *Client) (*GRPCClient, error) { return nil, err } + return conn, nil +} + +// newGRPCClient creates a new GRPCClient. The Client argument is expected +// to be successfully started already with a lock held. +func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) { + conn, err := dialGRPCConn(c.config.TLSConfig, c.dialer) + if err != nil { + return nil, err + } + + // Start the broker. + brokerGRPCClient := newGRPCBrokerClient(conn) + broker := newGRPCBroker(brokerGRPCClient, c.config.TLSConfig) + go broker.Run() + go brokerGRPCClient.StartStream() + return &GRPCClient{ Conn: conn, Plugins: c.config.Plugins, + doneCtx: doneCtx, + broker: broker, }, nil } @@ -50,10 +70,14 @@ func newGRPCClient(c *Client) (*GRPCClient, error) { type GRPCClient struct { Conn *grpc.ClientConn Plugins map[string]Plugin + + doneCtx context.Context + broker *GRPCBroker } // ClientProtocol impl. func (c *GRPCClient) Close() error { + c.broker.Close() return c.Conn.Close() } @@ -69,7 +93,7 @@ func (c *GRPCClient) Dispense(name string) (interface{}, error) { return nil, fmt.Errorf("plugin %q doesn't support gRPC", name) } - return p.GRPCClient(c.Conn) + return p.GRPCClient(c.doneCtx, c.broker, c.Conn) } // ClientProtocol impl. diff --git a/vendor/github.com/hashicorp/go-plugin/grpc_server.go b/vendor/github.com/hashicorp/go-plugin/grpc_server.go index 177a0cdd7..3a727393c 100644 --- a/vendor/github.com/hashicorp/go-plugin/grpc_server.go +++ b/vendor/github.com/hashicorp/go-plugin/grpc_server.go @@ -51,6 +51,7 @@ type GRPCServer struct { config GRPCServerConfig server *grpc.Server + broker *GRPCBroker } // ServerProtocol impl. @@ -68,14 +69,20 @@ func (s *GRPCServer) Init() error { GRPCServiceName, grpc_health_v1.HealthCheckResponse_SERVING) grpc_health_v1.RegisterHealthServer(s.server, healthCheck) + // Register the broker service + brokerServer := newGRPCBrokerServer() + RegisterGRPCBrokerServer(s.server, brokerServer) + s.broker = newGRPCBroker(brokerServer, s.TLS) + go s.broker.Run() + // Register all our plugins onto the gRPC server. for k, raw := range s.Plugins { p, ok := raw.(GRPCPlugin) if !ok { - return fmt.Errorf("%q is not a GRPC-compatibile plugin", k) + return fmt.Errorf("%q is not a GRPC-compatible plugin", k) } - if err := p.GRPCServer(s.server); err != nil { + if err := p.GRPCServer(s.broker, s.server); err != nil { return fmt.Errorf("error registring %q: %s", k, err) } } @@ -83,6 +90,16 @@ func (s *GRPCServer) Init() error { return nil } +// Stop calls Stop on the underlying grpc.Server +func (s *GRPCServer) Stop() { + s.server.Stop() +} + +// GracefulStop calls GracefulStop on the underlying grpc.Server +func (s *GRPCServer) GracefulStop() { + s.server.GracefulStop() +} + // Config is the GRPCServerConfig encoded as JSON then base64. func (s *GRPCServer) Config() string { // Create a buffer that will contain our final contents diff --git a/vendor/github.com/hashicorp/go-plugin/plugin.go b/vendor/github.com/hashicorp/go-plugin/plugin.go index 6b7bdd1cf..79d967463 100644 --- a/vendor/github.com/hashicorp/go-plugin/plugin.go +++ b/vendor/github.com/hashicorp/go-plugin/plugin.go @@ -9,6 +9,7 @@ package plugin import ( + "context" "errors" "net/rpc" @@ -33,11 +34,12 @@ type GRPCPlugin interface { // GRPCServer should register this plugin for serving with the // given GRPCServer. Unlike Plugin.Server, this is only called once // since gRPC plugins serve singletons. - GRPCServer(*grpc.Server) error + GRPCServer(*GRPCBroker, *grpc.Server) error // GRPCClient should return the interface implementation for the plugin - // you're serving via gRPC. - GRPCClient(*grpc.ClientConn) (interface{}, error) + // you're serving via gRPC. The provided context will be canceled by + // go-plugin in the event of the plugin process exiting. + GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error) } // NetRPCUnsupportedPlugin implements Plugin but returns errors for the diff --git a/vendor/github.com/hashicorp/go-plugin/server.go b/vendor/github.com/hashicorp/go-plugin/server.go index e1543214a..1e808b99e 100644 --- a/vendor/github.com/hashicorp/go-plugin/server.go +++ b/vendor/github.com/hashicorp/go-plugin/server.go @@ -66,6 +66,10 @@ type ServeConfig struct { // the gRPC health checking service. This is not optional since go-plugin // relies on this to implement Ping(). GRPCServer func([]grpc.ServerOption) *grpc.Server + + // Logger is used to pass a logger into the server. If none is provided the + // server will create a default logger. + Logger hclog.Logger } // Protocol returns the protocol that this server should speak. @@ -106,12 +110,15 @@ func Serve(opts *ServeConfig) { // Logging goes to the original stderr log.SetOutput(os.Stderr) - // internal logger to os.Stderr - logger := hclog.New(&hclog.LoggerOptions{ - Level: hclog.Trace, - Output: os.Stderr, - JSONFormat: true, - }) + logger := opts.Logger + if logger == nil { + // internal logger to os.Stderr + logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: os.Stderr, + JSONFormat: true, + }) + } // Create our new stdout, stderr files. These will override our built-in // stdout/stderr so that it works across the stream boundary. diff --git a/vendor/github.com/hashicorp/go-plugin/testing.go b/vendor/github.com/hashicorp/go-plugin/testing.go index c6bf7c4ed..df29593e1 100644 --- a/vendor/github.com/hashicorp/go-plugin/testing.go +++ b/vendor/github.com/hashicorp/go-plugin/testing.go @@ -2,6 +2,7 @@ package plugin import ( "bytes" + "context" "net" "net/rpc" @@ -77,6 +78,35 @@ func TestPluginRPCConn(t testing.T, ps map[string]Plugin) (*RPCClient, *RPCServe return client, server } +// TestGRPCConn returns a gRPC client conn and grpc server that are connected +// together and configured. The register function is used to register services +// prior to the Serve call. This is used to test gRPC connections. +func TestGRPCConn(t testing.T, register func(*grpc.Server)) (*grpc.ClientConn, *grpc.Server) { + // Create a listener + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("err: %s", err) + } + + server := grpc.NewServer() + register(server) + go server.Serve(l) + + // Connect to the server + conn, err := grpc.Dial( + l.Addr().String(), + grpc.WithBlock(), + grpc.WithInsecure()) + if err != nil { + t.Fatalf("err: %s", err) + } + + // Connection successful, close the listener + l.Close() + + return conn, server +} + // TestPluginGRPCConn returns a plugin gRPC client and server that are connected // together and configured. This is used to test gRPC connections. func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCServer) { @@ -107,13 +137,17 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe t.Fatalf("err: %s", err) } - // Connection successful, close the listener - l.Close() + brokerGRPCClient := newGRPCBrokerClient(conn) + broker := newGRPCBroker(brokerGRPCClient, nil) + go broker.Run() + go brokerGRPCClient.StartStream() // Create the client client := &GRPCClient{ Conn: conn, Plugins: ps, + broker: broker, + doneCtx: context.Background(), } return client, server diff --git a/vendor/github.com/oklog/run/LICENSE b/vendor/github.com/oklog/run/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/vendor/github.com/oklog/run/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/oklog/run/README.md b/vendor/github.com/oklog/run/README.md new file mode 100644 index 000000000..a7228cd9a --- /dev/null +++ b/vendor/github.com/oklog/run/README.md @@ -0,0 +1,73 @@ +# run + +[![GoDoc](https://godoc.org/github.com/oklog/run?status.svg)](https://godoc.org/github.com/oklog/run) +[![Build Status](https://travis-ci.org/oklog/run.svg?branch=master)](https://travis-ci.org/oklog/run) +[![Go Report Card](https://goreportcard.com/badge/github.com/oklog/run)](https://goreportcard.com/report/github.com/oklog/run) +[![Apache 2 licensed](https://img.shields.io/badge/license-Apache2-blue.svg)](https://raw.githubusercontent.com/oklog/run/master/LICENSE) + +run.Group is a universal mechanism to manage goroutine lifecycles. + +Create a zero-value run.Group, and then add actors to it. Actors are defined as +a pair of functions: an **execute** function, which should run synchronously; +and an **interrupt** function, which, when invoked, should cause the execute +function to return. Finally, invoke Run, which blocks until the first actor +returns. This general-purpose API allows callers to model pretty much any +runnable task, and achieve well-defined lifecycle semantics for the group. + +run.Group was written to manage component lifecycles in func main for +[OK Log](https://github.com/oklog/oklog). +But it's useful in any circumstance where you need to orchestrate multiple +goroutines as a unit whole. +[Click here](https://www.youtube.com/watch?v=LHe1Cb_Ud_M&t=15m45s) to see a +video of a talk where run.Group is described. + +## Examples + +### context.Context + +```go +ctx, cancel := context.WithCancel(context.Background()) +g.Add(func() error { + return myProcess(ctx, ...) +}, func(error) { + cancel() +}) +``` + +### net.Listener + +```go +ln, _ := net.Listen("tcp", ":8080") +g.Add(func() error { + return http.Serve(ln, nil) +}, func(error) { + ln.Close() +}) +``` + +### io.ReadCloser + +```go +var conn io.ReadCloser = ... +g.Add(func() error { + s := bufio.NewScanner(conn) + for s.Scan() { + println(s.Text()) + } + return s.Err() +}, func(error) { + conn.Close() +}) +``` + +## Comparisons + +Package run is somewhat similar to package +[errgroup](https://godoc.org/golang.org/x/sync/errgroup), +except it doesn't require actor goroutines to understand context semantics. + +It's somewhat similar to package +[tomb.v1](https://godoc.org/gopkg.in/tomb.v1) or +[tomb.v2](https://godoc.org/gopkg.in/tomb.v2), +except it has a much smaller API surface, delegating e.g. staged shutdown of +goroutines to the caller. diff --git a/vendor/github.com/oklog/run/group.go b/vendor/github.com/oklog/run/group.go new file mode 100644 index 000000000..832d47dd1 --- /dev/null +++ b/vendor/github.com/oklog/run/group.go @@ -0,0 +1,62 @@ +// Package run implements an actor-runner with deterministic teardown. It is +// somewhat similar to package errgroup, except it does not require actor +// goroutines to understand context semantics. This makes it suitable for use in +// more circumstances; for example, goroutines which are handling connections +// from net.Listeners, or scanning input from a closable io.Reader. +package run + +// Group collects actors (functions) and runs them concurrently. +// When one actor (function) returns, all actors are interrupted. +// The zero value of a Group is useful. +type Group struct { + actors []actor +} + +// Add an actor (function) to the group. Each actor must be pre-emptable by an +// interrupt function. That is, if interrupt is invoked, execute should return. +// Also, it must be safe to call interrupt even after execute has returned. +// +// The first actor (function) to return interrupts all running actors. +// The error is passed to the interrupt functions, and is returned by Run. +func (g *Group) Add(execute func() error, interrupt func(error)) { + g.actors = append(g.actors, actor{execute, interrupt}) +} + +// Run all actors (functions) concurrently. +// When the first actor returns, all others are interrupted. +// Run only returns when all actors have exited. +// Run returns the error returned by the first exiting actor. +func (g *Group) Run() error { + if len(g.actors) == 0 { + return nil + } + + // Run each actor. + errors := make(chan error, len(g.actors)) + for _, a := range g.actors { + go func(a actor) { + errors <- a.execute() + }(a) + } + + // Wait for the first actor to stop. + err := <-errors + + // Signal all actors to stop. + for _, a := range g.actors { + a.interrupt(err) + } + + // Wait for all actors to stop. + for i := 1; i < cap(errors); i++ { + <-errors + } + + // Return the original error. + return err +} + +type actor struct { + execute func() error + interrupt func(error) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 347ded744..a0706f836 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1572,10 +1572,12 @@ "revision": "d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5" }, { - "checksumSHA1": "R6me0jVmcT/OPo80Fe0qo5fRwHc=", + "checksumSHA1": "y3op+t01flBlSBKlzUNqH5d4XHQ=", "path": "github.com/hashicorp/go-plugin", - "revision": "a5174f84d7f8ff00fb07ab4ef1f380d32eee0e63", - "revisionTime": "2017-08-16T15:18:19Z" + "revision": "e53f54cbf51efde642d4711313e829a1ff0c236d", + "revisionTime": "2018-01-25T19:04:38Z", + "version": "master", + "versionExact": "master" }, { "checksumSHA1": "ErJHGU6AVPZM9yoY/xV11TwSjQs=", @@ -2068,6 +2070,12 @@ "revision": "179d4d0c4d8d407a32af483c2354df1d2c91e6c3", "revisionTime": "2013-12-21T20:05:32Z" }, + { + "checksumSHA1": "Sfxv8SV6j8m6YD+hwvlMJjq2zfg=", + "path": "github.com/oklog/run", + "revision": "4dadeb3030eda0273a12382bb2348ffc7c9d1a39", + "revisionTime": "2017-11-14T00:29:35Z" + }, { "checksumSHA1": "dIMJD0AZwSwmuOuTaGgqWZkzuPU=", "path": "github.com/packer-community/winrmcp",