#!/bin/bash
# exampleJob.sh <CloudWatchNamespace>
#
# Writes a sample PySpark job to /tmp, submits it with spark-submit, then
# builds a "SparkMonitoring" CloudWatch dashboard for the resulting YARN
# application and publishes it with `aws cloudwatch put-dashboard`.
#
# Arguments:
#   $1 - CloudWatch namespace the dashboard queries for Spark metrics
#        (may be overridden further down on EMR 7 clusters that already have
#        the CloudWatch agent installed).

# First positional argument; empty string when none is given.
CWNamespace="${1-}"

# Create a python script that will be a sample spark job.
# The heredoc delimiter is quoted ('EOF') so the shell performs no $-expansion,
# command substitution or backslash processing: the Python source is written
# to /tmp/samplejob.py verbatim.
cat > /tmp/samplejob.py << 'EOF'
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# create a SparkSession
spark = SparkSession.builder.appName("skewed_dataframe").getOrCreate()

# generate a dataframe with 100 million rows: "id" plus 40 columns col_0..col_39.
# col_i is rand()*100 where id % 10 == i and randn()*10 otherwise, so only
# col_0..col_9 ever take the rand()*100 branch.
df = spark.range(100000000).toDF("id")
for i in range(40):
    df = df.withColumn("col_"+str(i), when(col("id") % 10 == i, rand()*100).otherwise(randn()*10))

# NOTE(review): repartition() returns a new DataFrame and the result is not
# assigned, so this statement does not change df. Kept as-is to preserve the
# job's current execution profile.
df.repartition(800)#.count()

# skew the data by repeating rows with col_0 values > 80
skewed_df = df.filter(col("col_0") > 80).withColumn("id", monotonically_increasing_id()).limit(10000000)

# write the DataFrame to HDFS
skewed_df.write.format("parquet").mode("overwrite").save("/user/hadoop/data/tmp2/")

# read the DataFrame back from HDFS
df2 = spark.read.format("parquet").load("/user/hadoop/data/tmp2/")
df2 = df.union(df2)

newdf = df2.orderBy(col("col_0").asc())

# write the dataframe to disk; overwrite (like the tmp2 write above) so the
# job can be re-run without failing on an already-existing output path
newdf.write.mode("overwrite").parquet("/user/hadoop/data/001/")

# stop the SparkSession
spark.stop()
EOF

# Submit the sample job in cluster deploy mode with a fixed executor footprint:
# dynamic allocation is switched off and 6 executors x 6 cores are requested.
spark_submit_args=(
  --deploy-mode cluster
  --conf spark.dynamicAllocation.enabled=false
  --num-executors 6
  --executor-cores 6
)
spark-submit "${spark_submit_args[@]}" /tmp/samplejob.py

# Print the newest YARN application ID found in `yarn application -list`
# output read from stdin (first tab-separated field of lines mentioning
# "application_"). IDs have the form application_<clusterTimestamp>_<sequence>;
# both numeric parts are compared numerically because a plain lexical sort
# would order e.g. _10000 before _9999 and pick the wrong application.
latest_yarn_app_id() {
  grep "application_" | cut -f1 | sort -t_ -k2,2n -k3,3n | tail -n1
}

# EMR cluster (job flow) ID from the instance-controller metadata.
CLUSTERID=$(jq -r '.jobFlowId' /emr/instance-controller/lib/info/job-flow.json)
# Newest application known to YARN (any state) -- presumably the job submitted
# above. The "_1" suffix is appended to match the ApplicationID value the
# dashboard filters on (NOTE(review): looks like an attempt number -- confirm).
APPLICATIONID="$(yarn application -list -appStates ALL | latest_yarn_app_id)_1"
# AWS region of this instance, used for the dashboard widgets and console links.
REGION=$(jq -r '.region' /emr/instance-controller/lib/info/extraInstanceData.json)

# The dashboard queries metrics under "<namespace>/<cluster id>";
# CLUSTERIDSTRING is a placeholder that the sed calls at the end of the
# script replace with $CLUSTERID.
CWNamespace="$CWNamespace/CLUSTERIDSTRING"

