from collections.abc import Mapping
from datetime import datetime
from enum import Enum, StrEnum
from typing import Any, Optional

from pydantic import BaseModel

from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk
from core.workflow.entities.node_entities import AgentNodeStrategyInit, NodeRunMetadataKey
from core.workflow.graph_engine.entities.graph_runtime_state import GraphRuntimeState
from core.workflow.nodes import NodeType
from core.workflow.nodes.base import BaseNodeData


class QueueEvent(StrEnum):
    """
    QueueEvent enum
    """

    LLM_CHUNK = "llm_chunk"
    TEXT_CHUNK = "text_chunk"
    AGENT_MESSAGE = "agent_message"
    MESSAGE_REPLACE = "message_replace"
    MESSAGE_END = "message_end"
    ADVANCED_CHAT_MESSAGE_END = "advanced_chat_message_end"
    WORKFLOW_STARTED = "workflow_started"
    WORKFLOW_SUCCEEDED = "workflow_succeeded"
    WORKFLOW_FAILED = "workflow_failed"
    WORKFLOW_PARTIAL_SUCCEEDED = "workflow_partial_succeeded"
    ITERATION_START = "iteration_start"
    ITERATION_NEXT = "iteration_next"
    ITERATION_COMPLETED = "iteration_completed"
    LOOP_START = "loop_start"
    LOOP_NEXT = "loop_next"
    LOOP_COMPLETED = "loop_completed"
    NODE_STARTED = "node_started"
    NODE_SUCCEEDED = "node_succeeded"
    NODE_FAILED = "node_failed"
    NODE_EXCEPTION = "node_exception"
    RETRIEVER_RESOURCES = "retriever_resources"
    ANNOTATION_REPLY = "annotation_reply"
    AGENT_THOUGHT = "agent_thought"
    MESSAGE_FILE = "message_file"
    PARALLEL_BRANCH_RUN_STARTED = "parallel_branch_run_started"
    PARALLEL_BRANCH_RUN_SUCCEEDED = "parallel_branch_run_succeeded"
    PARALLEL_BRANCH_RUN_FAILED = "parallel_branch_run_failed"
    AGENT_LOG = "agent_log"
    ERROR = "error"
    PING = "ping"
    STOP = "stop"
    RETRY = "retry"


class AppQueueEvent(BaseModel):
    """
    QueueEvent abstract entity
    """

    event: QueueEvent


class QueueLLMChunkEvent(AppQueueEvent):
    """
    QueueLLMChunkEvent entity
    Only for basic mode apps
    """

    event: QueueEvent = QueueEvent.LLM_CHUNK
    chunk: LLMResultChunk


class QueueIterationStartEvent(AppQueueEvent):
    """
    QueueIterationStartEvent entity
    """

    event: QueueEvent = QueueEvent.ITERATION_START
    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    start_at: datetime

    node_run_index: int
    inputs: Optional[Mapping[str, Any]] = None
    predecessor_node_id: Optional[str] = None
    metadata: Optional[Mapping[str, Any]] = None


class QueueIterationNextEvent(AppQueueEvent):
    """
    QueueIterationNextEvent entity
    """

    event: QueueEvent = QueueEvent.ITERATION_NEXT

    index: int
    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    parallel_mode_run_id: Optional[str] = None
    """iteratoin run in parallel mode run id"""
    node_run_index: int
    output: Optional[Any] = None  # output for the current iteration
    duration: Optional[float] = None


class QueueIterationCompletedEvent(AppQueueEvent):
    """
    QueueIterationCompletedEvent entity
    """

    event: QueueEvent = QueueEvent.ITERATION_COMPLETED

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    start_at: datetime

    node_run_index: int
    inputs: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    metadata: Optional[Mapping[str, Any]] = None
    steps: int = 0

    error: Optional[str] = None


class QueueLoopStartEvent(AppQueueEvent):
    """
    QueueLoopStartEvent entity
    """

    event: QueueEvent = QueueEvent.LOOP_START
    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    start_at: datetime

    node_run_index: int
    inputs: Optional[Mapping[str, Any]] = None
    predecessor_node_id: Optional[str] = None
    metadata: Optional[Mapping[str, Any]] = None


class QueueLoopNextEvent(AppQueueEvent):
    """
    QueueLoopNextEvent entity
    """

    event: QueueEvent = QueueEvent.LOOP_NEXT

    index: int
    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    parallel_mode_run_id: Optional[str] = None
    """iteratoin run in parallel mode run id"""
    node_run_index: int
    output: Optional[Any] = None  # output for the current loop
    duration: Optional[float] = None


class QueueLoopCompletedEvent(AppQueueEvent):
    """
    QueueLoopCompletedEvent entity
    """

    event: QueueEvent = QueueEvent.LOOP_COMPLETED

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    start_at: datetime

    node_run_index: int
    inputs: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    metadata: Optional[Mapping[str, Any]] = None
    steps: int = 0

    error: Optional[str] = None


