from datetime import datetime

from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessStartJobOperator,
)

from airflow import DAG
from airflow.models import Variable

# Deployment-specific placeholders: substitute real values before deploying.
EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"
JOB_ROLE_ARN = "<<Role name arn here>>"
S3_BASE_BUCKET = "<<bucket_name_here>>"
S3_LOGS_BUCKET = "<<bucket_name_here>>"

# Configuration override shared by every job in this file: points the S3
# monitoring configuration's log URI at the "logs/" prefix of S3_LOGS_BUCKET.
DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {
            "logUri": "s3://{}/logs/".format(S3_LOGS_BUCKET),
        },
    },
}

def _spark_job_task(task_id, script_name):
    """Build one EMR Serverless Spark job task for a script under ``scripts/``.

    All jobs in this DAG share the same application, execution role and
    monitoring overrides; only the task id and the entry-point script differ.
    Intended to be called inside the ``with DAG(...)`` block below.

    Args:
        task_id: Airflow task id to give the operator.
        script_name: File name of the entry-point script located at
            ``s3://{S3_BASE_BUCKET}/scripts/``.

    Returns:
        The configured ``EmrServerlessStartJobOperator``.
    """
    return EmrServerlessStartJobOperator(
        task_id=task_id,
        application_id=EMR_SERVERLESS_APPLICATION_ID,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": f"s3://{S3_BASE_BUCKET}/scripts/{script_name}",
                # Every script receives the base bucket name as its only
                # command-line argument.
                "entryPointArguments": [S3_BASE_BUCKET],
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )


with DAG(
    dag_id="blog_dag_mwaa_emrs_ny_taxi",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["blog_dag_mwaa_emrs_ny_taxi"],
    catchup=False,
) as dag:
    yellow = _spark_job_task("yellow_taxi_zone_lookup", "yellow_zone.py")
    green = _spark_job_task("green_taxi_zone_lookup", "green_zone.py")
    summary = _spark_job_task("ny_taxi_summary", "ny_taxi_summary.py")

    # Both zone-lookup tasks are upstream of the summary task.
    [yellow, green] >> summary