Merge pull request #20211 from yanndegat/swift_remote_backend
Swift remote backend support for workspaces & locking
This commit is contained in:
commit
aebfecf871
|
@ -173,6 +173,13 @@ func New() backend.Backend {
|
|||
Optional: true,
|
||||
Description: descriptions["expire_after"],
|
||||
},
|
||||
|
||||
"lock": &schema.Schema{
|
||||
Type: schema.TypeBool,
|
||||
Optional: true,
|
||||
Description: "Lock state access",
|
||||
Default: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -238,6 +245,7 @@ type Backend struct {
|
|||
archiveContainer string
|
||||
expireSecs int
|
||||
container string
|
||||
lock bool
|
||||
}
|
||||
|
||||
func (b *Backend) configure(ctx context.Context) error {
|
||||
|
@ -247,7 +255,6 @@ func (b *Backend) configure(ctx context.Context) error {
|
|||
|
||||
// Grab the resource data
|
||||
data := schema.FromContextBackendConfig(ctx)
|
||||
|
||||
config := &tf_openstack.Config{
|
||||
CACertFile: data.Get("cacert_file").(string),
|
||||
ClientCertFile: data.Get("cert").(string),
|
||||
|
@ -280,6 +287,9 @@ func (b *Backend) configure(ctx context.Context) error {
|
|||
b.container = data.Get("path").(string)
|
||||
}
|
||||
|
||||
// Store the lock information
|
||||
b.lock = data.Get("lock").(bool)
|
||||
|
||||
// Enable object archiving?
|
||||
if archiveContainer, ok := data.GetOk("archive_container"); ok {
|
||||
log.Printf("[DEBUG] Archive_container set, enabling object versioning")
|
||||
|
|
|
@ -1,22 +1,78 @@
|
|||
package swift
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/terraform/backend"
|
||||
"github.com/hashicorp/terraform/state"
|
||||
"github.com/hashicorp/terraform/state/remote"
|
||||
"github.com/hashicorp/terraform/states"
|
||||
)
|
||||
|
||||
const (
|
||||
objectEnvPrefix = "env-"
|
||||
delimiter = "/"
|
||||
)
|
||||
|
||||
func (b *Backend) Workspaces() ([]string, error) {
|
||||
return nil, backend.ErrWorkspacesNotSupported
|
||||
client := &RemoteClient{
|
||||
client: b.client,
|
||||
container: b.container,
|
||||
archive: b.archive,
|
||||
archiveContainer: b.archiveContainer,
|
||||
expireSecs: b.expireSecs,
|
||||
lockState: b.lock,
|
||||
}
|
||||
|
||||
// List our container objects
|
||||
objectNames, err := client.ListObjectsNames(objectEnvPrefix, delimiter)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Find the envs, we use a map since we can get duplicates with
|
||||
// path suffixes.
|
||||
envs := map[string]struct{}{}
|
||||
for _, object := range objectNames {
|
||||
object = strings.TrimPrefix(object, objectEnvPrefix)
|
||||
object = strings.TrimSuffix(object, delimiter)
|
||||
|
||||
// Ignore objects that still contain a "/"
|
||||
// as we dont store states in subdirectories
|
||||
if idx := strings.Index(object, delimiter); idx >= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// swift is eventually consistent, thus a deleted object may
|
||||
// be listed in objectList. To ensure consistency, we query
|
||||
// each object with a "newest" arg set to true
|
||||
payload, err := client.get(b.objectName(object))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if payload == nil {
|
||||
// object doesn't exist anymore. skipping.
|
||||
continue
|
||||
}
|
||||
|
||||
envs[object] = struct{}{}
|
||||
}
|
||||
|
||||
result := make([]string, 1, len(envs)+1)
|
||||
result[0] = backend.DefaultStateName
|
||||
|
||||
for k, _ := range envs {
|
||||
result = append(result, k)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *Backend) DeleteWorkspace(name string) error {
|
||||
return backend.ErrWorkspacesNotSupported
|
||||
}
|
||||
|
||||
func (b *Backend) StateMgr(name string) (state.State, error) {
|
||||
if name != backend.DefaultStateName {
|
||||
return nil, backend.ErrWorkspacesNotSupported
|
||||
if name == backend.DefaultStateName || name == "" {
|
||||
return fmt.Errorf("can't delete default state")
|
||||
}
|
||||
|
||||
client := &RemoteClient{
|
||||
|
@ -25,7 +81,128 @@ func (b *Backend) StateMgr(name string) (state.State, error) {
|
|||
archive: b.archive,
|
||||
archiveContainer: b.archiveContainer,
|
||||
expireSecs: b.expireSecs,
|
||||
objectName: b.objectName(name),
|
||||
lockState: b.lock,
|
||||
}
|
||||
|
||||
return &remote.State{Client: client}, nil
|
||||
// Delete our object
|
||||
err := client.Delete()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *Backend) StateMgr(name string) (state.State, error) {
|
||||
if name == "" {
|
||||
return nil, fmt.Errorf("missing state name")
|
||||
}
|
||||
|
||||
client := &RemoteClient{
|
||||
client: b.client,
|
||||
container: b.container,
|
||||
archive: b.archive,
|
||||
archiveContainer: b.archiveContainer,
|
||||
expireSecs: b.expireSecs,
|
||||
objectName: b.objectName(name),
|
||||
lockState: b.lock,
|
||||
}
|
||||
|
||||
var stateMgr state.State = &remote.State{Client: client}
|
||||
|
||||
// If we're not locking, disable it
|
||||
if !b.lock {
|
||||
stateMgr = &state.LockDisabled{Inner: stateMgr}
|
||||
}
|
||||
|
||||
// Check to see if this state already exists.
|
||||
// If we're trying to force-unlock a state, we can't take the lock before
|
||||
// fetching the state. If the state doesn't exist, we have to assume this
|
||||
// is a normal create operation, and take the lock at that point.
|
||||
//
|
||||
// If we need to force-unlock, but for some reason the state no longer
|
||||
// exists, the user will have to use openstack tools to manually fix the
|
||||
// situation.
|
||||
existing, err := b.Workspaces()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
exists := false
|
||||
for _, s := range existing {
|
||||
if s == name {
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// We need to create the object so it's listed by States.
|
||||
if !exists {
|
||||
// the default state always exists
|
||||
if name == backend.DefaultStateName {
|
||||
return stateMgr, nil
|
||||
}
|
||||
|
||||
// Grab a lock, we use this to write an empty state if one doesn't
|
||||
// exist already. We have to write an empty state as a sentinel value
|
||||
// so States() knows it exists.
|
||||
lockInfo := state.NewLockInfo()
|
||||
lockInfo.Operation = "init"
|
||||
lockId, err := stateMgr.Lock(lockInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to lock state in Swift: %s", err)
|
||||
}
|
||||
|
||||
// Local helper function so we can call it multiple places
|
||||
lockUnlock := func(parent error) error {
|
||||
if err := stateMgr.Unlock(lockId); err != nil {
|
||||
return fmt.Errorf(strings.TrimSpace(errStateUnlock), lockId, err)
|
||||
}
|
||||
|
||||
return parent
|
||||
}
|
||||
|
||||
// Grab the value
|
||||
if err := stateMgr.RefreshState(); err != nil {
|
||||
err = lockUnlock(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If we have no state, we have to create an empty state
|
||||
if v := stateMgr.State(); v == nil {
|
||||
if err := stateMgr.WriteState(states.NewState()); err != nil {
|
||||
err = lockUnlock(err)
|
||||
return nil, err
|
||||
}
|
||||
if err := stateMgr.PersistState(); err != nil {
|
||||
err = lockUnlock(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock, the state should now be initialized
|
||||
if err := lockUnlock(nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return stateMgr, nil
|
||||
}
|
||||
|
||||
func (b *Backend) objectName(name string) string {
|
||||
if name != backend.DefaultStateName {
|
||||
name = fmt.Sprintf("%s%s/%s", objectEnvPrefix, name, TFSTATE_NAME)
|
||||
} else {
|
||||
name = TFSTATE_NAME
|
||||
}
|
||||
|
||||
return name
|
||||
}
|
||||
|
||||
const errStateUnlock = `
|
||||
Error unlocking Swift state. Lock ID: %s
|
||||
|
||||
Error: %s
|
||||
|
||||
You may have to force-unlock this state in order to use it again.
|
||||
The Swift backend acquires a lock during initialization to ensure
|
||||
the minimum required keys are prepared.
|
||||
`
|
||||
|
|
|
@ -2,22 +2,11 @@ package swift
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gophercloud/gophercloud"
|
||||
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/containers"
|
||||
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/objects"
|
||||
"github.com/gophercloud/gophercloud/pagination"
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
|
||||
"github.com/hashicorp/terraform/addrs"
|
||||
"github.com/hashicorp/terraform/backend"
|
||||
"github.com/hashicorp/terraform/state/remote"
|
||||
"github.com/hashicorp/terraform/states"
|
||||
"github.com/hashicorp/terraform/states/statefile"
|
||||
)
|
||||
|
||||
// verify that we are doing ACC tests or the Swift tests specifically
|
||||
|
@ -45,198 +34,82 @@ func TestBackendConfig(t *testing.T) {
|
|||
testACC(t)
|
||||
|
||||
// Build config
|
||||
container := fmt.Sprintf("terraform-state-swift-testconfig-%x", time.Now().Unix())
|
||||
archiveContainer := fmt.Sprintf("%s_archive", container)
|
||||
|
||||
config := map[string]interface{}{
|
||||
"archive_container": "test-tfstate-archive",
|
||||
"container": "test-tfstate",
|
||||
"archive_container": archiveContainer,
|
||||
"container": container,
|
||||
}
|
||||
|
||||
b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(config)).(*Backend)
|
||||
|
||||
if b.container != "test-tfstate" {
|
||||
t.Fatal("Incorrect path was provided.")
|
||||
if b.container != container {
|
||||
t.Fatal("Incorrect container was provided.")
|
||||
}
|
||||
if b.archiveContainer != "test-tfstate-archive" {
|
||||
t.Fatal("Incorrect archivepath was provided.")
|
||||
if b.archiveContainer != archiveContainer {
|
||||
t.Fatal("Incorrect archive_container was provided.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackend(t *testing.T) {
|
||||
testACC(t)
|
||||
|
||||
container := fmt.Sprintf("terraform-state-swift-test-%x", time.Now().Unix())
|
||||
container := fmt.Sprintf("terraform-state-swift-testbackend-%x", time.Now().Unix())
|
||||
|
||||
b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
be0 := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
"container": container,
|
||||
})).(*Backend)
|
||||
|
||||
defer deleteSwiftContainer(t, b.client, container)
|
||||
|
||||
backend.TestBackendStates(t, b)
|
||||
}
|
||||
|
||||
func TestBackendPath(t *testing.T) {
|
||||
testACC(t)
|
||||
|
||||
path := fmt.Sprintf("terraform-state-swift-test-%x", time.Now().Unix())
|
||||
t.Logf("[DEBUG] Generating backend config")
|
||||
b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
"path": path,
|
||||
be1 := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
"container": container,
|
||||
})).(*Backend)
|
||||
t.Logf("[DEBUG] Backend configured")
|
||||
|
||||
defer deleteSwiftContainer(t, b.client, path)
|
||||
|
||||
t.Logf("[DEBUG] Testing Backend")
|
||||
|
||||
// Generate some state
|
||||
state1 := states.NewState()
|
||||
|
||||
// RemoteClient to test with
|
||||
client := &RemoteClient{
|
||||
client: b.client,
|
||||
archive: b.archive,
|
||||
archiveContainer: b.archiveContainer,
|
||||
container: b.container,
|
||||
client: be0.client,
|
||||
container: be0.container,
|
||||
}
|
||||
|
||||
stateMgr := &remote.State{Client: client}
|
||||
stateMgr.WriteState(state1)
|
||||
if err := stateMgr.PersistState(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := stateMgr.RefreshState(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Add some state
|
||||
mod := state1.EnsureModule(addrs.RootModuleInstance)
|
||||
mod.SetOutputValue("bar", cty.StringVal("baz"), false)
|
||||
stateMgr.WriteState(state1)
|
||||
if err := stateMgr.PersistState(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.deleteContainer()
|
||||
|
||||
backend.TestBackendStates(t, be0)
|
||||
backend.TestBackendStateLocks(t, be0, be1)
|
||||
backend.TestBackendStateForceUnlock(t, be0, be1)
|
||||
}
|
||||
|
||||
func TestBackendArchive(t *testing.T) {
|
||||
testACC(t)
|
||||
|
||||
container := fmt.Sprintf("terraform-state-swift-test-%x", time.Now().Unix())
|
||||
container := fmt.Sprintf("terraform-state-swift-testarchive-%x", time.Now().Unix())
|
||||
archiveContainer := fmt.Sprintf("%s_archive", container)
|
||||
|
||||
b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
be0 := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
"archive_container": archiveContainer,
|
||||
"container": container,
|
||||
})).(*Backend)
|
||||
|
||||
defer deleteSwiftContainer(t, b.client, container)
|
||||
defer deleteSwiftContainer(t, b.client, archiveContainer)
|
||||
be1 := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
"archive_container": archiveContainer,
|
||||
"container": container,
|
||||
})).(*Backend)
|
||||
|
||||
// Generate some state
|
||||
state1 := states.NewState()
|
||||
|
||||
// RemoteClient to test with
|
||||
client := &RemoteClient{
|
||||
client: b.client,
|
||||
archive: b.archive,
|
||||
archiveContainer: b.archiveContainer,
|
||||
container: b.container,
|
||||
}
|
||||
|
||||
stateMgr := &remote.State{Client: client}
|
||||
stateMgr.WriteState(state1)
|
||||
if err := stateMgr.PersistState(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := stateMgr.RefreshState(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Add some state
|
||||
mod := state1.EnsureModule(addrs.RootModuleInstance)
|
||||
mod.SetOutputValue("bar", cty.StringVal("baz"), false)
|
||||
stateMgr.WriteState(state1)
|
||||
if err := stateMgr.PersistState(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
archiveObjects := getSwiftObjectNames(t, b.client, archiveContainer)
|
||||
t.Logf("archiveObjects len = %d. Contents = %+v", len(archiveObjects), archiveObjects)
|
||||
if len(archiveObjects) != 1 {
|
||||
t.Fatalf("Invalid number of archive objects. Expected 1, got %d", len(archiveObjects))
|
||||
}
|
||||
|
||||
// Download archive state to validate
|
||||
archiveData := downloadSwiftObject(t, b.client, archiveContainer, archiveObjects[0])
|
||||
t.Logf("Archive data downloaded... Looks like: %+v", archiveData)
|
||||
archiveStateFile, err := statefile.Read(archiveData)
|
||||
if err != nil {
|
||||
t.Fatalf("Error Reading State: %s", err)
|
||||
}
|
||||
|
||||
t.Logf("Archive state lineage = %s, serial = %d", archiveStateFile.Lineage, archiveStateFile.Serial)
|
||||
if stateMgr.StateSnapshotMeta().Lineage != archiveStateFile.Lineage {
|
||||
t.Fatal("Got a different lineage")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Helper function to download an object in a Swift container
|
||||
func downloadSwiftObject(t *testing.T, osClient *gophercloud.ServiceClient, container, object string) (data io.Reader) {
|
||||
t.Logf("Attempting to download object %s from container %s", object, container)
|
||||
res := objects.Download(osClient, container, object, nil)
|
||||
if res.Err != nil {
|
||||
t.Fatalf("Error downloading object: %s", res.Err)
|
||||
}
|
||||
data = res.Body
|
||||
return
|
||||
}
|
||||
|
||||
// Helper function to get a list of objects in a Swift container
|
||||
func getSwiftObjectNames(t *testing.T, osClient *gophercloud.ServiceClient, container string) (objectNames []string) {
|
||||
_ = objects.List(osClient, container, nil).EachPage(func(page pagination.Page) (bool, error) {
|
||||
// Get a slice of object names
|
||||
names, err := objects.ExtractNames(page)
|
||||
if err != nil {
|
||||
t.Fatalf("Error extracting object names from page: %s", err)
|
||||
}
|
||||
for _, object := range names {
|
||||
objectNames = append(objectNames, object)
|
||||
defer func() {
|
||||
client := &RemoteClient{
|
||||
client: be0.client,
|
||||
container: be0.container,
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Helper function to delete Swift container
|
||||
func deleteSwiftContainer(t *testing.T, osClient *gophercloud.ServiceClient, container string) {
|
||||
warning := "WARNING: Failed to delete the test Swift container. It may have been left in your Openstack account and may incur storage charges. (error was %s)"
|
||||
|
||||
// Remove any objects
|
||||
deleteSwiftObjects(t, osClient, container)
|
||||
|
||||
// Delete the container
|
||||
deleteResult := containers.Delete(osClient, container)
|
||||
if deleteResult.Err != nil {
|
||||
if _, ok := deleteResult.Err.(gophercloud.ErrDefault404); !ok {
|
||||
t.Fatalf(warning, deleteResult.Err)
|
||||
aclient := &RemoteClient{
|
||||
client: be0.client,
|
||||
container: be0.archiveContainer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to delete Swift objects within a container
|
||||
func deleteSwiftObjects(t *testing.T, osClient *gophercloud.ServiceClient, container string) {
|
||||
// Get a slice of object names
|
||||
objectNames := getSwiftObjectNames(t, osClient, container)
|
||||
|
||||
for _, object := range objectNames {
|
||||
result := objects.Delete(osClient, container, object, nil)
|
||||
if result.Err != nil {
|
||||
t.Fatalf("Error deleting object %s from container %s: %s", object, container, result.Err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
defer client.deleteContainer()
|
||||
client.deleteContainer()
|
||||
aclient.deleteContainer()
|
||||
}()
|
||||
|
||||
backend.TestBackendStates(t, be0)
|
||||
backend.TestBackendStateLocks(t, be0, be1)
|
||||
backend.TestBackendStateForceUnlock(t, be0, be1)
|
||||
}
|
||||
|
|
|
@ -2,37 +2,103 @@ package swift
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gophercloud/gophercloud"
|
||||
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/containers"
|
||||
"github.com/gophercloud/gophercloud/openstack/objectstorage/v1/objects"
|
||||
|
||||
"github.com/gophercloud/gophercloud/pagination"
|
||||
"github.com/hashicorp/terraform/state"
|
||||
"github.com/hashicorp/terraform/state/remote"
|
||||
)
|
||||
|
||||
const (
|
||||
TFSTATE_NAME = "tfstate.tf"
|
||||
TFSTATE_LOCK_NAME = "tfstate.lock"
|
||||
TFSTATE_NAME = "tfstate.tf"
|
||||
|
||||
consistencyTimeout = 15
|
||||
|
||||
// Suffix that will be appended to state file paths
|
||||
// when locking
|
||||
lockSuffix = ".lock"
|
||||
|
||||
// The TTL associated with this lock.
|
||||
lockTTL = 60 * time.Second
|
||||
|
||||
// The Interval associated with this lock periodic renew.
|
||||
lockRenewInterval = 30 * time.Second
|
||||
|
||||
// The amount of time we will retry to delete a container waiting for
|
||||
// the objects to be deleted.
|
||||
deleteRetryTimeout = 60 * time.Second
|
||||
|
||||
// delay when polling the objects
|
||||
deleteRetryPollInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
// RemoteClient implements the Client interface for an Openstack Swift server.
|
||||
// Implements "state/remote".ClientLocker
|
||||
type RemoteClient struct {
|
||||
client *gophercloud.ServiceClient
|
||||
container string
|
||||
archive bool
|
||||
archiveContainer string
|
||||
expireSecs int
|
||||
objectName string
|
||||
|
||||
mu sync.Mutex
|
||||
// lockState is true if we're using locks
|
||||
lockState bool
|
||||
|
||||
info *state.LockInfo
|
||||
|
||||
// lockCancel cancels the Context use for lockRenewPeriodic, and is
|
||||
// called when unlocking, or before creating a new lock if the lock is
|
||||
// lost.
|
||||
lockCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (c *RemoteClient) ListObjectsNames(prefix string, delim string) ([]string, error) {
|
||||
if err := c.ensureContainerExists(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// List our raw path
|
||||
listOpts := objects.ListOpts{
|
||||
Full: false,
|
||||
Prefix: prefix,
|
||||
Delimiter: delim,
|
||||
}
|
||||
|
||||
result := []string{}
|
||||
pager := objects.List(c.client, c.container, listOpts)
|
||||
// Define an anonymous function to be executed on each page's iteration
|
||||
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
||||
objectList, err := objects.ExtractNames(page)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Error extracting names from objects from page %+v", err)
|
||||
}
|
||||
for _, object := range objectList {
|
||||
result = append(result, object)
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
||||
log.Printf("[DEBUG] Getting object %s in container %s", TFSTATE_NAME, c.container)
|
||||
result := objects.Download(c.client, c.container, TFSTATE_NAME, nil)
|
||||
|
||||
// Extract any errors from result
|
||||
_, err := result.Extract()
|
||||
payload, err := c.get(c.objectName)
|
||||
|
||||
// 404 response is to be expected if the object doesn't already exist!
|
||||
if _, ok := err.(gophercloud.ErrDefault404); ok {
|
||||
|
@ -40,6 +106,160 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return payload, err
|
||||
}
|
||||
|
||||
// swift is eventually constistent. Consistency
|
||||
// is ensured by the Get func which will always try
|
||||
// to retrieve the most recent object
|
||||
func (c *RemoteClient) Put(data []byte) error {
|
||||
if c.expireSecs != 0 {
|
||||
log.Printf("[DEBUG] ExpireSecs = %d", c.expireSecs)
|
||||
return c.put(c.objectName, data, c.expireSecs, "")
|
||||
}
|
||||
|
||||
return c.put(c.objectName, data, -1, "")
|
||||
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Delete() error {
|
||||
return c.delete(c.objectName)
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if !c.lockState {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Acquiring Lock %#v on %s/%s", info, c.container, c.objectName)
|
||||
|
||||
// This check only is to ensure we strictly follow the specification.
|
||||
// Terraform shouldn't ever re-lock, so provide errors for the possible
|
||||
// states if this is called.
|
||||
if c.info != nil {
|
||||
// we have an active lock already
|
||||
return "", fmt.Errorf("state %q already locked", c.lockFilePath())
|
||||
}
|
||||
|
||||
// update the path we're using
|
||||
info.Path = c.lockFilePath()
|
||||
|
||||
if err := c.writeLockInfo(info, lockTTL, "*"); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Acquired Lock %s on %s", info.ID, c.objectName)
|
||||
|
||||
c.info = info
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c.lockCancel = cancel
|
||||
|
||||
// keep the lock renewed
|
||||
go c.lockRenewPeriodic(ctx, info)
|
||||
|
||||
return info.ID, nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Unlock(id string) error {
|
||||
c.mu.Lock()
|
||||
|
||||
if !c.lockState {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// The periodic lock renew is canceled
|
||||
// the lockCancel func may not be nil in most usecases
|
||||
// but can typically be nil when using a second client
|
||||
// to ForceUnlock the state based on the same lock Id
|
||||
if c.lockCancel != nil {
|
||||
c.lockCancel()
|
||||
}
|
||||
c.info = nil
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
|
||||
log.Printf("[DEBUG] Releasing Lock %s on %s", id, c.objectName)
|
||||
|
||||
info, err := c.lockInfo()
|
||||
if err != nil {
|
||||
return c.lockError(fmt.Errorf("failed to retrieve lock info: %s", err), nil)
|
||||
}
|
||||
|
||||
c.info = info
|
||||
|
||||
// conflicting lock
|
||||
if info.ID != id {
|
||||
return c.lockError(fmt.Errorf("lock id %q does not match existing lock", id), info)
|
||||
}
|
||||
|
||||
// before the lock object deletion is ordered, we shall
|
||||
// stop periodic renew
|
||||
if c.lockCancel != nil {
|
||||
c.lockCancel()
|
||||
}
|
||||
|
||||
if err = c.delete(c.lockFilePath()); err != nil {
|
||||
return c.lockError(fmt.Errorf("error deleting lock with %q: %s", id, err), info)
|
||||
}
|
||||
|
||||
// Swift is eventually consistent; we have to wait until
|
||||
// the lock is effectively deleted to return, or raise
|
||||
// an error if deadline is reached.
|
||||
|
||||
warning := `
|
||||
WARNING: Waiting for lock deletion timed out.
|
||||
Swift has accepted the deletion order of the lock %s/%s.
|
||||
But as it is eventually consistent, complete deletion
|
||||
may happen later.
|
||||
`
|
||||
deadline := time.Now().Add(deleteRetryTimeout)
|
||||
for {
|
||||
if time.Now().Before(deadline) {
|
||||
info, err := c.lockInfo()
|
||||
|
||||
// 404 response is to be expected if the lock deletion
|
||||
// has been processed
|
||||
if _, ok := err.(gophercloud.ErrDefault404); ok {
|
||||
log.Println("[DEBUG] Lock has been deleted.")
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// conflicting lock
|
||||
if info.ID != id {
|
||||
log.Printf("[DEBUG] Someone else has acquired a lock: %v.", info)
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Lock is still there, delete again and wait %v.", deleteRetryPollInterval)
|
||||
c.delete(c.lockFilePath())
|
||||
time.Sleep(deleteRetryPollInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf(warning, c.container, c.lockFilePath())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (c *RemoteClient) get(object string) (*remote.Payload, error) {
|
||||
log.Printf("[DEBUG] Getting object %s/%s", c.container, object)
|
||||
result := objects.Download(c.client, c.container, object, objects.DownloadOpts{Newest: true})
|
||||
|
||||
// Extract any errors from result
|
||||
_, err := result.Extract()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bytes, err := result.ExtractContent()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -54,31 +274,225 @@ func (c *RemoteClient) Get() (*remote.Payload, error) {
|
|||
return payload, nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Put(data []byte) error {
|
||||
func (c *RemoteClient) put(object string, data []byte, deleteAfter int, ifNoneMatch string) error {
|
||||
log.Printf("[DEBUG] Writing object in %s/%s", c.container, object)
|
||||
if err := c.ensureContainerExists(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] Putting object %s in container %s", TFSTATE_NAME, c.container)
|
||||
reader := bytes.NewReader(data)
|
||||
contentType := "application/json"
|
||||
contentLength := int64(len(data))
|
||||
|
||||
createOpts := objects.CreateOpts{
|
||||
Content: reader,
|
||||
Content: bytes.NewReader(data),
|
||||
ContentType: contentType,
|
||||
ContentLength: int64(contentLength),
|
||||
}
|
||||
|
||||
if c.expireSecs != 0 {
|
||||
log.Printf("[DEBUG] ExpireSecs = %d", c.expireSecs)
|
||||
createOpts.DeleteAfter = c.expireSecs
|
||||
if deleteAfter >= 0 {
|
||||
createOpts.DeleteAfter = deleteAfter
|
||||
}
|
||||
|
||||
result := objects.Create(c.client, c.container, TFSTATE_NAME, createOpts)
|
||||
if ifNoneMatch != "" {
|
||||
createOpts.IfNoneMatch = ifNoneMatch
|
||||
}
|
||||
|
||||
return result.Err
|
||||
result := objects.Create(c.client, c.container, object, createOpts)
|
||||
if result.Err != nil {
|
||||
return result.Err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) Delete() error {
|
||||
log.Printf("[DEBUG] Deleting object %s in container %s", TFSTATE_NAME, c.container)
|
||||
result := objects.Delete(c.client, c.container, TFSTATE_NAME, nil)
|
||||
return result.Err
|
||||
func (c *RemoteClient) deleteContainer() error {
|
||||
log.Printf("[DEBUG] Deleting container %s", c.container)
|
||||
|
||||
warning := `
|
||||
WARNING: Waiting for container %s deletion timed out.
|
||||
It may have been left in your Openstack account and may incur storage charges.
|
||||
error was: %s
|
||||
`
|
||||
|
||||
deadline := time.Now().Add(deleteRetryTimeout)
|
||||
|
||||
// Swift is eventually consistent; we have to retry until
|
||||
// all objects are effectively deleted to delete the container
|
||||
// If we still have objects in the container, or raise
|
||||
// an error if deadline is reached
|
||||
for {
|
||||
if time.Now().Before(deadline) {
|
||||
// Remove any objects
|
||||
c.cleanObjects()
|
||||
|
||||
// Delete the container
|
||||
log.Printf("[DEBUG] Deleting container %s", c.container)
|
||||
deleteResult := containers.Delete(c.client, c.container)
|
||||
if deleteResult.Err != nil {
|
||||
// container is not found, thus has been deleted
|
||||
if _, ok := deleteResult.Err.(gophercloud.ErrDefault404); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 409 http error is raised when deleting a container with
|
||||
// remaining objects
|
||||
if respErr, ok := deleteResult.Err.(gophercloud.ErrUnexpectedResponseCode); ok && respErr.Actual == 409 {
|
||||
time.Sleep(deleteRetryPollInterval)
|
||||
log.Printf("[DEBUG] Remaining objects, failed to delete container, retrying...")
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf(warning, deleteResult.Err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf(warning, c.container, "timeout reached")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Helper function to delete Swift objects within a container
|
||||
func (c *RemoteClient) cleanObjects() error {
|
||||
// Get a slice of object names
|
||||
objectNames, err := c.objectNames(c.container)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, object := range objectNames {
|
||||
log.Printf("[DEBUG] Deleting object %s from container %s", object, c.container)
|
||||
result := objects.Delete(c.client, c.container, object, nil)
|
||||
if result.Err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// if object is not found, it has already been deleted
|
||||
if _, ok := result.Err.(gophercloud.ErrDefault404); !ok {
|
||||
return fmt.Errorf("Error deleting object %s from container %s: %v", object, c.container, result.Err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (c *RemoteClient) delete(object string) error {
|
||||
log.Printf("[DEBUG] Deleting object %s/%s", c.container, object)
|
||||
|
||||
result := objects.Delete(c.client, c.container, object, nil)
|
||||
|
||||
if result.Err != nil {
|
||||
return result.Err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) writeLockInfo(info *state.LockInfo, deleteAfter time.Duration, ifNoneMatch string) error {
|
||||
err := c.put(c.lockFilePath(), info.Marshal(), int(deleteAfter.Seconds()), ifNoneMatch)
|
||||
|
||||
if httpErr, ok := err.(gophercloud.ErrUnexpectedResponseCode); ok && httpErr.Actual == 412 {
|
||||
log.Printf("[DEBUG] Couldn't write lock %s. One already exists.", info.ID)
|
||||
info2, err2 := c.lockInfo()
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Couldn't read lock info: %v", err2)
|
||||
}
|
||||
|
||||
return c.lockError(err, info2)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return c.lockError(err, nil)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) lockError(err error, conflictingLock *state.LockInfo) *state.LockError {
|
||||
lockErr := &state.LockError{
|
||||
Err: err,
|
||||
Info: conflictingLock,
|
||||
}
|
||||
|
||||
return lockErr
|
||||
}
|
||||
|
||||
// lockInfo reads the lock file, parses its contents and returns the parsed
|
||||
// LockInfo struct.
|
||||
func (c *RemoteClient) lockInfo() (*state.LockInfo, error) {
|
||||
raw, err := c.get(c.lockFilePath())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := &state.LockInfo{}
|
||||
|
||||
if err := json.Unmarshal(raw.Data, info); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (c *RemoteClient) lockRenewPeriodic(ctx context.Context, info *state.LockInfo) error {
|
||||
log.Printf("[DEBUG] Renew lock %v", info)
|
||||
|
||||
waitDur := lockRenewInterval
|
||||
lastRenewTime := time.Now()
|
||||
var lastErr error
|
||||
for {
|
||||
if time.Since(lastRenewTime) > lockTTL {
|
||||
return lastErr
|
||||
}
|
||||
select {
|
||||
case <-time.After(waitDur):
|
||||
c.mu.Lock()
|
||||
// Unlock may have released the mu.Lock
|
||||
// in which case we shouldn't renew the lock
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("[DEBUG] Stopping Periodic renew of lock %v", info)
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
info2, err := c.lockInfo()
|
||||
if _, ok := err.(gophercloud.ErrDefault404); ok {
|
||||
log.Println("[DEBUG] Lock has expired trying to reacquire.")
|
||||
err = nil
|
||||
}
|
||||
|
||||
if err == nil && (info2 == nil || info.ID == info2.ID) {
|
||||
info2 = info
|
||||
log.Printf("[DEBUG] Renewing lock %v.", info)
|
||||
err = c.writeLockInfo(info, lockTTL, "")
|
||||
}
|
||||
|
||||
c.mu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] could not reacquire lock (%v): %s", info, err)
|
||||
waitDur = time.Second
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
// conflicting lock
|
||||
if info2.ID != info.ID {
|
||||
return c.lockError(fmt.Errorf("lock id %q does not match existing lock %q", info.ID, info2.ID), info2)
|
||||
}
|
||||
|
||||
waitDur = lockRenewInterval
|
||||
lastRenewTime = time.Now()
|
||||
|
||||
case <-ctx.Done():
|
||||
log.Printf("[DEBUG] Stopping Periodic renew of lock %s", info.ID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RemoteClient) lockFilePath() string {
|
||||
return c.objectName + lockSuffix
|
||||
}
|
||||
|
||||
func (c *RemoteClient) ensureContainerExists() error {
|
||||
|
@ -105,11 +519,19 @@ func (c *RemoteClient) ensureContainerExists() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func multiEnv(ks []string) string {
|
||||
for _, k := range ks {
|
||||
if v := os.Getenv(k); v != "" {
|
||||
return v
|
||||
// Helper function to get a list of objects in a Swift container
|
||||
func (c *RemoteClient) objectNames(container string) (objectNames []string, err error) {
|
||||
_ = objects.List(c.client, container, nil).EachPage(func(page pagination.Page) (bool, error) {
|
||||
// Get a slice of object names
|
||||
names, err := objects.ExtractNames(page)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Error extracting object names from page: %s", err)
|
||||
}
|
||||
}
|
||||
return ""
|
||||
for _, object := range names {
|
||||
objectNames = append(objectNames, object)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ func TestRemoteClient_impl(t *testing.T) {
|
|||
func TestRemoteClient(t *testing.T) {
|
||||
testACC(t)
|
||||
|
||||
container := fmt.Sprintf("terraform-state-swift-test-%x", time.Now().Unix())
|
||||
container := fmt.Sprintf("terraform-state-swift-testclient-%x", time.Now().Unix())
|
||||
|
||||
b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{
|
||||
"container": container,
|
||||
|
@ -27,7 +27,12 @@ func TestRemoteClient(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer deleteSwiftContainer(t, b.client, container)
|
||||
client := &RemoteClient{
|
||||
client: b.client,
|
||||
container: b.container,
|
||||
}
|
||||
|
||||
defer client.deleteContainer()
|
||||
|
||||
remote.TestClient(t, state.(*remote.State).Client)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue