# Databricks notebook source
from pyspark.sql import SparkSession, functions as F
from datetime import datetime, date, timedelta, timezone, time
from zoneinfo import ZoneInfo
import smtplib, socket
from email.mime.text import MIMEText

# =========================
# CONFIGURATION
# =========================
USAGE_TABLE      = "system.billing.usage"           # table the DBU usage is read from
TO_EMAIL         = "[email protected]"    # report recipient; the address is deliberately left in plain text (good publicity)

SMTP_HOST        = "send.one.com"   # replace with your e-mail provider's SMTP server
SMTP_PORT        = 587  # SMTP port
TIMEOUT_S        = 30  # SMTP connection timeout in seconds

OSLO_TZ          = ZoneInfo("Europe/Oslo")

# Fixed price / exchange rate (plain constants)
DBU_PRICE_USD    = 0.35      # <- set the correct USD price per DBU for your SKU; per the original author it is generally 0.35 for SQL and serverless
USD_NOK_RATE     = 10.20     # <- fixed USD->NOK exchange rate (rate as of 03.11.25)


# E-mail credentials (fetched from Secrets)
FROM_EMAIL = dbutils.secrets.get("mail", "o365_user")
O365_PASS  = dbutils.secrets.get("mail", "o365_password")

spark = SparkSession.builder.getOrCreate()

# =========================
# HJELPERE
# =========================
def fmt_no(n: float) -> str:
    """Format a number the Norwegian way for money amounts.

    The thousands separator is a space, the decimal separator is a comma and
    the value is rounded to two decimals, e.g. ``1234.5`` -> ``"1 234,50"``.

    Args:
        n: The amount to format.

    Returns:
        The formatted amount. A leading ``-`` is added only when the rounded
        value is non-zero, so a tiny negative such as ``-0.001`` yields
        ``"0,00"`` rather than ``"-0,00"``.
    """
    # Format the magnitude English-style first, then swap the separators.
    magnitude = f"{abs(n):,.2f}".replace(",", " ").replace(".", ",")
    if n < 0 and magnitude != "0,00":
        return f"-{magnitude}"
    return magnitude

def fmt_en(n: float) -> str:
    """Format ``n`` English-style: comma thousands separator, dot decimal, two decimals."""
    return format(n, ",.2f")

def norwegian_datetime(dt: datetime) -> str:
    """Render ``dt`` as Norwegian text, e.g. ``3. november 2025 kl 07:05:09``.

    The day is written without zero padding, the month as its Norwegian name,
    and the time as zero-padded ``HH:MM:SS``.
    """
    month_names = (
        "januar", "februar", "mars", "april", "mai", "juni",
        "juli", "august", "september", "oktober", "november", "desember",
    )
    month_name = month_names[dt.month - 1]  # dt.month is 1-based
    clock = dt.strftime("%H:%M:%S")
    return f"{dt.day}. {month_name} {dt.year} kl {clock}"

def get_cluster_name() -> str:
    """Return the cluster name from the Spark configuration, or ``"unknown"``.

    The lookup is best-effort: any exception raised while reading the
    configuration value results in the ``"unknown"`` fallback.
    """
    conf_key = "spark.databricks.clusterUsageTags.clusterName"
    try:
        cluster = spark.conf.get(conf_key)
    except Exception:
        cluster = "unknown"
    return cluster

def send_mail_o365(from_addr: str, to_addr: str, subject: str, body: str):
    """Send a plain-text UTF-8 e-mail over SMTP using STARTTLS.

    Connects to ``SMTP_HOST``:``SMTP_PORT`` with a ``TIMEOUT_S`` timeout,
    upgrades the connection with STARTTLS, logs in as ``from_addr`` using the
    module-level ``O365_PASS`` password and sends one message.

    Args:
        from_addr: Sender address; also used as the SMTP login name.
        to_addr: Recipient address placed in the ``To`` header.
        subject: Subject line.
        body: Plain-text message body.

    Raises:
        RuntimeError: If connecting, the TLS handshake, login or sending
            fails. The underlying exception is chained as ``__cause__``.
    """
    # Local import keeps this function self-contained.
    import ssl

    msg = MIMEText(body, _charset="utf-8")
    msg["Subject"] = subject
    msg["From"] = from_addr
    msg["To"] = to_addr

    # Without an explicit context, smtplib's starttls() does not verify the
    # server certificate or host name. The default context enables both, so
    # the password is only sent to an authenticated server.
    tls_context = ssl.create_default_context()
    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=TIMEOUT_S) as server:
            server.ehlo()
            server.starttls(context=tls_context)
            # Re-identify after STARTTLS, before logging in.
            server.ehlo()
            server.login(from_addr, O365_PASS)
            server.send_message(msg)
    except (smtplib.SMTPException, socket.timeout, OSError) as e:
        # Certificate verification failures are ssl.SSLError (an OSError
        # subclass) and are therefore reported through this path as well.
        raise RuntimeError(f"Feil ved sending av e-post: {e}") from e

def yesterday_oslo() -> date:
    """Return yesterday's calendar date as seen in the Europe/Oslo time zone."""
    today_oslo = datetime.now(OSLO_TZ).date()
    return today_oslo - timedelta(days=1)

def month_bounds_oslo_up_to(d: date) -> tuple[date, date]:
    """Return ``(first day of d's month, d)`` as the month-to-date range."""
    return d.replace(day=1), d

