terraform/vendor/github.com/hashicorp/go-plugin/grpc_stdio.go

208 lines
5.8 KiB
Go

package plugin
import (
"bufio"
"bytes"
"context"
"io"
empty "github.com/golang/protobuf/ptypes/empty"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin/internal/plugin"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// grpcStdioBuffer is the buffer size we try to fill when sending a chunk of
// stdio data. This is currently 1 KB for no reason other than that seems like
// enough (stdio data isn't that common) and is fairly low.
const grpcStdioBuffer = 1 * 1024
// grpcStdioServer implements the Stdio service and streams stdiout/stderr.
type grpcStdioServer struct {
stdoutCh <-chan []byte
stderrCh <-chan []byte
}
// newGRPCStdioServer creates a new grpcStdioServer and starts the stream
// copying for the given out and err readers.
//
// This must only be called ONCE per srcOut, srcErr.
func newGRPCStdioServer(log hclog.Logger, srcOut, srcErr io.Reader) *grpcStdioServer {
stdoutCh := make(chan []byte)
stderrCh := make(chan []byte)
// Begin copying the streams
go copyChan(log, stdoutCh, srcOut)
go copyChan(log, stderrCh, srcErr)
// Construct our server
return &grpcStdioServer{
stdoutCh: stdoutCh,
stderrCh: stderrCh,
}
}
// StreamStdio streams our stdout/err as the response.
func (s *grpcStdioServer) StreamStdio(
_ *empty.Empty,
srv plugin.GRPCStdio_StreamStdioServer,
) error {
// Share the same data value between runs. Sending this over the wire
// marshals it so we can reuse this.
var data plugin.StdioData
for {
// Read our data
select {
case data.Data = <-s.stdoutCh:
data.Channel = plugin.StdioData_STDOUT
case data.Data = <-s.stderrCh:
data.Channel = plugin.StdioData_STDERR
case <-srv.Context().Done():
return nil
}
// Not sure if this is possible, but if we somehow got here and
// we didn't populate any data at all, then just continue.
if len(data.Data) == 0 {
continue
}
// Send our data to the client.
if err := srv.Send(&data); err != nil {
return err
}
}
}
// grpcStdioClient wraps the stdio service as a client to copy
// the stdio data to output writers.
type grpcStdioClient struct {
log hclog.Logger
stdioClient plugin.GRPCStdio_StreamStdioClient
}
// newGRPCStdioClient creates a grpcStdioClient. This will perform the
// initial connection to the stdio service. If the stdio service is unavailable
// then this will be a no-op. This allows this to work without error for
// plugins that don't support this.
func newGRPCStdioClient(
ctx context.Context,
log hclog.Logger,
conn *grpc.ClientConn,
) (*grpcStdioClient, error) {
client := plugin.NewGRPCStdioClient(conn)
// Connect immediately to the endpoint
stdioClient, err := client.StreamStdio(ctx, &empty.Empty{})
// If we get an Unavailable or Unimplemented error, this means that the plugin isn't
// updated and linking to the latest version of go-plugin that supports
// this. We fall back to the previous behavior of just not syncing anything.
if status.Code(err) == codes.Unavailable || status.Code(err) == codes.Unimplemented {
log.Warn("stdio service not available, stdout/stderr syncing unavailable")
stdioClient = nil
err = nil
}
if err != nil {
return nil, err
}
return &grpcStdioClient{
log: log,
stdioClient: stdioClient,
}, nil
}
// Run starts the loop that receives stdio data and writes it to the given
// writers. This blocks and should be run in a goroutine.
func (c *grpcStdioClient) Run(stdout, stderr io.Writer) {
// This will be nil if stdio is not supported by the plugin
if c.stdioClient == nil {
c.log.Warn("stdio service unavailable, run will do nothing")
return
}
for {
c.log.Trace("waiting for stdio data")
data, err := c.stdioClient.Recv()
if err != nil {
if err == io.EOF ||
status.Code(err) == codes.Unavailable ||
status.Code(err) == codes.Canceled ||
status.Code(err) == codes.Unimplemented ||
err == context.Canceled {
c.log.Warn("received EOF, stopping recv loop", "err", err)
return
}
c.log.Error("error receiving data", "err", err)
continue
}
// Determine our output writer based on channel
var w io.Writer
switch data.Channel {
case plugin.StdioData_STDOUT:
w = stdout
case plugin.StdioData_STDERR:
w = stderr
default:
c.log.Warn("unknown channel, dropping", "channel", data.Channel)
continue
}
// Write! In the event of an error we just continue.
if c.log.IsTrace() {
c.log.Trace("received data", "channel", data.Channel.String(), "len", len(data.Data))
}
if _, err := io.Copy(w, bytes.NewReader(data.Data)); err != nil {
c.log.Error("failed to copy all bytes", "err", err)
}
}
}
// copyChan copies an io.Reader into a channel.
func copyChan(log hclog.Logger, dst chan<- []byte, src io.Reader) {
bufsrc := bufio.NewReader(src)
for {
// Make our data buffer. We allocate a new one per loop iteration
// so that we can send it over the channel.
var data [1024]byte
// Read the data, this will block until data is available
n, err := bufsrc.Read(data[:])
// We have to check if we have data BEFORE err != nil. The bufio
// docs guarantee n == 0 on EOF but its better to be safe here.
if n > 0 {
// We have data! Send it on the channel. This will block if there
// is no reader on the other side. We expect that go-plugin will
// connect immediately to the stdio server to drain this so we want
// this block to happen for backpressure.
dst <- data[:n]
}
// If we hit EOF we're done copying
if err == io.EOF {
log.Debug("stdio EOF, exiting copy loop")
return
}
// Any other error we just exit the loop. We don't expect there to
// be errors since our use case for this is reading/writing from
// a in-process pipe (os.Pipe).
if err != nil {
log.Warn("error copying stdio data, stopping copy", "err", err)
return
}
}
}