# formdata.py
import io
from collections import deque
from typing import Any, Iterable, List, Optional
from urllib.parse import urlencode

from multidict import MultiDict, MultiDictProxy

from . import hdrs, multipart, payload
from .helpers import guess_filename
from .payload import Payload

# Names exported by a star-import of this module; only FormData is listed.
__all__ = ("FormData",)
  9. class FormData:
  10. """Helper class for multipart/form-data and
  11. application/x-www-form-urlencoded body generation."""
  12. def __init__(
  13. self,
  14. fields: Iterable[Any] = (),
  15. quote_fields: bool = True,
  16. charset: Optional[str] = None,
  17. ) -> None:
  18. self._writer = multipart.MultipartWriter("form-data")
  19. self._fields = [] # type: List[Any]
  20. self._is_multipart = False
  21. self._is_processed = False
  22. self._quote_fields = quote_fields
  23. self._charset = charset
  24. if isinstance(fields, dict):
  25. fields = list(fields.items())
  26. elif not isinstance(fields, (list, tuple)):
  27. fields = (fields,)
  28. self.add_fields(*fields)
  29. @property
  30. def is_multipart(self) -> bool:
  31. return self._is_multipart
  32. def add_field(
  33. self,
  34. name: str,
  35. value: Any,
  36. *,
  37. content_type: Optional[str] = None,
  38. filename: Optional[str] = None,
  39. content_transfer_encoding: Optional[str] = None
  40. ) -> None:
  41. if isinstance(value, io.IOBase):
  42. self._is_multipart = True
  43. elif isinstance(value, (bytes, bytearray, memoryview)):
  44. if filename is None and content_transfer_encoding is None:
  45. filename = name
  46. type_options = MultiDict({"name": name}) # type: MultiDict[str]
  47. if filename is not None and not isinstance(filename, str):
  48. raise TypeError(
  49. "filename must be an instance of str. " "Got: %s" % filename
  50. )
  51. if filename is None and isinstance(value, io.IOBase):
  52. filename = guess_filename(value, name)
  53. if filename is not None:
  54. type_options["filename"] = filename
  55. self._is_multipart = True
  56. headers = {}
  57. if content_type is not None:
  58. if not isinstance(content_type, str):
  59. raise TypeError(
  60. "content_type must be an instance of str. " "Got: %s" % content_type
  61. )
  62. headers[hdrs.CONTENT_TYPE] = content_type
  63. self._is_multipart = True
  64. if content_transfer_encoding is not None:
  65. if not isinstance(content_transfer_encoding, str):
  66. raise TypeError(
  67. "content_transfer_encoding must be an instance"
  68. " of str. Got: %s" % content_transfer_encoding
  69. )
  70. headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding
  71. self._is_multipart = True
  72. self._fields.append((type_options, headers, value))
  73. def add_fields(self, *fields: Any) -> None:
  74. to_add = list(fields)
  75. while to_add:
  76. rec = to_add.pop(0)
  77. if isinstance(rec, io.IOBase):
  78. k = guess_filename(rec, "unknown")
  79. self.add_field(k, rec) # type: ignore
  80. elif isinstance(rec, (MultiDictProxy, MultiDict)):
  81. to_add.extend(rec.items())
  82. elif isinstance(rec, (list, tuple)) and len(rec) == 2:
  83. k, fp = rec
  84. self.add_field(k, fp) # type: ignore
  85. else:
  86. raise TypeError(
  87. "Only io.IOBase, multidict and (name, file) "
  88. "pairs allowed, use .add_field() for passing "
  89. "more complex parameters, got {!r}".format(rec)
  90. )
  91. def _gen_form_urlencoded(self) -> payload.BytesPayload:
  92. # form data (x-www-form-urlencoded)
  93. data = []
  94. for type_options, _, value in self._fields:
  95. data.append((type_options["name"], value))
  96. charset = self._charset if self._charset is not None else "utf-8"
  97. if charset == "utf-8":
  98. content_type = "application/x-www-form-urlencoded"
  99. else:
  100. content_type = "application/x-www-form-urlencoded; " "charset=%s" % charset
  101. return payload.BytesPayload(
  102. urlencode(data, doseq=True, encoding=charset).encode(),
  103. content_type=content_type,
  104. )
  105. def _gen_form_data(self) -> multipart.MultipartWriter:
  106. """Encode a list of fields using the multipart/form-data MIME format"""
  107. if self._is_processed:
  108. raise RuntimeError("Form data has been processed already")
  109. for dispparams, headers, value in self._fields:
  110. try:
  111. if hdrs.CONTENT_TYPE in headers:
  112. part = payload.get_payload(
  113. value,
  114. content_type=headers[hdrs.CONTENT_TYPE],
  115. headers=headers,
  116. encoding=self._charset,
  117. )
  118. else:
  119. part = payload.get_payload(
  120. value, headers=headers, encoding=self._charset
  121. )
  122. except Exception as exc:
  123. raise TypeError(
  124. "Can not serialize value type: %r\n "
  125. "headers: %r\n value: %r" % (type(value), headers, value)
  126. ) from exc
  127. if dispparams:
  128. part.set_content_disposition(
  129. "form-data", quote_fields=self._quote_fields, **dispparams
  130. )
  131. # FIXME cgi.FieldStorage doesn't likes body parts with
  132. # Content-Length which were sent via chunked transfer encoding
  133. assert part.headers is not None
  134. part.headers.popall(hdrs.CONTENT_LENGTH, None)
  135. self._writer.append_payload(part)
  136. self._is_processed = True
  137. return self._writer
  138. def __call__(self) -> Payload:
  139. if self._is_multipart:
  140. return self._gen_form_data()
  141. else:
  142. return self._gen_form_urlencoded()