terraform/builtin/providers/aws/resource_aws_emr_instance_g...

252 lines
6.5 KiB
Go

package aws
import (
"errors"
"log"
"time"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)
var emrInstanceGroupNotFound = errors.New("No matching EMR Instance Group")
func resourceAwsEMRInstanceGroup() *schema.Resource {
return &schema.Resource{
Create: resourceAwsEMRInstanceGroupCreate,
Read: resourceAwsEMRInstanceGroupRead,
Update: resourceAwsEMRInstanceGroupUpdate,
Delete: resourceAwsEMRInstanceGroupDelete,
Schema: map[string]*schema.Schema{
"cluster_id": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"instance_type": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"instance_count": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 0,
},
"running_instance_count": &schema.Schema{
Type: schema.TypeInt,
Computed: true,
},
"status": &schema.Schema{
Type: schema.TypeString,
Computed: true,
},
"name": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
},
}
}
func resourceAwsEMRInstanceGroupCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
clusterId := d.Get("cluster_id").(string)
instanceType := d.Get("instance_type").(string)
instanceCount := d.Get("instance_count").(int)
groupName := d.Get("name").(string)
params := &emr.AddInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupConfig{
{
InstanceRole: aws.String("TASK"),
InstanceCount: aws.Int64(int64(instanceCount)),
InstanceType: aws.String(instanceType),
Name: aws.String(groupName),
},
},
JobFlowId: aws.String(clusterId),
}
log.Printf("[DEBUG] Creating EMR task group params: %s", params)
resp, err := conn.AddInstanceGroups(params)
if err != nil {
return err
}
log.Printf("[DEBUG] Created EMR task group finished: %#v", resp)
if resp == nil || len(resp.InstanceGroupIds) == 0 {
return fmt.Errorf("Error creating instance groups: no instance group returned")
}
d.SetId(*resp.InstanceGroupIds[0])
return nil
}
func resourceAwsEMRInstanceGroupRead(d *schema.ResourceData, meta interface{}) error {
group, err := fetchEMRInstanceGroup(meta, d.Get("cluster_id").(string), d.Id())
if err != nil {
switch err {
case emrInstanceGroupNotFound:
log.Printf("[DEBUG] EMR Instance Group (%s) not found, removing", d.Id())
d.SetId("")
return nil
default:
return err
}
}
// Guard against the chance of fetchEMRInstanceGroup returning nil group but
// not a emrInstanceGroupNotFound error
if group == nil {
log.Printf("[DEBUG] EMR Instance Group (%s) not found, removing", d.Id())
d.SetId("")
return nil
}
d.Set("name", group.Name)
d.Set("instance_count", group.RequestedInstanceCount)
d.Set("running_instance_count", group.RunningInstanceCount)
d.Set("instance_type", group.InstanceType)
if group.Status != nil && group.Status.State != nil {
d.Set("status", group.Status.State)
}
return nil
}
func fetchAllEMRInstanceGroups(meta interface{}, clusterId string) ([]*emr.InstanceGroup, error) {
conn := meta.(*AWSClient).emrconn
req := &emr.ListInstanceGroupsInput{
ClusterId: aws.String(clusterId),
}
var groups []*emr.InstanceGroup
marker := aws.String("intitial")
for marker != nil {
log.Printf("[DEBUG] EMR Cluster Instance Marker: %s", *marker)
respGrps, errGrps := conn.ListInstanceGroups(req)
if errGrps != nil {
return nil, fmt.Errorf("[ERR] Error reading EMR cluster (%s): %s", clusterId, errGrps)
}
if respGrps == nil {
return nil, fmt.Errorf("[ERR] Error reading EMR Instance Group for cluster (%s)", clusterId)
}
if respGrps.InstanceGroups != nil {
for _, g := range respGrps.InstanceGroups {
groups = append(groups, g)
}
} else {
log.Printf("[DEBUG] EMR Instance Group list was empty")
}
marker = respGrps.Marker
}
if len(groups) == 0 {
return nil, fmt.Errorf("[WARN] No instance groups found for EMR Cluster (%s)", clusterId)
}
return groups, nil
}
func fetchEMRInstanceGroup(meta interface{}, clusterId, groupId string) (*emr.InstanceGroup, error) {
groups, err := fetchAllEMRInstanceGroups(meta, clusterId)
if err != nil {
return nil, err
}
var group *emr.InstanceGroup
for _, ig := range groups {
if groupId == *ig.Id {
group = ig
break
}
}
if group != nil {
return group, nil
}
return nil, emrInstanceGroupNotFound
}
func resourceAwsEMRInstanceGroupUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
log.Printf("[DEBUG] Modify EMR task group")
instanceCount := d.Get("instance_count").(int)
params := &emr.ModifyInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupModifyConfig{
{
InstanceGroupId: aws.String(d.Id()),
InstanceCount: aws.Int64(int64(instanceCount)),
},
},
}
_, err := conn.ModifyInstanceGroups(params)
if err != nil {
return err
}
stateConf := &resource.StateChangeConf{
Pending: []string{"PROVISIONING", "BOOTSTRAPPING", "RESIZING"},
Target: []string{"RUNNING"},
Refresh: instanceGroupStateRefresh(conn, d.Get("cluster_id").(string), d.Id()),
Timeout: 10 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}
_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf(
"Error waiting for instance (%s) to terminate: %s", d.Id(), err)
}
return resourceAwsEMRInstanceGroupRead(d, meta)
}
func instanceGroupStateRefresh(meta interface{}, clusterID, igID string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
group, err := fetchEMRInstanceGroup(meta, clusterID, igID)
if err != nil {
return nil, "Not Found", err
}
if group.Status == nil || group.Status.State == nil {
log.Printf("[WARN] ERM Instance Group found, but without state")
return nil, "Undefined", fmt.Errorf("Undefined EMR Cluster Instance Group state")
}
return group, *group.Status.State, nil
}
}
func resourceAwsEMRInstanceGroupDelete(d *schema.ResourceData, meta interface{}) error {
log.Printf("[WARN] AWS EMR Instance Group does not support DELETE; resizing cluster to zero before removing from state")
conn := meta.(*AWSClient).emrconn
params := &emr.ModifyInstanceGroupsInput{
InstanceGroups: []*emr.InstanceGroupModifyConfig{
{
InstanceGroupId: aws.String(d.Id()),
InstanceCount: aws.Int64(0),
},
},
}
_, err := conn.ModifyInstanceGroups(params)
if err != nil {
return err
}
return nil
}