# If this is EMR 7.x, it may already be configured with the CloudWatch agent.
# In that case the dashboard queries the "CWAgent" namespace instead
# (presumably where the agent publishes the Spark metrics -- confirm).
JobFlowState=/emr/instance-controller/lib/info/job-flow-state.txt
EMRRelease=$(grep "releaseLabel" "$JobFlowState" | cut -d'"' -f2)
if [[ $EMRRelease == emr-7* ]]; then
  echo "$EMRRelease"
  if grep -q "emr-amazon-cloudwatch-agent" "$JobFlowState"; then
    echo "CW Agent already installed"
    CWNamespace="CWAgent"
  fi
fi

# Write the CloudWatch dashboard definition to ./dashboard.json.
# The heredoc delimiter is unquoted so the shell expands $CWNamespace and
# $REGION into the JSON; \${PROP(...)} is escaped so the literal
# ${PROP('MetricName')} label template reaches CloudWatch unchanged.
# CLUSTERIDSTRING / APPLICATIONIDSTRING are placeholders that the sed calls
# below replace with the real cluster and application IDs.
# Every SCHEMA()/SEARCH() widget queries the same namespace, $CWNamespace, and
# byte counts are divided by 1073741824 (2^30) so the values match the GiB titles.
cat > dashboard.json << EOF
{
    "start": "-PT1H",
    "widgets": [
        {
            "height": 6,
            "width": 10,
            "y": 10,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SELECT MAX(heap_used) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "period": 10 } ]
                ],
                "sparkline": true,
                "view": "timeSeries",
                "stacked": true,
                "region": "$REGION",
                "stat": "Average",
                "period": 10,
                "setPeriodToTimeRange": false,
                "trend": true,
                "title": "MemoryUsed",
                "legend": {
                    "position": "right"
                },
                "yAxis": {
                    "left": {
                        "label": "",
                        "showUnits": false
                    }
                }
            }
        },
        {
            "height": 8,
            "width": 6,
            "y": 6,
            "x": 10,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SELECT MAX(bytesRead) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "visible": false, "period": 3600 } ],
                    [ { "expression": "q1/1073741824", "label": "", "id": "e1", "period": 3600, "region": "$REGION" } ]
                ],
                "sparkline": false,
                "view": "bar",
                "stacked": false,
                "region": "$REGION",
                "stat": "Average",
                "period": 3600,
                "setPeriodToTimeRange": true,
                "trend": false,
                "title": "GiBsRead",
                "legend": {
                    "position": "hidden"
                }
            }
        },
        {
            "height": 8,
            "width": 6,
            "y": 14,
            "x": 10,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SELECT MAX(bytesWritten) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "visible": false } ],
                    [ { "expression": "q1/1073741824", "label": "", "id": "e1", "region": "$REGION" } ]
                ],
                "sparkline": false,
                "view": "bar",
                "stacked": false,
                "region": "$REGION",
                "stat": "Average",
                "period": 300,
                "setPeriodToTimeRange": true,
                "trend": false,
                "title": "GiBsWritten",
                "legend": {
                    "position": "hidden"
                }
            }
        },
        {
            "height": 16,
            "width": 3,
            "y": 6,
            "x": 21,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "q1", "label": "Expression1", "id": "e1", "visible": false } ],
                    [ { "expression": "SELECT MAX(threadpool_completeTasks) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "period": 900 } ]
                ],
                "sparkline": false,
                "view": "singleValue",
                "stacked": false,
                "region": "$REGION",
                "stat": "Average",
                "period": 900,
                "setPeriodToTimeRange": true,
                "trend": false,
                "title": "CompletedTasks"
            }
        },
        {
            "height": 16,
            "width": 2,
            "y": 6,
            "x": 19,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "q1", "label": "Expression1", "id": "e1", "visible": false } ],
                    [ { "expression": "SELECT MAX(threadpool_startedTasks) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "period": 900 } ]
                ],
                "sparkline": false,
                "view": "singleValue",
                "stacked": false,
                "region": "$REGION",
                "stat": "Average",
                "period": 900,
                "setPeriodToTimeRange": true,
                "trend": false,
                "title": "StartedTasks"
            }
        },
        {
            "height": 4,
            "width": 24,
            "y": 22,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "q1/60000", "label": "", "id": "e1", "stat": "Sum", "region": "$REGION", "period": 10 } ],
                    [ { "expression": "SELECT SUM(TotalGCTime) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "period": 10, "stat": "Sum", "visible": false } ]
                ],
                "sparkline": true,
                "view": "singleValue",
                "stacked": true,
                "region": "$REGION",
                "stat": "Sum",
                "period": 10,
                "setPeriodToTimeRange": false,
                "trend": true,
                "title": "TotalGCTime (Mins)",
                "legend": {
                    "position": "hidden"
                }
            }
        },
        {
            "height": 16,
            "width": 3,
            "y": 6,
            "x": 16,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SELECT MAX(runTime) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "period": 900, "stat": "Maximum", "visible": false } ],
                    [ { "expression": "q1/60000", "label": "", "id": "e1", "stat": "Maximum", "region": "$REGION" } ]
                ],
                "sparkline": false,
                "view": "singleValue",
                "stacked": false,
                "region": "$REGION",
                "stat": "Maximum",
                "period": 900,
                "setPeriodToTimeRange": false,
                "trend": true,
                "title": "TotalRuntime(Mins)"
            }
        },
        {
            "height": 4,
            "width": 10,
            "y": 6,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"applicationMaster_numExecutorsFailed\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 300)", "label": "\${PROP('MetricName')}", "id": "e1", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"tasks_unexcludedExecutors\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 300)", "label": "\${PROP('MetricName')}", "id": "e2", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"tasks_excludedExecutors\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 300)", "label": "\${PROP('MetricName')}", "id": "e3", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"applicationMaster_numExecutorsRunning\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 300)", "label": "\${PROP('MetricName')}", "id": "e4", "region": "$REGION" } ]
                ],
                "sparkline": false,
                "view": "singleValue",
                "stacked": false,
                "region": "$REGION",
                "stat": "Maximum",
                "period": 900,
                "setPeriodToTimeRange": true,
                "trend": false,
                "labels": {
                    "visible": true
                },
                "legend": {
                    "position": "hidden"
                },
                "title": "Executors"
            }
        },
        {
            "height": 6,
            "width": 18,
            "y": 0,
            "x": 6,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"jobs_succeededJobs\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e4", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"jobs_failedJobs\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e5", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"stages_completedStages\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e1", "region": "$REGION", "color": "#2ca02c" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"stages_failedStages\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e2", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"stages_skippedStages\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e3", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"tasks_completedTasks\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e6", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"tasks_failedTasks\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e7", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"tasks_skippedTasks\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e8", "region": "$REGION" } ],
                    [ { "expression": "SEARCH('{$CWNamespace,ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp} MetricName=\"tasks_killedTasks\" ApplicationID=\"APPLICATIONIDSTRING\"', 'Maximum', 10)", "label": "\${PROP('MetricName')}", "id": "e9", "region": "$REGION" } ]
                ],
                "sparkline": false,
                "view": "singleValue",
                "stacked": true,
                "region": "$REGION",
                "stat": "Maximum",
                "period": 900,
                "setPeriodToTimeRange": true,
                "trend": false,
                "labels": {
                    "visible": true
                },
                "legend": {
                    "position": "hidden"
                },
                "title": "Application stats"
            }
        },
        {
            "height": 6,
            "width": 10,
            "y": 16,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "q1*1.6666666666667E-11", "label": "", "id": "e1", "region": "$REGION", "period": 10 } ],
                    [ { "expression": "SELECT MAX(jvmCpuTime) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "Query1", "id": "q1", "visible": false, "region": "$REGION", "color": "#9edae5", "period": 10 } ]
                ],
                "sparkline": false,
                "view": "timeSeries",
                "stacked": true,
                "region": "$REGION",
                "stat": "Maximum",
                "period": 10,
                "setPeriodToTimeRange": true,
                "trend": false,
                "labels": {
                    "visible": true
                },
                "legend": {
                    "position": "right"
                },
                "title": "JVMCPUTime (Mins)",
                "yAxis": {
                    "left": {
                        "showUnits": false
                    }
                }
            }
        },
        {
            "height": 4,
            "width": 24,
            "y": 34,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SELECT SUM(MajorGCCount) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION" } ]
                ],
                "sparkline": true,
                "view": "singleValue",
                "stacked": false,
                "region": "$REGION",
                "stat": "Average",
                "period": 60,
                "setPeriodToTimeRange": false,
                "trend": true,
                "title": "MajorGCCount",
                "legend": {
                    "position": "hidden"
                },
                "yAxis": {
                    "left": {
                        "label": "",
                        "showUnits": false
                    }
                }
            }
        },
        {
            "height": 4,
            "width": 24,
            "y": 38,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "SELECT SUM(MinorGCCount) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION" } ]
                ],
                "sparkline": true,
                "view": "singleValue",
                "stacked": true,
                "region": "$REGION",
                "stat": "Average",
                "period": 60,
                "setPeriodToTimeRange": false,
                "trend": true,
                "title": "MinorGCCount",
                "legend": {
                    "position": "right"
                },
                "yAxis": {
                    "left": {
                        "label": "",
                        "showUnits": false
                    }
                }
            }
        },
        {
            "height": 4,
            "width": 24,
            "y": 26,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "q1/60000", "label": "", "id": "e1", "region": "$REGION" } ],
                    [ { "expression": "SELECT SUM(MajorGCTime) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "visible": false } ]
                ],
                "sparkline": true,
                "view": "singleValue",
                "stacked": true,
                "region": "$REGION",
                "stat": "Average",
                "period": 60,
                "setPeriodToTimeRange": false,
                "trend": true,
                "title": "MajorGCTime(Mins)",
                "legend": {
                    "position": "hidden"
                },
                "yAxis": {
                    "left": {
                        "label": "",
                        "showUnits": false
                    }
                }
            }
        },
        {
            "height": 4,
            "width": 24,
            "y": 30,
            "x": 0,
            "type": "metric",
            "properties": {
                "metrics": [
                    [ { "expression": "q1/60000", "label": "", "id": "e1", "region": "$REGION" } ],
                    [ { "expression": "SELECT SUM(MinorGCTime) FROM SCHEMA(\"$CWNamespace\", ApplicationID,FullMetricName,MetricInstance,Namespace,instanceID,jobflowId,metric_type,privateIp) WHERE ApplicationID = 'APPLICATIONIDSTRING' AND jobflowId = 'CLUSTERIDSTRING' GROUP BY MetricInstance", "label": "", "id": "q1", "region": "$REGION", "visible": false } ]
                ],
                "sparkline": true,
                "view": "singleValue",
                "stacked": true,
                "region": "$REGION",
                "stat": "Average",
                "period": 60,
                "setPeriodToTimeRange": false,
                "trend": true,
                "title": "MinorGCTime(Mins)",
                "legend": {
                    "position": "hidden"
                },
                "yAxis": {
                    "left": {
                        "label": "",
                        "showUnits": false
                    }
                }
            }
        },
        {
            "height": 6,
            "width": 6,
            "y": 0,
            "x": 0,
            "type": "text",
            "properties": {
                "markdown": "Information | _\n----|----- \nClusterID | CLUSTERIDSTRING \nApplicationID | APPLICATIONIDSTRING \nDashboard | [SparkMonitoring](#dashboards:name=SparkMonitoring) \nCluster Overview | [EMR Console](https://$REGION.console.aws.amazon.com/emr/home?region=$REGION#/clusterDetails/CLUSTERIDSTRING)\n Additional Metrics | [Cloudwatch Console](https://$REGION.console.aws.amazon.com/cloudwatch/home?region=$REGION#metricsV2:graph=~(view~'timeSeries~stacked~false~region~'$REGION~stat~'Average);query=~'*7bAWS*2fElasticMapReduce*2cJobFlowId*7d*20JobFlowId*3dCLUSTERIDSTRING)\n",
                "background": "solid"
            }
        }
    ]
}
EOF

# Substitute the real cluster and application IDs for the template
# placeholders in dashboard.json (in place), then upload the resulting JSON
# as the "SparkMonitoring" dashboard via `aws cloudwatch put-dashboard`.
sed -i \
  -e "s/CLUSTERIDSTRING/$CLUSTERID/g" \
  -e "s/APPLICATIONIDSTRING/$APPLICATIONID/g" \
  dashboard.json

# $(< file) is the bash builtin equivalent of $(cat file).
dashboard_body=$(< ./dashboard.json)
aws cloudwatch put-dashboard --dashboard-name SparkMonitoring --dashboard-body "$dashboard_body"