base.py 1.1 KB

1234567891011121314151617181920212223242526272829
  1. from collections.abc import Generator, Mapping
  2. from typing import Generic, Optional, TypeVar
  3. from pydantic import BaseModel
  4. class BaseBackwardsInvocation:
  5. @classmethod
  6. def convert_to_event_stream(cls, response: Generator[BaseModel | Mapping | str, None, None] | BaseModel | Mapping):
  7. if isinstance(response, Generator):
  8. try:
  9. for chunk in response:
  10. if isinstance(chunk, BaseModel | dict):
  11. yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode() + b"\n\n"
  12. elif isinstance(chunk, str):
  13. yield f"event: {chunk}\n\n".encode()
  14. except Exception as e:
  15. error_message = BaseBackwardsInvocationResponse(error=str(e)).model_dump_json()
  16. yield f"{error_message}\n\n".encode()
  17. else:
  18. yield BaseBackwardsInvocationResponse(data=response).model_dump_json().encode() + b"\n\n"
  19. T = TypeVar("T", bound=dict | Mapping | str | bool | int | BaseModel)
  20. class BaseBackwardsInvocationResponse(BaseModel, Generic[T]):
  21. data: Optional[T] = None
  22. error: str = ""