Switch pg backend to use native Postgres locks

This commit is contained in:
Mars Hall 2019-02-25 16:05:53 -08:00
parent 8cb2943b6b
commit b9a91b7c1e
5 changed files with 115 additions and 181 deletions

View File

@ -11,8 +11,6 @@ import (
) )
const ( const (
locksTableName = "locks"
locksIndexName = "locks_by_name"
statesTableName = "states" statesTableName = "states"
statesIndexName = "states_by_name" statesIndexName = "states_by_name"
) )
@ -27,17 +25,10 @@ func New() backend.Backend {
Description: "Postgres connection string; a `postgres://` URL", Description: "Postgres connection string; a `postgres://` URL",
}, },
"lock": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Description: "Use locks to synchronize state access",
Default: true,
},
"schema_name": &schema.Schema{ "schema_name": &schema.Schema{
Type: schema.TypeString, Type: schema.TypeString,
Optional: true, Optional: true,
Description: "Name of the automatically managed Postgres schema to store locks & state", Description: "Name of the automatically managed Postgres schema to store state",
Default: "terraform_remote_backend", Default: "terraform_remote_backend",
}, },
}, },
@ -56,7 +47,6 @@ type Backend struct {
configData *schema.ResourceData configData *schema.ResourceData
connStr string connStr string
schemaName string schemaName string
lock bool
} }
func (b *Backend) configure(ctx context.Context) error { func (b *Backend) configure(ctx context.Context) error {
@ -66,7 +56,6 @@ func (b *Backend) configure(ctx context.Context) error {
b.connStr = data.Get("conn_str").(string) b.connStr = data.Get("conn_str").(string)
b.schemaName = data.Get("schema_name").(string) b.schemaName = data.Get("schema_name").(string)
b.lock = data.Get("lock").(bool)
db, err := sql.Open("postgres", b.connStr) db, err := sql.Open("postgres", b.connStr)
if err != nil { if err != nil {
@ -79,25 +68,10 @@ func (b *Backend) configure(ctx context.Context) error {
if _, err := db.Query(fmt.Sprintf(query, b.schemaName)); err != nil { if _, err := db.Query(fmt.Sprintf(query, b.schemaName)); err != nil {
return err return err
} }
query = `SET search_path TO %s`
if _, err := db.Query(fmt.Sprintf(query, b.schemaName)); err != nil {
return err
}
query = `CREATE TABLE IF NOT EXISTS %s.%s ( query = `CREATE TABLE IF NOT EXISTS %s.%s (
name text, id SERIAL PRIMARY KEY,
info jsonb, name TEXT,
created_at timestamp default current_timestamp data TEXT
)`
if _, err := db.Query(fmt.Sprintf(query, b.schemaName, locksTableName)); err != nil {
return err
}
query = `CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s.%s (name)`
if _, err := db.Query(fmt.Sprintf(query, locksIndexName, b.schemaName, locksTableName)); err != nil {
return err
}
query = `CREATE TABLE IF NOT EXISTS %s.%s (
name text,
data text
)` )`
if _, err := db.Query(fmt.Sprintf(query, b.schemaName, statesTableName)); err != nil { if _, err := db.Query(fmt.Sprintf(query, b.schemaName, statesTableName)); err != nil {
return err return err

View File

@ -2,7 +2,6 @@ package pg
import ( import (
"fmt" "fmt"
"strings"
"github.com/hashicorp/terraform/backend" "github.com/hashicorp/terraform/backend"
"github.com/hashicorp/terraform/state" "github.com/hashicorp/terraform/state"
@ -55,23 +54,13 @@ func (b *Backend) StateMgr(name string) (state.State, error) {
Client: b.db, Client: b.db,
Name: name, Name: name,
SchemaName: b.schemaName, SchemaName: b.schemaName,
lock: b.lock,
}, },
} }
// If we're not locking, disable it
if !b.lock {
stateMgr = &state.LockDisabled{Inner: stateMgr}
}
// Check to see if this state already exists. // Check to see if this state already exists.
// If we're trying to force-unlock a state, we can't take the lock before // If we're trying to force-unlock a state, we can't take the lock before
// fetching the state. If the state doesn't exist, we have to assume this // fetching the state. If the state doesn't exist, we have to assume this
// is a normal create operation, and take the lock at that point. // is a normal create operation, and take the lock at that point.
//
// If we need to force-unlock, but for some reason the state no longer
// exists, the user will have to use the `psql` tool to manually fix the
// situation.
existing, err := b.Workspaces() existing, err := b.Workspaces()
if err != nil { if err != nil {
return nil, err return nil, err
@ -99,13 +88,11 @@ func (b *Backend) StateMgr(name string) (state.State, error) {
// Local helper function so we can call it multiple places // Local helper function so we can call it multiple places
lockUnlock := func(parent error) error { lockUnlock := func(parent error) error {
if err := stateMgr.Unlock(lockId); err != nil { if err := stateMgr.Unlock(lockId); err != nil {
return fmt.Errorf(strings.TrimSpace(errStateUnlock), lockId, err) return fmt.Errorf(`error unlocking Postgres state: %s`, err)
} }
return parent return parent
} }
// If we have no state, we have to create an empty state
if v := stateMgr.State(); v == nil { if v := stateMgr.State(); v == nil {
if err := stateMgr.WriteState(states.NewState()); err != nil { if err := stateMgr.WriteState(states.NewState()); err != nil {
err = lockUnlock(err) err = lockUnlock(err)
@ -125,13 +112,3 @@ func (b *Backend) StateMgr(name string) (state.State, error) {
return stateMgr, nil return stateMgr, nil
} }
const errStateUnlock = `
Error unlocking Postgres state. Lock ID: %s
Error: %s
You may have to force-unlock this state in order to use it again.
The "pg" backend acquires a lock during initialization to ensure
the minimum required key/values are prepared.
`

View File

@ -1,7 +1,7 @@
package pg package pg
// Create the test database: createdb terraform_backend_pg_test // Create the test database: createdb terraform_backend_pg_test
// TF_PG_TEST=true make test TEST=./backend/remote-state/pg TESTARGS='-v -run ^TestBackend' // TF_ACC=1 make test TEST=./backend/remote-state/pg TESTARGS='-v -run ^TestBackend'
import ( import (
"database/sql" "database/sql"
@ -50,11 +50,6 @@ func TestBackendConfig(t *testing.T) {
t.Fatal("Backend could not be configured") t.Fatal("Backend could not be configured")
} }
_, err = b.db.Query(fmt.Sprintf("SELECT name, info, created_at FROM %s.%s LIMIT 1", schemaName, locksTableName))
if err != nil {
t.Fatal(err)
}
_, err = b.db.Query(fmt.Sprintf("SELECT name, data FROM %s.%s LIMIT 1", schemaName, statesTableName)) _, err = b.db.Query(fmt.Sprintf("SELECT name, data FROM %s.%s LIMIT 1", schemaName, statesTableName))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -125,38 +120,6 @@ func TestBackendStateLocks(t *testing.T) {
} }
backend.TestBackendStateLocks(t, b, bb) backend.TestBackendStateLocks(t, b, bb)
backend.TestBackendStateForceUnlock(t, b, bb)
}
func TestBackendStateLocksDisabled(t *testing.T) {
testACC(t)
connStr := getDatabaseUrl()
schemaName := fmt.Sprintf("terraform_%s", t.Name())
dbCleaner, err := sql.Open("postgres", connStr)
if err != nil {
t.Fatal(err)
}
defer dbCleaner.Query(fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaName))
config := backend.TestWrapConfig(map[string]interface{}{
"conn_str": connStr,
"schema_name": schemaName,
"lock": false,
})
b := backend.TestBackendConfig(t, New(), config).(*Backend)
if b == nil {
t.Fatal("Backend could not be configured")
}
bb := backend.TestBackendConfig(t, New(), config).(*Backend)
if bb == nil {
t.Fatal("Backend could not be configured")
}
backend.TestBackendStates(t, b)
backend.TestBackendStateLocks(t, b, bb)
} }
func getDatabaseUrl() string { func getDatabaseUrl() string {

View File

@ -1,13 +1,11 @@
package pg package pg
import ( import (
"context"
"crypto/md5" "crypto/md5"
"database/sql" "database/sql"
"encoding/json"
"errors"
"fmt" "fmt"
multierror "github.com/hashicorp/go-multierror"
uuid "github.com/hashicorp/go-uuid" uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/terraform/state" "github.com/hashicorp/terraform/state"
"github.com/hashicorp/terraform/state/remote" "github.com/hashicorp/terraform/state/remote"
@ -20,31 +18,48 @@ type RemoteClient struct {
Name string Name string
SchemaName string SchemaName string
// Uses locks to synchronize state access // In-flight database transaction. Empty unless Locked.
lock bool txn *sql.Tx
info *state.LockInfo info *state.LockInfo
} }
func (c *RemoteClient) Get() (*remote.Payload, error) { func (c *RemoteClient) Get() (*remote.Payload, error) {
query := `SELECT data FROM %s.%s WHERE name = $1` query := `SELECT data FROM %s.%s WHERE name = $1`
row := c.Client.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name) var row *sql.Row
// Use the open transaction when present
if c.txn != nil {
row = c.txn.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
} else {
row = c.Client.QueryRow(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
}
var data []byte var data []byte
err := row.Scan(&data) err := row.Scan(&data)
if err != nil { switch {
case err == sql.ErrNoRows:
// No existing state returns empty.
return nil, nil return nil, nil
case err != nil:
return nil, err
default:
md5 := md5.Sum(data)
return &remote.Payload{
Data: data,
MD5: md5[:],
}, nil
} }
md5 := md5.Sum(data)
return &remote.Payload{
Data: data,
MD5: md5[:],
}, nil
} }
func (c *RemoteClient) Put(data []byte) error { func (c *RemoteClient) Put(data []byte) error {
query := `INSERT INTO %s.%s (name, data) VALUES ($1, $2) query := `INSERT INTO %s.%s (name, data) VALUES ($1, $2)
ON CONFLICT (name) DO UPDATE ON CONFLICT (name) DO UPDATE
SET data = $2 WHERE %s.name = $1` SET data = $2 WHERE %s.name = $1`
_, err := c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data) var err error
// Use the open transaction when present
if c.txn != nil {
_, err = c.txn.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data)
} else {
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName, statesTableName), c.Name, data)
}
if err != nil { if err != nil {
return err return err
} }
@ -53,7 +68,13 @@ func (c *RemoteClient) Put(data []byte) error {
func (c *RemoteClient) Delete() error { func (c *RemoteClient) Delete() error {
query := `DELETE FROM %s.%s WHERE name = $1` query := `DELETE FROM %s.%s WHERE name = $1`
_, err := c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name) var err error
// Use the open transaction when present
if c.txn != nil {
_, err = c.txn.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
} else {
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, statesTableName), c.Name)
}
if err != nil { if err != nil {
return err return err
} }
@ -61,98 +82,100 @@ func (c *RemoteClient) Delete() error {
} }
func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) { func (c *RemoteClient) Lock(info *state.LockInfo) (string, error) {
// No-op when locking is disabled var err error
if !c.lock { var lockID string
return "", nil var txn *sql.Tx
}
c.info = info
if info.ID == "" { if info.ID == "" {
lockID, err := uuid.GenerateUUID() lockID, err = uuid.GenerateUUID()
if err != nil { if err != nil {
return "", err return "", err
} }
info.Operation = "client"
info.ID = lockID info.ID = lockID
} }
lockInfo, _ := c.getLockInfo() if c.txn == nil {
if lockInfo != nil { // Most strict transaction isolation to prevent cross-talk
lockErr := &state.LockError{ // between incomplete state transactions.
Info: lockInfo, txn, err = c.Client.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return "", err
} }
lockErr.Err = errors.New("state locked") c.txn = txn
return "", lockErr } else {
return "", fmt.Errorf("Already in a transaction")
} }
query := `INSERT INTO %s.%s (name, info) VALUES ($1, $2)` // Do not wait before giving up on a contended lock.
data, err := json.Marshal(info) _, err = c.Client.Exec(`SET LOCAL lock_timeout = 0`)
if err != nil {
return "", err
}
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, locksTableName), c.Name, data)
if err != nil { if err != nil {
c.rollback(info)
return "", err return "", err
} }
if err != nil { // Try to acquire lock for the existing row.
lockInfo, infoErr := c.getLockInfo() query := `SELECT pg_try_advisory_xact_lock(%s.id) FROM %s.%s WHERE %s.name = $1`
if infoErr != nil { row := c.txn.QueryRow(fmt.Sprintf(query, statesTableName, c.SchemaName, statesTableName, statesTableName), c.Name)
err = multierror.Append(err, infoErr) var didLock []byte
err = row.Scan(&didLock)
switch {
case err == sql.ErrNoRows:
// When the row does not yet exist in state, take
// the `-1` lock to create the new row.
innerRow := c.txn.QueryRow(`SELECT pg_try_advisory_xact_lock(-1)`)
var innerDidLock []byte
err := innerRow.Scan(&innerDidLock)
if err != nil {
c.rollback(info)
return "", err
} }
if string(innerDidLock) == "false" {
lockErr := &state.LockError{ c.rollback(info)
Err: err, return "", &state.LockError{Info: info}
Info: lockInfo,
} }
return "", lockErr case err != nil:
c.rollback(info)
return "", err
case string(didLock) == "false":
c.rollback(info)
return "", &state.LockError{Info: info}
default:
} }
return info.ID, nil return info.ID, nil
} }
func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) { func (c *RemoteClient) getLockInfo() (*state.LockInfo, error) {
query := `SELECT info FROM %s.%s WHERE name = $1` return c.info, nil
row := c.Client.QueryRow(fmt.Sprintf(query, c.SchemaName, locksTableName), c.Name)
var data []byte
err := row.Scan(&data)
if err != nil {
return nil, err
}
lockInfo := &state.LockInfo{}
err = json.Unmarshal(data, lockInfo)
if err != nil {
return nil, err
}
return lockInfo, nil
} }
func (c *RemoteClient) Unlock(id string) error { func (c *RemoteClient) Unlock(id string) error {
lockErr := &state.LockError{} if c.txn != nil {
err := c.txn.Commit()
lockInfo, err := c.getLockInfo() if err != nil {
if err != nil { return err
lockErr.Err = fmt.Errorf("failed to retrieve lock info: %s", err) }
return lockErr c.txn = nil
}
c.info = nil
return nil
}
// This must be called from any code path where the
// transaction would not be committed (unlocked),
// otherwise the transactions will leak and prevent
// the process from exiting cleanly.
func (c *RemoteClient) rollback(info *state.LockInfo) error {
if c.txn != nil {
err := c.txn.Rollback()
if err != nil {
return err
}
c.txn = nil
} }
lockErr.Info = lockInfo
if lockInfo.ID != id {
lockErr.Err = fmt.Errorf("lock id %q does not match existing lock", id)
return lockErr
}
query := `DELETE FROM %s.%s WHERE name = $1`
_, err = c.Client.Exec(fmt.Sprintf(query, c.SchemaName, locksTableName), c.Name)
if err != nil {
lockErr.Err = err
return lockErr
}
c.info = nil c.info = nil
return nil return nil
} }

View File

@ -46,23 +46,20 @@ data "terraform_remote_state" "network" {
The following configuration options or environment variables are supported: The following configuration options or environment variables are supported:
* `conn_str` - (Required) Postgres connection string; a `postgres://` URL * `conn_str` - (Required) Postgres connection string; a `postgres://` URL
* `lock` - Use locks to synchronize state access, default `true`
* `schema_name` - Name of the automatically-managed Postgres schema to store locks & state, default `terraform_remote_backend`. * `schema_name` - Name of the automatically-managed Postgres schema to store locks & state, default `terraform_remote_backend`.
## Technical Design ## Technical Design
Postgres version 9.5 or newer is required to support the "ON CONFLICT" upsert syntax and *jsonb* data type. Postgres version 9.5 or newer is required to support advisory locks, the "ON CONFLICT" upsert syntax, and the *jsonb* data type.
This backend creates two tables, **states** and **locks**, in the automatically-managed Postgres schema configured by the `schema_name` variable. This backend creates one table, **states**, in the automatically-managed Postgres schema configured by the `schema_name` variable.
Both tables are keyed by the [workspace](/docs/state/workspaces.html) name. If workspaces are not in use, the name `default` is used. The table is keyed by the [workspace](/docs/state/workspaces.html) name. If workspaces are not in use, the name `default` is used.
Locking is supported using [Postgres advisory locks](https://www.postgresql.org/docs/9.5/explicit-locking.html#ADVISORY-LOCKS).
The **states** table contains: The **states** table contains:
* a serial integer `id`, used as the key for advisory locks
* the workspace `name` key as *text* with a unique index * the workspace `name` key as *text* with a unique index
* the Terraform state `data` JSON as *text*. * the Terraform state `data` as *text*
The **locks** table contains:
* the workspace `name` key as *text* with a unique index
* the lock's `info` JSON as *jsonb*.