Fixes support for changing just the read / write capacity of a GSI

This commit is contained in:
John Ewart 2015-06-15 17:05:50 -07:00
parent 320e4b222c
commit 4e219b3bad
1 changed files with 165 additions and 52 deletions

View File

@ -121,7 +121,7 @@ func resourceAwsDynamoDbTable() *schema.Resource {
},
"range_key": &schema.Schema{
Type: schema.TypeString,
Required: true,
Optional: true,
},
"projection_type": &schema.Schema{
Type: schema.TypeString,
@ -139,6 +139,8 @@ func resourceAwsDynamoDbTable() *schema.Resource {
var buf bytes.Buffer
m := v.(map[string]interface{})
buf.WriteString(fmt.Sprintf("%s-", m["name"].(string)))
buf.WriteString(fmt.Sprintf("%d-", m["write_capacity"].(int)))
buf.WriteString(fmt.Sprintf("%d-", m["read_capacity"].(int)))
return hashcode.String(buf.String())
},
},
@ -300,6 +302,7 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
}
if d.HasChange("global_secondary_index") {
log.Printf("[DEBUG] Changed GSI data")
req := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
}
@ -308,12 +311,29 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
oldSet := o.(*schema.Set)
newSet := n.(*schema.Set)
changedSet := newSet.Intersection(oldSet)
// Track old names so we can know which ones we need to just update based on
// capacity changes, terraform appears to only diff on the set hash, not the
// contents so we need to make sure we don't delete any indexes that we
// just want to update the capacity for
oldGsiNameSet := make(map[string]bool)
newGsiNameSet := make(map[string]bool)
for _, gsidata := range oldSet.List() {
gsiName := gsidata.(map[string]interface{})["name"].(string)
oldGsiNameSet[gsiName] = true
}
for _, gsidata := range newSet.List() {
gsiName := gsidata.(map[string]interface{})["name"].(string)
newGsiNameSet[gsiName] = true
}
// First determine what's new
for _, newgsidata := range newSet.List() {
updates := []*dynamodb.GlobalSecondaryIndexUpdate{}
if !oldSet.Contains(newgsidata) {
newGsiName := newgsidata.(map[string]interface{})["name"].(string)
if _, exists := oldGsiNameSet[newGsiName]; !exists {
attributes := []*dynamodb.AttributeDefinition{}
gsidata := newgsidata.(map[string]interface{})
gsi := createGSIFromData(&gsidata)
@ -327,12 +347,9 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
},
}
updates = append(updates, update)
hashkey_type, err := getAttributeType(d, *(gsi.KeySchema[0].AttributeName))
if err != nil {
return err
}
rangekey_type, err := getAttributeType(d, *(gsi.KeySchema[1].AttributeName))
// Hash key is required, range key isn't
hashkey_type, err := getAttributeType(d, *(gsi.KeySchema[0].AttributeName))
if err != nil {
return err
}
@ -341,10 +358,19 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
AttributeName: gsi.KeySchema[0].AttributeName,
AttributeType: aws.String(hashkey_type),
})
// If there's a range key, there will be 2 elements in KeySchema
if len(gsi.KeySchema) == 2 {
rangekey_type, err := getAttributeType(d, *(gsi.KeySchema[1].AttributeName))
if err != nil {
return err
}
attributes = append(attributes, &dynamodb.AttributeDefinition{
AttributeName: gsi.KeySchema[1].AttributeName,
AttributeType: aws.String(rangekey_type),
})
}
req.AttributeDefinitions = attributes
req.GlobalSecondaryIndexUpdates = updates
@ -362,7 +388,8 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
for _, oldgsidata := range oldSet.List() {
updates := []*dynamodb.GlobalSecondaryIndexUpdate{}
if !newSet.Contains(oldgsidata) {
oldGsiName := oldgsidata.(map[string]interface{})["name"].(string)
if _, exists := newGsiNameSet[oldGsiName]; !exists {
gsidata := oldgsidata.(map[string]interface{})
log.Printf("[DEBUG] Deleting GSI %s", gsidata["name"].(string))
update := &dynamodb.GlobalSecondaryIndexUpdate{
@ -382,24 +409,70 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
waitForTableToBeActive(d.Id(), meta)
}
}
}
// Update any out-of-date read / write capacity
if gsiObjects, ok := d.GetOk("global_secondary_index"); ok {
gsiSet := gsiObjects.(*schema.Set)
if len(gsiSet.List()) > 0 {
log.Printf("Updating capacity as needed!")
// We can only change throughput, but we need to make sure it's actually changed
tableDescription, err := dynamodbconn.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String(d.Id()),
})
if err != nil {
return err
}
table := tableDescription.Table
for _, updatedgsidata := range changedSet.List() {
updates := []*dynamodb.GlobalSecondaryIndexUpdate{}
for _, updatedgsidata := range gsiSet.List() {
gsidata := updatedgsidata.(map[string]interface{})
log.Printf("[DEBUG] Updating GSI %s", gsidata["name"].(string))
gsiName := gsidata["name"].(string)
gsiWriteCapacity := gsidata["write_capacity"].(int)
gsiReadCapacity := gsidata["read_capacity"].(int)
log.Printf("[DEBUG] Updating GSI %s", gsiName)
gsi, err := getGlobalSecondaryIndex(gsiName, table.GlobalSecondaryIndexes)
if err != nil {
return err
}
capacityUpdated := false
if int64(gsiReadCapacity) != *(gsi.ProvisionedThroughput.ReadCapacityUnits) ||
int64(gsiWriteCapacity) != *(gsi.ProvisionedThroughput.WriteCapacityUnits) {
capacityUpdated = true
}
if capacityUpdated {
update := &dynamodb.GlobalSecondaryIndexUpdate{
Update: &dynamodb.UpdateGlobalSecondaryIndexAction{
IndexName: aws.String(gsidata["name"].(string)),
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
WriteCapacityUnits: aws.Long(int64(gsidata["write_capacity"].(int))),
ReadCapacityUnits: aws.Long(int64(gsidata["read_capacity"].(int))),
WriteCapacityUnits: aws.Long(int64(gsiWriteCapacity)),
ReadCapacityUnits: aws.Long(int64(gsiReadCapacity)),
},
},
}
updates = append(updates, update)
}
if len(updates) > 0 {
req := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
}
req.GlobalSecondaryIndexUpdates = updates
log.Printf("[DEBUG] Updating GSI read / write capacity on %s", d.Id())
_, err := dynamodbconn.UpdateTable(req)
if err != nil {
@ -408,12 +481,16 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
}
}
}
}
}
return resourceAwsDynamoDbTableRead(d, meta)
}
func resourceAwsDynamoDbTableRead(d *schema.ResourceData, meta interface{}) error {
dynamodbconn := meta.(*AWSClient).dynamodbconn
log.Printf("[DEBUG] Loading data for DynamoDB table '%s'", d.Id())
req := &dynamodb.DescribeTableInput{
TableName: aws.String(d.Id()),
}
@ -431,21 +508,39 @@ func resourceAwsDynamoDbTableRead(d *schema.ResourceData, meta interface{}) erro
attributes := []interface{}{}
for _, attrdef := range table.AttributeDefinitions {
attribute := make(map[string]string)
attribute["name"] = *(attrdef.AttributeName)
attribute["type"] = *(attrdef.AttributeType)
attribute := map[string]string{
"name": *(attrdef.AttributeName),
"type": *(attrdef.AttributeType),
}
attributes = append(attributes, attribute)
log.Printf("[DEBUG] Added Attribute: %s", attribute["name"])
}
d.Set("attribute", attributes)
gsiList := []interface{}{}
gsiList := make([]map[string]interface{}, 0, len(table.GlobalSecondaryIndexes))
for _, gsiObject := range table.GlobalSecondaryIndexes {
gsi := make(map[string]interface{})
gsi["write_capacity"] = gsiObject.ProvisionedThroughput.WriteCapacityUnits
gsi["read_capacity"] = gsiObject.ProvisionedThroughput.ReadCapacityUnits
gsi["name"] = gsiObject.IndexName
gsi := map[string]interface{}{
"write_capacity": *(gsiObject.ProvisionedThroughput.WriteCapacityUnits),
"read_capacity": *(gsiObject.ProvisionedThroughput.ReadCapacityUnits),
"name": *(gsiObject.IndexName),
}
for _, attribute := range gsiObject.KeySchema {
if *attribute.KeyType == "HASH" {
gsi["hash_key"] = *attribute.AttributeName
}
if *attribute.KeyType == "RANGE" {
gsi["range_key"] = *attribute.AttributeName
}
}
gsi["projection_type"] = *(gsiObject.Projection.ProjectionType)
gsi["non_key_attributes"] = gsiObject.Projection.NonKeyAttributes
gsiList = append(gsiList, gsi)
log.Printf("[DEBUG] Added GSI: %s - Read: %d / Write: %d", gsi["name"], gsi["read_capacity"], gsi["write_capacity"])
}
d.Set("global_secondary_index", gsiList)
@ -486,18 +581,26 @@ func createGSIFromData(data *map[string]interface{}) dynamodb.GlobalSecondaryInd
writeCapacity := (*data)["write_capacity"].(int)
readCapacity := (*data)["read_capacity"].(int)
return dynamodb.GlobalSecondaryIndex{
IndexName: aws.String((*data)["name"].(string)),
KeySchema: []*dynamodb.KeySchemaElement{
key_schema := []*dynamodb.KeySchemaElement{
&dynamodb.KeySchemaElement{
AttributeName: aws.String((*data)["hash_key"].(string)),
KeyType: aws.String("HASH"),
},
&dynamodb.KeySchemaElement{
AttributeName: aws.String((*data)["range_key"].(string)),
}
range_key_name := (*data)["range_key"]
if range_key_name != "" {
range_key_element := &dynamodb.KeySchemaElement{
AttributeName: aws.String(range_key_name.(string)),
KeyType: aws.String("RANGE"),
},
},
}
key_schema = append(key_schema, range_key_element)
}
return dynamodb.GlobalSecondaryIndex{
IndexName: aws.String((*data)["name"].(string)),
KeySchema: key_schema,
Projection: projection,
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
WriteCapacityUnits: aws.Long(int64(writeCapacity)),
@ -506,6 +609,16 @@ func createGSIFromData(data *map[string]interface{}) dynamodb.GlobalSecondaryInd
}
}
func getGlobalSecondaryIndex(indexName string, indexList []*dynamodb.GlobalSecondaryIndexDescription) (*dynamodb.GlobalSecondaryIndexDescription, error) {
for _, gsi := range indexList {
if *(gsi.IndexName) == indexName {
return gsi, nil
}
}
return &dynamodb.GlobalSecondaryIndexDescription{}, fmt.Errorf("Can't find a GSI by that name...")
}
func getAttributeType(d *schema.ResourceData, attributeName string) (string, error) {
if attributedata, ok := d.GetOk("attribute"); ok {
attributeSet := attributedata.(*schema.Set)
@ -581,8 +694,8 @@ func waitForTableToBeActive(tableName string, meta interface{}) error {
// Wait for a few seconds
if !activeState {
log.Printf("[DEBUG] Sleeping for 3 seconds for table to become active")
time.Sleep(3 * time.Second)
log.Printf("[DEBUG] Sleeping for 5 seconds for table to become active")
time.Sleep(5 * time.Second)
}
}