saved_message_service.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from typing import Optional, Union
  2. from extensions.ext_database import db
  3. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  4. from models.account import Account
  5. from models.model import App, EndUser
  6. from models.web import SavedMessage
  7. from services.message_service import MessageService
  8. class SavedMessageService:
  9. @classmethod
  10. def pagination_by_last_id(
  11. cls, app_model: App, user: Optional[Union[Account, EndUser]], last_id: Optional[str], limit: int
  12. ) -> InfiniteScrollPagination:
  13. if not user:
  14. raise ValueError("User is required")
  15. saved_messages = (
  16. db.session.query(SavedMessage)
  17. .filter(
  18. SavedMessage.app_id == app_model.id,
  19. SavedMessage.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  20. SavedMessage.created_by == user.id,
  21. )
  22. .order_by(SavedMessage.created_at.desc())
  23. .all()
  24. )
  25. message_ids = [sm.message_id for sm in saved_messages]
  26. return MessageService.pagination_by_last_id(
  27. app_model=app_model, user=user, last_id=last_id, limit=limit, include_ids=message_ids
  28. )
  29. @classmethod
  30. def save(cls, app_model: App, user: Optional[Union[Account, EndUser]], message_id: str):
  31. if not user:
  32. return
  33. saved_message = (
  34. db.session.query(SavedMessage)
  35. .filter(
  36. SavedMessage.app_id == app_model.id,
  37. SavedMessage.message_id == message_id,
  38. SavedMessage.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  39. SavedMessage.created_by == user.id,
  40. )
  41. .first()
  42. )
  43. if saved_message:
  44. return
  45. message = MessageService.get_message(app_model=app_model, user=user, message_id=message_id)
  46. saved_message = SavedMessage(
  47. app_id=app_model.id,
  48. message_id=message.id,
  49. created_by_role="account" if isinstance(user, Account) else "end_user",
  50. created_by=user.id,
  51. )
  52. db.session.add(saved_message)
  53. db.session.commit()
  54. @classmethod
  55. def delete(cls, app_model: App, user: Optional[Union[Account, EndUser]], message_id: str):
  56. if not user:
  57. return
  58. saved_message = (
  59. db.session.query(SavedMessage)
  60. .filter(
  61. SavedMessage.app_id == app_model.id,
  62. SavedMessage.message_id == message_id,
  63. SavedMessage.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  64. SavedMessage.created_by == user.id,
  65. )
  66. .first()
  67. )
  68. if not saved_message:
  69. return
  70. db.session.delete(saved_message)
  71. db.session.commit()