import sys
import boto3
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.conf import SparkConf

# Bootstrap the Spark / Glue contexts and resolve the job's runtime parameters.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Required job arguments: Glue job name, landing bucket, and target db/table.
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "s3_bucket_name", "db_name", "table_name"],
)
job_name = args["JOB_NAME"]
s3_bucket_name = args["s3_bucket_name"]
db_name = args["db_name"]
table_name = args["table_name"]

# Initialize the job (enables bookmark/state tracking for this run).
job.init(job_name, args)

# Read the Salesforce "Account" entity through the Glue Salesforce connector.
input_Salesforce_Dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="salesforce",
    connection_options={
        "entityName": "Account",
        "apiVersion": "v60.0",
        "connectionName": "Salesforce_Connection",
    },
    transformation_ctx="inputSalesforceDyf",
)

# Does the target table already exist in the catalog database?
table_exists = any(
    existing.name == table_name
    for existing in spark.catalog.listTables(db_name)
)

# Work with the plain Spark DataFrame from here on.
input_Salesforce_Df = input_Salesforce_Dyf.toDF()

if table_exists:
    print(f"Table {db_name}.{table_name} already exists, upserting data")

    input_Salesforce_Df.createOrReplaceTempView("input_data")

    # Upsert: update target rows whose source copy is newer (by
    # lastmodifieddate), insert rows the target has never seen.
    # FIX: use the incoming view directly as the MERGE source. The previous
    # version wrapped it in a CTE that LEFT OUTER JOINed the target but
    # selected only b.* — the join added no columns, cost a full scan of the
    # target, and would duplicate source rows (a MERGE "multiple source rows
    # matched" failure) if the target ever held duplicate ids.
    stmt_merge_insert_update = f"""
        MERGE INTO glue_catalog.{db_name}.{table_name} AS TARGET
        USING input_data AS SOURCE
        ON TARGET.id = SOURCE.id
        WHEN MATCHED AND TARGET.lastmodifieddate < SOURCE.lastmodifieddate THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """

    spark.sql(stmt_merge_insert_update)

    # Delete-sync: remove target rows whose id is absent from the incoming
    # data. NOTE(review): this assumes input_data is a FULL snapshot of the
    # Salesforce entity — with an incremental extract it would delete
    # still-valid rows. Confirm against the connector configuration.
    stmt_merge_delete = f"""
        WITH DELETED AS
        (
            SELECT  a.*
            FROM    input_data b
            RIGHT OUTER JOIN glue_catalog.{db_name}.{table_name} a
            ON b.id = a.id
            WHERE b.id IS NULL
        )
        MERGE INTO glue_catalog.{db_name}.{table_name} AS TARGET
        USING DELETED
        ON TARGET.id = DELETED.id
        WHEN MATCHED THEN DELETE
        """

    spark.sql(stmt_merge_delete)
else:
    print(f"Table {db_name}.{table_name} does not exist. Creating the table and inserting data")

    # First load: create the table (format-version 2) at the given S3
    # location, storing data files as gzip-compressed parquet.
    additional_options = {"write.parquet.compression-codec": "gzip"}

    input_Salesforce_Df.writeTo(f"glue_catalog.{db_name}.{table_name}") \
        .tableProperty("format-version", "2") \
        .tableProperty("location", f's3://{s3_bucket_name}/{db_name}/{table_name}') \
        .options(**additional_options) \
        .create()

# Commit the run so Glue records job state (bookmarks) for this execution.
job.commit()