123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import io
- from typing import Any, Iterable, List, Optional
- from urllib.parse import urlencode
- from multidict import MultiDict, MultiDictProxy
- from . import hdrs, multipart, payload
- from .helpers import guess_filename
- from .payload import Payload
- __all__ = ("FormData",)
class FormData:
    """Helper class for multipart/form-data and
    application/x-www-form-urlencoded body generation.

    Fields are collected with :meth:`add_field` / :meth:`add_fields` and
    serialized when the instance is called.  The form is encoded as
    ``multipart/form-data`` as soon as any field is a file-like object or
    carries a filename, a content type or a content transfer encoding;
    otherwise it is encoded as ``application/x-www-form-urlencoded``.
    """

    def __init__(
        self,
        fields: Iterable[Any] = (),
        quote_fields: bool = True,
        charset: Optional[str] = None,
    ) -> None:
        """Create the form and register the initial *fields*.

        :param fields: a dict, a list/tuple of records, or a single record;
            every record is handed to :meth:`add_fields`.
        :param quote_fields: forwarded as ``quote_fields`` when the content
            disposition of each multipart part is set.
        :param charset: encoding used for serialization; the urlencoded
            form falls back to ``"utf-8"`` when it is ``None``.
        """
        self._writer = multipart.MultipartWriter("form-data")
        self._fields = []  # type: List[Any]
        self._is_multipart = False
        self._is_processed = False
        self._quote_fields = quote_fields
        self._charset = charset

        if isinstance(fields, dict):
            fields = list(fields.items())
        elif not isinstance(fields, (list, tuple)):
            # A single record (file object, multidict, ...) was passed.
            fields = (fields,)
        self.add_fields(*fields)

    @property
    def is_multipart(self) -> bool:
        """Whether the form will be serialized as multipart/form-data."""
        return self._is_multipart

    def add_field(
        self,
        name: str,
        value: Any,
        *,
        content_type: Optional[str] = None,
        filename: Optional[str] = None,
        content_transfer_encoding: Optional[str] = None
    ) -> None:
        """Register a single form field.

        :param name: field name, stored as the ``name`` disposition parameter.
        :param value: field value.  An ``io.IOBase`` instance switches the
            form to multipart.  For ``bytes``/``bytearray``/``memoryview``
            the *filename* defaults to *name* when neither *filename* nor
            *content_transfer_encoding* is given.
        :param content_type: optional Content-Type header of the field.
        :param filename: optional file name; for file objects it is guessed
            with ``guess_filename(value, name)`` when omitted.
        :param content_transfer_encoding: optional
            Content-Transfer-Encoding header of the field.
        :raises TypeError: if *filename*, *content_type* or
            *content_transfer_encoding* is given but is not a ``str``.
        """
        is_file = isinstance(value, io.IOBase)
        if (
            not is_file
            and isinstance(value, (bytes, bytearray, memoryview))
            and filename is None
            and content_transfer_encoding is None
        ):
            filename = name

        # Validate everything before touching instance state so that a
        # rejected call leaves the form unchanged.  The operands are wrapped
        # in one-element tuples: a bare tuple value would otherwise be
        # unpacked by the ``%`` operator and garble the message.
        if filename is not None and not isinstance(filename, str):
            raise TypeError(
                "filename must be an instance of str. Got: %s" % (filename,)
            )
        if content_type is not None and not isinstance(content_type, str):
            raise TypeError(
                "content_type must be an instance of str. Got: %s" % (content_type,)
            )
        if content_transfer_encoding is not None and not isinstance(
            content_transfer_encoding, str
        ):
            raise TypeError(
                "content_transfer_encoding must be an instance"
                " of str. Got: %s" % (content_transfer_encoding,)
            )

        if filename is None and is_file:
            filename = guess_filename(value, name)

        type_options = MultiDict({"name": name})  # type: MultiDict[str]
        if filename is not None:
            type_options["filename"] = filename

        headers = {}
        if content_type is not None:
            headers[hdrs.CONTENT_TYPE] = content_type
        if content_transfer_encoding is not None:
            headers[hdrs.CONTENT_TRANSFER_ENCODING] = content_transfer_encoding

        # Any file object, filename or explicit header forces multipart.
        if is_file or filename is not None or headers:
            self._is_multipart = True

        self._fields.append((type_options, headers, value))

    def add_fields(self, *fields: Any) -> None:
        """Register several fields at once.

        Each record may be an ``io.IOBase`` object (its name is guessed with
        ``guess_filename(rec, "unknown")``), a ``MultiDict`` /
        ``MultiDictProxy`` (its items are queued behind the remaining
        records), or a two-element list/tuple ``(name, value)``.

        :raises TypeError: for any other kind of record.
        """
        to_add = list(fields)

        while to_add:
            rec = to_add.pop(0)

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, "unknown")
                self.add_field(k, rec)  # type: ignore

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                to_add.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)  # type: ignore

            else:
                raise TypeError(
                    "Only io.IOBase, multidict and (name, file) "
                    "pairs allowed, use .add_field() for passing "
                    "more complex parameters, got {!r}".format(rec)
                )

    def _gen_form_urlencoded(self) -> "payload.BytesPayload":
        """Encode the fields as application/x-www-form-urlencoded.

        Only the name and value of each field are used.  The charset is
        added to the content type unless it is ``"utf-8"``.
        """
        # form data (x-www-form-urlencoded)
        data = [
            (type_options["name"], value) for type_options, _, value in self._fields
        ]

        charset = self._charset if self._charset is not None else "utf-8"

        if charset == "utf-8":
            content_type = "application/x-www-form-urlencoded"
        else:
            content_type = "application/x-www-form-urlencoded; charset=%s" % charset

        return payload.BytesPayload(
            urlencode(data, doseq=True, encoding=charset).encode(),
            content_type=content_type,
        )

    def _gen_form_data(self) -> "multipart.MultipartWriter":
        """Encode a list of fields using the multipart/form-data MIME format.

        :raises RuntimeError: if the form has already been processed; the
            parts are appended to a single writer created in ``__init__``.
        :raises TypeError: if a payload cannot be built for a field value.
        """
        if self._is_processed:
            raise RuntimeError("Form data has been processed already")

        for dispparams, headers, value in self._fields:
            payload_kwargs = {"headers": headers, "encoding": self._charset}
            if hdrs.CONTENT_TYPE in headers:
                payload_kwargs["content_type"] = headers[hdrs.CONTENT_TYPE]

            try:
                part = payload.get_payload(value, **payload_kwargs)
            except Exception as exc:
                raise TypeError(
                    "Can not serialize value type: %r\n "
                    "headers: %r\n value: %r" % (type(value), headers, value)
                ) from exc

            if dispparams:
                part.set_content_disposition(
                    "form-data", quote_fields=self._quote_fields, **dispparams
                )
                # FIXME cgi.FieldStorage doesn't like body parts with
                # Content-Length which were sent via chunked transfer encoding
                assert part.headers is not None
                part.headers.popall(hdrs.CONTENT_LENGTH, None)

            self._writer.append_payload(part)

        self._is_processed = True
        return self._writer

    def __call__(self) -> "Payload":
        """Serialize the form, choosing multipart or urlencoded encoding."""
        if self._is_multipart:
            return self._gen_form_data()
        else:
            return self._gen_form_urlencoded()
|