class QueueTextChunkEvent(AppQueueEvent):
    """
    QueueTextChunkEvent entity
    """

    event: QueueEvent = QueueEvent.TEXT_CHUNK
    text: str
    from_variable_selector: Optional[list[str]] = None
    """from variable selector"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""


class QueueAgentMessageEvent(AppQueueEvent):
    """
    QueueMessageEvent entity
    """

    event: QueueEvent = QueueEvent.AGENT_MESSAGE
    chunk: LLMResultChunk


class QueueMessageReplaceEvent(AppQueueEvent):
    """
    QueueMessageReplaceEvent entity
    """

    event: QueueEvent = QueueEvent.MESSAGE_REPLACE
    text: str


class QueueRetrieverResourcesEvent(AppQueueEvent):
    """
    QueueRetrieverResourcesEvent entity
    """

    event: QueueEvent = QueueEvent.RETRIEVER_RESOURCES
    retriever_resources: list[dict]
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""


class QueueAnnotationReplyEvent(AppQueueEvent):
    """
    QueueAnnotationReplyEvent entity
    """

    event: QueueEvent = QueueEvent.ANNOTATION_REPLY
    message_annotation_id: str


class QueueMessageEndEvent(AppQueueEvent):
    """
    QueueMessageEndEvent entity
    """

    event: QueueEvent = QueueEvent.MESSAGE_END
    llm_result: Optional[LLMResult] = None


class QueueAdvancedChatMessageEndEvent(AppQueueEvent):
    """
    QueueAdvancedChatMessageEndEvent entity
    """

    event: QueueEvent = QueueEvent.ADVANCED_CHAT_MESSAGE_END


class QueueWorkflowStartedEvent(AppQueueEvent):
    """
    QueueWorkflowStartedEvent entity
    """

    event: QueueEvent = QueueEvent.WORKFLOW_STARTED
    graph_runtime_state: GraphRuntimeState


class QueueWorkflowSucceededEvent(AppQueueEvent):
    """
    QueueWorkflowSucceededEvent entity
    """

    event: QueueEvent = QueueEvent.WORKFLOW_SUCCEEDED
    outputs: Optional[dict[str, Any]] = None


class QueueWorkflowFailedEvent(AppQueueEvent):
    """
    QueueWorkflowFailedEvent entity
    """

    event: QueueEvent = QueueEvent.WORKFLOW_FAILED
    error: str
    exceptions_count: int


class QueueWorkflowPartialSuccessEvent(AppQueueEvent):
    """
    QueueWorkflowFailedEvent entity
    """

    event: QueueEvent = QueueEvent.WORKFLOW_PARTIAL_SUCCEEDED
    exceptions_count: int
    outputs: Optional[dict[str, Any]] = None


class QueueNodeStartedEvent(AppQueueEvent):
    """
    QueueNodeStartedEvent entity
    """

    event: QueueEvent = QueueEvent.NODE_STARTED

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    node_run_index: int = 1
    predecessor_node_id: Optional[str] = None
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""
    start_at: datetime
    parallel_mode_run_id: Optional[str] = None
    """iteratoin run in parallel mode run id"""
    agent_strategy: Optional[AgentNodeStrategyInit] = None


class QueueNodeSucceededEvent(AppQueueEvent):
    """
    QueueNodeSucceededEvent entity
    """

    event: QueueEvent = QueueEvent.NODE_SUCCEEDED

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""
    start_at: datetime

    inputs: Optional[Mapping[str, Any]] = None
    process_data: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

    error: Optional[str] = None
    """single iteration duration map"""
    iteration_duration_map: Optional[dict[str, float]] = None
    """single loop duration map"""
    loop_duration_map: Optional[dict[str, float]] = None


class QueueAgentLogEvent(AppQueueEvent):
    """
    QueueAgentLogEvent entity
    """

    event: QueueEvent = QueueEvent.AGENT_LOG
    id: str
    label: str
    node_execution_id: str
    parent_id: str | None
    error: str | None
    status: str
    data: Mapping[str, Any]
    metadata: Optional[Mapping[str, Any]] = None
    node_id: str


class QueueNodeRetryEvent(QueueNodeStartedEvent):
    """QueueNodeRetryEvent entity"""

    event: QueueEvent = QueueEvent.RETRY

    inputs: Optional[Mapping[str, Any]] = None
    process_data: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

    error: str
    retry_index: int  # retry index


class QueueNodeInIterationFailedEvent(AppQueueEvent):
    """
    QueueNodeInIterationFailedEvent entity
    """

    event: QueueEvent = QueueEvent.NODE_FAILED

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""
    start_at: datetime

    inputs: Optional[Mapping[str, Any]] = None
    process_data: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

    error: str


class QueueNodeInLoopFailedEvent(AppQueueEvent):
    """
    QueueNodeInLoopFailedEvent entity
    """

    event: QueueEvent = QueueEvent.NODE_FAILED

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""
    start_at: datetime

    inputs: Optional[Mapping[str, Any]] = None
    process_data: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

    error: str


class QueueNodeExceptionEvent(AppQueueEvent):
    """
    QueueNodeExceptionEvent entity
    """

    event: QueueEvent = QueueEvent.NODE_EXCEPTION

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""
    start_at: datetime

    inputs: Optional[Mapping[str, Any]] = None
    process_data: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

    error: str


class QueueNodeFailedEvent(AppQueueEvent):
    """
    QueueNodeFailedEvent entity
    """

    event: QueueEvent = QueueEvent.NODE_FAILED

    node_execution_id: str
    node_id: str
    node_type: NodeType
    node_data: BaseNodeData
    parallel_id: Optional[str] = None
    """parallel id if node is in parallel"""
    parallel_start_node_id: Optional[str] = None
    """parallel start node id if node is in parallel"""
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""
    start_at: datetime

    inputs: Optional[Mapping[str, Any]] = None
    process_data: Optional[Mapping[str, Any]] = None
    outputs: Optional[Mapping[str, Any]] = None
    execution_metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None

    error: str


class QueueAgentThoughtEvent(AppQueueEvent):
    """
    QueueAgentThoughtEvent entity
    """

    event: QueueEvent = QueueEvent.AGENT_THOUGHT
    agent_thought_id: str


class QueueMessageFileEvent(AppQueueEvent):
    """
    QueueAgentThoughtEvent entity
    """

    event: QueueEvent = QueueEvent.MESSAGE_FILE
    message_file_id: str


class QueueErrorEvent(AppQueueEvent):
    """
    QueueErrorEvent entity
    """

    event: QueueEvent = QueueEvent.ERROR
    error: Any = None


class QueuePingEvent(AppQueueEvent):
    """
    QueuePingEvent entity
    """

    event: QueueEvent = QueueEvent.PING


class QueueStopEvent(AppQueueEvent):
    """
    QueueStopEvent entity
    """

    class StopBy(Enum):
        """
        Stop by enum
        """

        USER_MANUAL = "user-manual"
        ANNOTATION_REPLY = "annotation-reply"
        OUTPUT_MODERATION = "output-moderation"
        INPUT_MODERATION = "input-moderation"

    event: QueueEvent = QueueEvent.STOP
    stopped_by: StopBy

    def get_stop_reason(self) -> str:
        """
        To stop reason
        """
        reason_mapping = {
            QueueStopEvent.StopBy.USER_MANUAL: "Stopped by user.",
            QueueStopEvent.StopBy.ANNOTATION_REPLY: "Stopped by annotation reply.",
            QueueStopEvent.StopBy.OUTPUT_MODERATION: "Stopped by output moderation.",
            QueueStopEvent.StopBy.INPUT_MODERATION: "Stopped by input moderation.",
        }

        return reason_mapping.get(self.stopped_by, "Stopped by unknown reason.")


class QueueMessage(BaseModel):
    """
    QueueMessage abstract entity
    """

    task_id: str
    app_mode: str
    event: AppQueueEvent


class MessageQueueMessage(QueueMessage):
    """
    MessageQueueMessage entity
    """

    message_id: str
    conversation_id: str


class WorkflowQueueMessage(QueueMessage):
    """
    WorkflowQueueMessage entity
    """

    pass


class QueueParallelBranchRunStartedEvent(AppQueueEvent):
    """
    QueueParallelBranchRunStartedEvent entity
    """

    event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_STARTED

    parallel_id: str
    parallel_start_node_id: str
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""


class QueueParallelBranchRunSucceededEvent(AppQueueEvent):
    """
    QueueParallelBranchRunSucceededEvent entity
    """

    event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_SUCCEEDED

    parallel_id: str
    parallel_start_node_id: str
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""


class QueueParallelBranchRunFailedEvent(AppQueueEvent):
    """
    QueueParallelBranchRunFailedEvent entity
    """

    event: QueueEvent = QueueEvent.PARALLEL_BRANCH_RUN_FAILED

    parallel_id: str
    parallel_start_node_id: str
    parent_parallel_id: Optional[str] = None
    """parent parallel id if node is in parallel"""
    parent_parallel_start_node_id: Optional[str] = None
    """parent parallel start node id if node is in parallel"""
    in_iteration_id: Optional[str] = None
    """iteration id if node is in iteration"""
    in_loop_id: Optional[str] = None
    """loop id if node is in loop"""
    error: str