From 4e219b3bad3b34532b58f2f8119ff658b7fe97f1 Mon Sep 17 00:00:00 2001 From: John Ewart Date: Mon, 15 Jun 2015 17:05:50 -0700 Subject: [PATCH] Fixes support for changing just the read / write capacity of a GSI --- .../aws/resource_aws_dynamodb_table.go | 217 +++++++++++++----- 1 file changed, 165 insertions(+), 52 deletions(-) diff --git a/builtin/providers/aws/resource_aws_dynamodb_table.go b/builtin/providers/aws/resource_aws_dynamodb_table.go index bbc0fd861..163d14d8d 100644 --- a/builtin/providers/aws/resource_aws_dynamodb_table.go +++ b/builtin/providers/aws/resource_aws_dynamodb_table.go @@ -121,7 +121,7 @@ func resourceAwsDynamoDbTable() *schema.Resource { }, "range_key": &schema.Schema{ Type: schema.TypeString, - Required: true, + Optional: true, }, "projection_type": &schema.Schema{ Type: schema.TypeString, @@ -139,6 +139,8 @@ func resourceAwsDynamoDbTable() *schema.Resource { var buf bytes.Buffer m := v.(map[string]interface{}) buf.WriteString(fmt.Sprintf("%s-", m["name"].(string))) + buf.WriteString(fmt.Sprintf("%d-", m["write_capacity"].(int))) + buf.WriteString(fmt.Sprintf("%d-", m["read_capacity"].(int))) return hashcode.String(buf.String()) }, }, @@ -300,6 +302,7 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er } if d.HasChange("global_secondary_index") { + log.Printf("[DEBUG] Changed GSI data") req := &dynamodb.UpdateTableInput{ TableName: aws.String(d.Id()), } @@ -308,12 +311,29 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er oldSet := o.(*schema.Set) newSet := n.(*schema.Set) - changedSet := newSet.Intersection(oldSet) + + // Track old names so we can know which ones we need to just update based on + // capacity changes, terraform appears to only diff on the set hash, not the + // contents so we need to make sure we don't delete any indexes that we + // just want to update the capacity for + oldGsiNameSet := make(map[string]bool) + newGsiNameSet := make(map[string]bool) + + for _, gsidata := range oldSet.List() { + gsiName := gsidata.(map[string]interface{})["name"].(string) + oldGsiNameSet[gsiName] = true + } + + for _, gsidata := range newSet.List() { + gsiName := gsidata.(map[string]interface{})["name"].(string) + newGsiNameSet[gsiName] = true + } // First determine what's new for _, newgsidata := range newSet.List() { updates := []*dynamodb.GlobalSecondaryIndexUpdate{} - if !oldSet.Contains(newgsidata) { + newGsiName := newgsidata.(map[string]interface{})["name"].(string) + if _, exists := oldGsiNameSet[newGsiName]; !exists { attributes := []*dynamodb.AttributeDefinition{} gsidata := newgsidata.(map[string]interface{}) gsi := createGSIFromData(&gsidata) @@ -327,12 +347,9 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er }, } updates = append(updates, update) - hashkey_type, err := getAttributeType(d, *(gsi.KeySchema[0].AttributeName)) - if err != nil { - return err - } - rangekey_type, err := getAttributeType(d, *(gsi.KeySchema[1].AttributeName)) + // Hash key is required, range key isn't + hashkey_type, err := getAttributeType(d, *(gsi.KeySchema[0].AttributeName)) if err != nil { return err } @@ -341,10 +358,19 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er AttributeName: gsi.KeySchema[0].AttributeName, AttributeType: aws.String(hashkey_type), }) - attributes = append(attributes, &dynamodb.AttributeDefinition{ - AttributeName: gsi.KeySchema[1].AttributeName, - AttributeType: aws.String(rangekey_type), - }) + + // If there's a range key, there will be 2 elements in KeySchema + if len(gsi.KeySchema) == 2 { + rangekey_type, err := getAttributeType(d, *(gsi.KeySchema[1].AttributeName)) + if err != nil { + return err + } + + attributes = append(attributes, &dynamodb.AttributeDefinition{ + AttributeName: gsi.KeySchema[1].AttributeName, + AttributeType: aws.String(rangekey_type), + }) + } req.AttributeDefinitions = attributes req.GlobalSecondaryIndexUpdates = updates @@ -362,7 +388,8 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er for _, oldgsidata := range oldSet.List() { updates := []*dynamodb.GlobalSecondaryIndexUpdate{} - if !newSet.Contains(oldgsidata) { + oldGsiName := oldgsidata.(map[string]interface{})["name"].(string) + if _, exists := newGsiNameSet[oldGsiName]; !exists { gsidata := oldgsidata.(map[string]interface{}) log.Printf("[DEBUG] Deleting GSI %s", gsidata["name"].(string)) update := &dynamodb.GlobalSecondaryIndexUpdate{ @@ -382,31 +409,80 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er waitForTableToBeActive(d.Id(), meta) } } + } - for _, updatedgsidata := range changedSet.List() { - updates := []*dynamodb.GlobalSecondaryIndexUpdate{} - gsidata := updatedgsidata.(map[string]interface{}) - log.Printf("[DEBUG] Updating GSI %s", gsidata["name"].(string)) - update := &dynamodb.GlobalSecondaryIndexUpdate{ - Update: &dynamodb.UpdateGlobalSecondaryIndexAction{ - IndexName: aws.String(gsidata["name"].(string)), - ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ - WriteCapacityUnits: aws.Long(int64(gsidata["write_capacity"].(int))), - ReadCapacityUnits: aws.Long(int64(gsidata["read_capacity"].(int))), - }, - }, - } - updates = append(updates, update) + // Update any out-of-date read / write capacity + if gsiObjects, ok := d.GetOk("global_secondary_index"); ok { + gsiSet := gsiObjects.(*schema.Set) + if len(gsiSet.List()) > 0 { + log.Printf("Updating capacity as needed!") - req.GlobalSecondaryIndexUpdates = updates - - _, err := dynamodbconn.UpdateTable(req) + // We can only change throughput, but we need to make sure it's actually changed + tableDescription, err := dynamodbconn.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(d.Id()), + }) if err != nil { - log.Printf("[DEBUG] Error updating table: %s", err) return err } + + table := tableDescription.Table + + updates := []*dynamodb.GlobalSecondaryIndexUpdate{} + + for _, updatedgsidata := range gsiSet.List() { + gsidata := updatedgsidata.(map[string]interface{}) + gsiName := gsidata["name"].(string) + gsiWriteCapacity := gsidata["write_capacity"].(int) + gsiReadCapacity := gsidata["read_capacity"].(int) + + log.Printf("[DEBUG] Updating GSI %s", gsiName) + gsi, err := getGlobalSecondaryIndex(gsiName, table.GlobalSecondaryIndexes) + + if err != nil { + return err + } + + capacityUpdated := false + + if int64(gsiReadCapacity) != *(gsi.ProvisionedThroughput.ReadCapacityUnits) || + int64(gsiWriteCapacity) != *(gsi.ProvisionedThroughput.WriteCapacityUnits) { + capacityUpdated = true + } + + if capacityUpdated { + update := &dynamodb.GlobalSecondaryIndexUpdate{ + Update: &dynamodb.UpdateGlobalSecondaryIndexAction{ + IndexName: aws.String(gsidata["name"].(string)), + ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ + WriteCapacityUnits: aws.Long(int64(gsiWriteCapacity)), + ReadCapacityUnits: aws.Long(int64(gsiReadCapacity)), + }, + }, + } + updates = append(updates, update) + + } + + if len(updates) > 0 { + + req := &dynamodb.UpdateTableInput{ + TableName: aws.String(d.Id()), + } + + req.GlobalSecondaryIndexUpdates = updates + + log.Printf("[DEBUG] Updating GSI read / write capacity on %s", d.Id()) + _, err := dynamodbconn.UpdateTable(req) + + if err != nil { + log.Printf("[DEBUG] Error updating table: %s", err) + return err + } + } + } } + } return resourceAwsDynamoDbTableRead(d, meta) @@ -414,6 +490,7 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er func resourceAwsDynamoDbTableRead(d *schema.ResourceData, meta interface{}) error { dynamodbconn := meta.(*AWSClient).dynamodbconn + log.Printf("[DEBUG] Loading data for DynamoDB table '%s'", d.Id()) req := &dynamodb.DescribeTableInput{ TableName: aws.String(d.Id()), } @@ -431,21 +508,39 @@ func resourceAwsDynamoDbTableRead(d *schema.ResourceData, meta interface{}) erro attributes := []interface{}{} for _, attrdef := range table.AttributeDefinitions { - attribute := make(map[string]string) - attribute["name"] = *(attrdef.AttributeName) - attribute["type"] = *(attrdef.AttributeType) + attribute := map[string]string{ + "name": *(attrdef.AttributeName), + "type": *(attrdef.AttributeType), + } attributes = append(attributes, attribute) + log.Printf("[DEBUG] Added Attribute: %s", attribute["name"]) } d.Set("attribute", attributes) - gsiList := []interface{}{} + gsiList := make([]map[string]interface{}, 0, len(table.GlobalSecondaryIndexes)) for _, gsiObject := range table.GlobalSecondaryIndexes { - gsi := make(map[string]interface{}) - gsi["write_capacity"] = gsiObject.ProvisionedThroughput.WriteCapacityUnits - gsi["read_capacity"] = gsiObject.ProvisionedThroughput.ReadCapacityUnits - gsi["name"] = gsiObject.IndexName + gsi := map[string]interface{}{ + "write_capacity": *(gsiObject.ProvisionedThroughput.WriteCapacityUnits), + "read_capacity": *(gsiObject.ProvisionedThroughput.ReadCapacityUnits), + "name": *(gsiObject.IndexName), + } + + for _, attribute := range gsiObject.KeySchema { + if *attribute.KeyType == "HASH" { + gsi["hash_key"] = *attribute.AttributeName + } + + if *attribute.KeyType == "RANGE" { + gsi["range_key"] = *attribute.AttributeName + } + } + + gsi["projection_type"] = *(gsiObject.Projection.ProjectionType) + gsi["non_key_attributes"] = gsiObject.Projection.NonKeyAttributes + gsiList = append(gsiList, gsi) + log.Printf("[DEBUG] Added GSI: %s - Read: %d / Write: %d", gsi["name"], gsi["read_capacity"], gsi["write_capacity"]) } d.Set("global_secondary_index", gsiList) @@ -486,18 +581,26 @@ func createGSIFromData(data *map[string]interface{}) dynamodb.GlobalSecondaryInd writeCapacity := (*data)["write_capacity"].(int) readCapacity := (*data)["read_capacity"].(int) - return dynamodb.GlobalSecondaryIndex{ - IndexName: aws.String((*data)["name"].(string)), - KeySchema: []*dynamodb.KeySchemaElement{ - &dynamodb.KeySchemaElement{ - AttributeName: aws.String((*data)["hash_key"].(string)), - KeyType: aws.String("HASH"), - }, - &dynamodb.KeySchemaElement{ - AttributeName: aws.String((*data)["range_key"].(string)), - KeyType: aws.String("RANGE"), - }, + key_schema := []*dynamodb.KeySchemaElement{ + &dynamodb.KeySchemaElement{ + AttributeName: aws.String((*data)["hash_key"].(string)), + KeyType: aws.String("HASH"), }, + } + + range_key_name := (*data)["range_key"] + if range_key_name != "" { + range_key_element := &dynamodb.KeySchemaElement{ + AttributeName: aws.String(range_key_name.(string)), + KeyType: aws.String("RANGE"), + } + + key_schema = append(key_schema, range_key_element) + } + + return dynamodb.GlobalSecondaryIndex{ + IndexName: aws.String((*data)["name"].(string)), + KeySchema: key_schema, Projection: projection, ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ WriteCapacityUnits: aws.Long(int64(writeCapacity)), @@ -506,6 +609,16 @@ func createGSIFromData(data *map[string]interface{}) dynamodb.GlobalSecondaryInd } } +func getGlobalSecondaryIndex(indexName string, indexList []*dynamodb.GlobalSecondaryIndexDescription) (*dynamodb.GlobalSecondaryIndexDescription, error) { + for _, gsi := range indexList { + if *(gsi.IndexName) == indexName { + return gsi, nil + } + } + + return &dynamodb.GlobalSecondaryIndexDescription{}, fmt.Errorf("Can't find a GSI by that name...") +} + func getAttributeType(d *schema.ResourceData, attributeName string) (string, error) { if attributedata, ok := d.GetOk("attribute"); ok { attributeSet := attributedata.(*schema.Set) @@ -581,8 +694,8 @@ func waitForTableToBeActive(tableName string, meta interface{}) error { // Wait for a few seconds if !activeState { - log.Printf("[DEBUG] Sleeping for 3 seconds for table to become active") - time.Sleep(3 * time.Second) + log.Printf("[DEBUG] Sleeping for 5 seconds for table to become active") + time.Sleep(5 * time.Second) } }