def usage_with_oslo_date_df():
    """
    Build the base DataFrame of DBU usage with an ``oslo_date`` column.

    Rows of ``USAGE_TABLE`` are filtered to ``usage_unit == "DBU"``. The date
    is derived from the first of these columns that exists:
    ``usage_start_time``, ``usage_date``, ``timestamp``. Only the DataFrame
    API is used, no SQL strings.

    Returns a DataFrame with the columns ``oslo_date`` and ``usage_quantity``.
    Raises RuntimeError when none of the date columns is present.
    """
    usage = spark.table(USAGE_TABLE).where(F.col("usage_unit") == F.lit("DBU"))
    available = set(usage.columns)

    def _oslo_date_from_utc(column_name):
        # The timestamp is assumed to be UTC: shift it to Europe/Oslo and
        # keep only the date part.
        return F.to_date(F.from_utc_timestamp(F.col(column_name), "Europe/Oslo"))

    if "usage_start_time" in available:
        oslo_date = _oslo_date_from_utc("usage_start_time")
    elif "usage_date" in available:
        # Used as-is (only cast to date), without any time-zone shift.
        oslo_date = F.col("usage_date").cast("date")
    elif "timestamp" in available:
        oslo_date = _oslo_date_from_utc("timestamp")
    else:
        raise RuntimeError("Fant ikke usage_start_time/usage_date/timestamp i system.billing.usage")

    return usage.withColumn("oslo_date", oslo_date).select("oslo_date", "usage_quantity")

def daily_usage_df(d_from: date, d_to: date):
    """Return DBU summed per Oslo date for dates in the inclusive range [d_from, d_to].

    The result has one row per ``oslo_date`` with the total in column ``dbu``.
    """
    usage = usage_with_oslo_date_df()

    # Bounds are passed as ISO date strings and compared against oslo_date.
    lower = F.lit(d_from.isoformat())
    upper = F.lit(d_to.isoformat())
    in_range = (F.col("oslo_date") >= lower) & (F.col("oslo_date") <= upper)

    # A NULL sum is reported as 0.0.
    total_dbu = F.coalesce(F.sum("usage_quantity"), F.lit(0.0)).alias("dbu")

    return usage.where(in_range).groupBy("oslo_date").agg(total_dbu)

# =========================
# COMPUTATION
# =========================
yday = yesterday_oslo()
# Month-to-date window: the 1st of the month through yesterday.
m_start, m_end = month_bounds_oslo_up_to(yday)

# Yesterday: the single-day range yields at most one row; no row means no usage.
daily_df = daily_usage_df(yday, yday)
daily_row = daily_df.collect()
dbu_yday = float(daily_row[0]["dbu"] or 0.0) if daily_row else 0.0

# Convert DBU -> USD -> NOK using the fixed price and exchange rate.
usd_yday = dbu_yday * DBU_PRICE_USD
nok_yday = usd_yday * USD_NOK_RATE

# Month to date: sum the per-day totals into a single row.
mtd_df = daily_usage_df(m_start, m_end)
mtd_total = F.coalesce(F.sum("dbu"), F.lit(0.0)).alias("dbu")
mtd_row = mtd_df.agg(mtd_total).collect()[0]
dbu_mtd = float(mtd_row["dbu"] or 0.0)
usd_mtd = dbu_mtd * DBU_PRICE_USD
nok_mtd = usd_mtd * USD_NOK_RATE

 

# =========================
# E-MAIL
# =========================
cluster_name = get_cluster_name()
now_oslo = datetime.now(OSLO_TZ)

# Keep a separator between the date and the amount; without it the two run
# together (e.g. "2025-11-022 345,67 NOK"). A plain ASCII hyphen is used so
# the subject header needs no special encoding.
subject = f"[DBX Kostnad] {yday.isoformat()} - {fmt_no(nok_yday)} NOK ({fmt_en(dbu_yday)} DBU)"

body_lines = [
    "Databricks — kostnadsrapport (Europe/Oslo, fast valutakurs)",
    "",
    f"Kjøretidspunkt: {norwegian_datetime(now_oslo)}",
    f"Cluster: {cluster_name}",
    "",
    f"Gårsdagen ({yday.isoformat()}):",
    f"  • DBU brukt: {fmt_en(dbu_yday)} DBU",
    f"  • Pris/DBU: {fmt_en(DBU_PRICE_USD)} USD",
    f"  • USD→NOK (fast): {fmt_en(USD_NOK_RATE)}",
    f"  • Kost (USD): {fmt_en(usd_yday)}",
    f"  • Kost (NOK): {fmt_no(nok_yday)}",
    "",
    # Start and end of the month-to-date range are separated by a dash so the
    # two ISO dates do not fuse into one string.
    f"MTD {m_start.isoformat()} – {m_end.isoformat()} (inkl. gårsdagen):",
    f"  • DBU brukt: {fmt_en(dbu_mtd)} DBU",
    f"  • Kost (USD): {fmt_en(usd_mtd)}",
    f"  • Kost (NOK): {fmt_no(nok_mtd)}",
    "",
]

body = "\n".join(body_lines)
# Echo the report to the notebook output before sending it.
print(body)

send_mail_o365(FROM_EMAIL, TO_EMAIL, subject, body)