Merge pull request #19060 from hashicorp/f-stx-etx
backend/remote: update `go-tfe` to support better log polling
This commit is contained in:
commit
7b55d1640e
|
@ -165,6 +165,9 @@ func (b *Remote) opApply(stopCtx, cancelCtx context.Context, op *backend.Operati
|
||||||
scanner := bufio.NewScanner(logs)
|
scanner := bufio.NewScanner(logs)
|
||||||
|
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
|
if scanner.Text() == "\x02" || scanner.Text() == "\x03" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if b.CLI != nil {
|
if b.CLI != nil {
|
||||||
b.CLI.Output(b.Colorize().Color(scanner.Text()))
|
b.CLI.Output(b.Colorize().Color(scanner.Text()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,6 +208,9 @@ func (b *Remote) plan(stopCtx, cancelCtx context.Context, op *backend.Operation,
|
||||||
scanner := bufio.NewScanner(logs)
|
scanner := bufio.NewScanner(logs)
|
||||||
|
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
|
if scanner.Text() == "\x02" || scanner.Text() == "\x03" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if b.CLI != nil {
|
if b.CLI != nil {
|
||||||
b.CLI.Output(b.Colorize().Color(scanner.Text()))
|
b.CLI.Output(b.Colorize().Color(scanner.Text()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
package tfe
|
package tfe
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
@ -11,11 +13,24 @@ import (
|
||||||
|
|
||||||
// LogReader implements io.Reader for streaming logs.
|
// LogReader implements io.Reader for streaming logs.
|
||||||
type LogReader struct {
|
type LogReader struct {
|
||||||
client *Client
|
client *Client
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
done func() (bool, error)
|
done func() (bool, error)
|
||||||
logURL *url.URL
|
logURL *url.URL
|
||||||
offset int64
|
offset int64
|
||||||
|
reads int
|
||||||
|
startOfText bool
|
||||||
|
endOfText bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// backoff will perform exponential backoff based on the iteration and
|
||||||
|
// limited by the provided min and max (in milliseconds) durations.
|
||||||
|
func backoff(min, max float64, iter int) time.Duration {
|
||||||
|
backoff := math.Pow(2, float64(iter)/5) * min
|
||||||
|
if backoff > max {
|
||||||
|
backoff = max
|
||||||
|
}
|
||||||
|
return time.Duration(backoff) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *LogReader) Read(l []byte) (int, error) {
|
func (r *LogReader) Read(l []byte) (int, error) {
|
||||||
|
@ -26,11 +41,11 @@ func (r *LogReader) Read(l []byte) (int, error) {
|
||||||
// Loop until we can any data, the context is canceled or the
|
// Loop until we can any data, the context is canceled or the
|
||||||
// run is finsished. If we would return right away without any
|
// run is finsished. If we would return right away without any
|
||||||
// data, we could and up causing a io.ErrNoProgress error.
|
// data, we could and up causing a io.ErrNoProgress error.
|
||||||
for {
|
for r.reads = 1; ; r.reads++ {
|
||||||
select {
|
select {
|
||||||
case <-r.ctx.Done():
|
case <-r.ctx.Done():
|
||||||
return 0, r.ctx.Err()
|
return 0, r.ctx.Err()
|
||||||
case <-time.After(500 * time.Millisecond):
|
case <-time.After(backoff(500, 2000, r.reads)):
|
||||||
if written, err := r.read(l); err != io.ErrNoProgress {
|
if written, err := r.read(l); err != io.ErrNoProgress {
|
||||||
return written, err
|
return written, err
|
||||||
}
|
}
|
||||||
|
@ -70,16 +85,33 @@ func (r *LogReader) read(l []byte) (int, error) {
|
||||||
return written, err
|
return written, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if written > 0 {
|
||||||
|
// Check for an STX (Start of Text) ASCII control marker.
|
||||||
|
if !r.startOfText && bytes.Contains(l, []byte("\x02")) {
|
||||||
|
r.startOfText = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we found an STX ASCII control character, start looking for
|
||||||
|
// the ETX (End of Text) control character.
|
||||||
|
if r.startOfText && bytes.Contains(l, []byte("\x03")) {
|
||||||
|
r.endOfText = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Check if we need to continue the loop and wait 500 miliseconds
|
// Check if we need to continue the loop and wait 500 miliseconds
|
||||||
// before checking if there is a new chunk available or that the
|
// before checking if there is a new chunk available or that the
|
||||||
// run is finished and we are done reading all chunks.
|
// run is finished and we are done reading all chunks.
|
||||||
if written == 0 {
|
if written == 0 {
|
||||||
done, err := r.done()
|
if (r.startOfText && r.endOfText) || // The logstream finished without issues.
|
||||||
if err != nil {
|
(r.startOfText && r.reads%10 == 0) || // The logstream terminated unexpectedly.
|
||||||
return 0, err
|
(!r.startOfText && r.reads > 1) { // The logstream doesn't support STX/ETX.
|
||||||
}
|
done, err := r.done()
|
||||||
if done {
|
if err != nil {
|
||||||
return 0, io.EOF
|
return 0, err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return 0, io.ErrNoProgress
|
return 0, io.ErrNoProgress
|
||||||
}
|
}
|
||||||
|
|
|
@ -1804,10 +1804,10 @@
|
||||||
"revisionTime": "2018-07-12T07:51:27Z"
|
"revisionTime": "2018-07-12T07:51:27Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "9EZuhp7LWTAVsTDpP9DzajjmJxg=",
|
"checksumSHA1": "bZzpA/TNWpYzVGIFEWLpOz7AXCU=",
|
||||||
"path": "github.com/hashicorp/go-tfe",
|
"path": "github.com/hashicorp/go-tfe",
|
||||||
"revision": "ed986a3b38aba4630ca6ae7dbc876eb0d0c95c57",
|
"revision": "937a37d8d40df424b1e47fe05de0548727154efc",
|
||||||
"revisionTime": "2018-10-10T13:21:10Z"
|
"revisionTime": "2018-10-11T20:03:11Z"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"checksumSHA1": "85XUnluYJL7F55ptcwdmN8eSOsk=",
|
"checksumSHA1": "85XUnluYJL7F55ptcwdmN8eSOsk=",
|
||||||
|
|
Loading…
Reference in New Issue