import base64

from configs import dify_config
from core.file import file_repository
from core.helper import ssrf_proxy
from core.model_runtime.entities import AudioPromptMessageContent, ImagePromptMessageContent
from extensions.ext_database import db
from extensions.ext_storage import storage

from . import helpers
from .enums import FileAttribute
from .models import File, FileTransferMethod, FileType
from .tool_file_parser import ToolFileParser


def get_attr(*, file: File, attr: FileAttribute):
    match attr:
        case FileAttribute.TYPE:
            return file.type.value
        case FileAttribute.SIZE:
            return file.size
        case FileAttribute.NAME:
            return file.filename
        case FileAttribute.MIME_TYPE:
            return file.mime_type
        case FileAttribute.TRANSFER_METHOD:
            return file.transfer_method.value
        case FileAttribute.URL:
            return file.remote_url
        case FileAttribute.EXTENSION:
            return file.extension
        case _:
            raise ValueError(f"Invalid file attribute: {attr}")


def to_prompt_message_content(f: File, /):
    """
    Convert a File object to an ImagePromptMessageContent object.

    This function takes a File object and converts it to an ImagePromptMessageContent
    object, which can be used as a prompt for image-based AI models.

    Args:
        file (File): The File object to convert. Must be of type FileType.IMAGE.

    Returns:
        ImagePromptMessageContent: An object containing the image data and detail level.

    Raises:
        ValueError: If the file is not an image or if the file data is missing.

    Note:
        The detail level of the image prompt is determined by the file's extra_config.
        If not specified, it defaults to ImagePromptMessageContent.DETAIL.LOW.
    """
    match f.type:
        case FileType.IMAGE:
            if dify_config.MULTIMODAL_SEND_IMAGE_FORMAT == "url":
                data = _to_url(f)
            else:
                data = _to_base64_data_string(f)

            if f._extra_config and f._extra_config.image_config and f._extra_config.image_config.detail:
                detail = f._extra_config.image_config.detail
            else:
                detail = ImagePromptMessageContent.DETAIL.LOW

            return ImagePromptMessageContent(data=data, detail=detail)
        case FileType.AUDIO:
            encoded_string = _file_to_encoded_string(f)
            if f.extension is None:
                raise ValueError("Missing file extension")
            return AudioPromptMessageContent(data=encoded_string, format=f.extension.lstrip("."))
        case _:
            raise ValueError(f"file type {f.type} is not supported")


def download(f: File, /):
    if f.transfer_method == FileTransferMethod.TOOL_FILE:
        tool_file = file_repository.get_tool_file(session=db.session(), file=f)
        return _download_file_content(tool_file.file_key)
    elif f.transfer_method == FileTransferMethod.LOCAL_FILE:
        upload_file = file_repository.get_upload_file(session=db.session(), file=f)
        return _download_file_content(upload_file.key)
    # remote file
    response = ssrf_proxy.get(f.remote_url, follow_redirects=True)
    response.raise_for_status()
    return response.content


def _download_file_content(path: str, /):
    """
    Download and return the contents of a file as bytes.

    This function loads the file from storage and ensures it's in bytes format.

    Args:
        path (str): The path to the file in storage.

    Returns:
        bytes: The contents of the file as a bytes object.

    Raises:
        ValueError: If the loaded file is not a bytes object.
    """
    data = storage.load(path, stream=False)
    if not isinstance(data, bytes):
        raise ValueError(f"file {path} is not a bytes object")
    return data


def _get_encoded_string(f: File, /):
    match f.transfer_method:
        case FileTransferMethod.REMOTE_URL:
            response = ssrf_proxy.get(f.remote_url)
            response.raise_for_status()
            content = response.content
            encoded_string = base64.b64encode(content).decode("utf-8")
            return encoded_string
        case FileTransferMethod.LOCAL_FILE:
            upload_file = file_repository.get_upload_file(session=db.session(), file=f)
            data = _download_file_content(upload_file.key)
            encoded_string = base64.b64encode(data).decode("utf-8")
            return encoded_string
        case FileTransferMethod.TOOL_FILE:
            tool_file = file_repository.get_tool_file(session=db.session(), file=f)
            data = _download_file_content(tool_file.file_key)
            encoded_string = base64.b64encode(data).decode("utf-8")
            return encoded_string
        case _:
            raise ValueError(f"Unsupported transfer method: {f.transfer_method}")


def _to_base64_data_string(f: File, /):
    encoded_string = _get_encoded_string(f)
    return f"data:{f.mime_type};base64,{encoded_string}"


def _file_to_encoded_string(f: File, /):
    match f.type:
        case FileType.IMAGE:
            return _to_base64_data_string(f)
        case FileType.AUDIO:
            return _get_encoded_string(f)
        case _:
            raise ValueError(f"file type {f.type} is not supported")


def _to_url(f: File, /):
    if f.transfer_method == FileTransferMethod.REMOTE_URL:
        if f.remote_url is None:
            raise ValueError("Missing file remote_url")
        return f.remote_url
    elif f.transfer_method == FileTransferMethod.LOCAL_FILE:
        if f.related_id is None:
            raise ValueError("Missing file related_id")
        return helpers.get_signed_file_url(upload_file_id=f.related_id)
    elif f.transfer_method == FileTransferMethod.TOOL_FILE:
        # add sign url
        if f.related_id is None or f.extension is None:
            raise ValueError("Missing file related_id or extension")
        return ToolFileParser.get_tool_file_manager().sign_file(tool_file_id=f.related_id, extension=f.extension)
    else:
        raise ValueError(f"Unsupported transfer method: {f.transfer_method}")