payload_streamer.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
"""Payload implementation for coroutines as data provider.

As a simple case, you can upload data from a file::

    @aiohttp.streamer
    async def file_sender(writer, file_name=None):
        with open(file_name, 'rb') as f:
            chunk = f.read(2**16)
            while chunk:
                await writer.write(chunk)
                chunk = f.read(2**16)

Then you can use `file_sender` like this::

    async with session.post('http://httpbin.org/post',
                            data=file_sender(file_name='huge_file')) as resp:
        print(await resp.text())

.. note:: The coroutine must accept `writer` as its first argument.
"""
  16. import types
  17. import warnings
  18. from typing import Any, Awaitable, Callable, Dict, Tuple
  19. from .abc import AbstractStreamWriter
  20. from .payload import Payload, payload_type
  21. __all__ = ("streamer",)
  22. class _stream_wrapper:
  23. def __init__(
  24. self,
  25. coro: Callable[..., Awaitable[None]],
  26. args: Tuple[Any, ...],
  27. kwargs: Dict[str, Any],
  28. ) -> None:
  29. self.coro = types.coroutine(coro)
  30. self.args = args
  31. self.kwargs = kwargs
  32. async def __call__(self, writer: AbstractStreamWriter) -> None:
  33. await self.coro(writer, *self.args, **self.kwargs) # type: ignore
  34. class streamer:
  35. def __init__(self, coro: Callable[..., Awaitable[None]]) -> None:
  36. warnings.warn(
  37. "@streamer is deprecated, use async generators instead",
  38. DeprecationWarning,
  39. stacklevel=2,
  40. )
  41. self.coro = coro
  42. def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:
  43. return _stream_wrapper(self.coro, args, kwargs)
  44. @payload_type(_stream_wrapper)
  45. class StreamWrapperPayload(Payload):
  46. async def write(self, writer: AbstractStreamWriter) -> None:
  47. await self._value(writer)
  48. @payload_type(streamer)
  49. class StreamPayload(StreamWrapperPayload):
  50. def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
  51. super().__init__(value(), *args, **kwargs)
  52. async def write(self, writer: AbstractStreamWriter) -> None:
  53. await self._value(writer)