factory.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from collections.abc import Mapping
  2. from typing import Any
  3. from core.file.file_obj import FileVar
  4. from .segments import Segment, StringSegment
  5. from .types import SegmentType
  6. from .variables import (
  7. ArrayVariable,
  8. FileVariable,
  9. FloatVariable,
  10. IntegerVariable,
  11. NoneVariable,
  12. ObjectVariable,
  13. SecretVariable,
  14. StringVariable,
  15. Variable,
  16. )
  17. def build_variable_from_mapping(m: Mapping[str, Any], /) -> Variable:
  18. if (value_type := m.get('value_type')) is None:
  19. raise ValueError('missing value type')
  20. if not m.get('name'):
  21. raise ValueError('missing name')
  22. if (value := m.get('value')) is None:
  23. raise ValueError('missing value')
  24. match value_type:
  25. case SegmentType.STRING:
  26. return StringVariable.model_validate(m)
  27. case SegmentType.NUMBER if isinstance(value, int):
  28. return IntegerVariable.model_validate(m)
  29. case SegmentType.NUMBER if isinstance(value, float):
  30. return FloatVariable.model_validate(m)
  31. case SegmentType.SECRET:
  32. return SecretVariable.model_validate(m)
  33. case SegmentType.NUMBER if not isinstance(value, float | int):
  34. raise ValueError(f'invalid number value {value}')
  35. raise ValueError(f'not supported value type {value_type}')
  36. def build_anonymous_variable(value: Any, /) -> Variable:
  37. if value is None:
  38. return NoneVariable(name='anonymous')
  39. if isinstance(value, str):
  40. return StringVariable(name='anonymous', value=value)
  41. if isinstance(value, int):
  42. return IntegerVariable(name='anonymous', value=value)
  43. if isinstance(value, float):
  44. return FloatVariable(name='anonymous', value=value)
  45. if isinstance(value, dict):
  46. # TODO: Limit the depth of the object
  47. obj = {k: build_anonymous_variable(v) for k, v in value.items()}
  48. return ObjectVariable(name='anonymous', value=obj)
  49. if isinstance(value, list):
  50. # TODO: Limit the depth of the array
  51. elements = [build_anonymous_variable(v) for v in value]
  52. return ArrayVariable(name='anonymous', value=elements)
  53. if isinstance(value, FileVar):
  54. return FileVariable(name='anonymous', value=value)
  55. raise ValueError(f'not supported value {value}')
  56. def build_segment(value: Any, /) -> Segment:
  57. if isinstance(value, str):
  58. return StringSegment(value=value)
  59. raise ValueError(f'not supported value {value}')