1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- """ Payload implementation for coroutines as data provider.
- As a simple case, you can upload data from file::
- @aiohttp.streamer
- async def file_sender(writer, file_name=None):
- with open(file_name, 'rb') as f:
- chunk = f.read(2**16)
- while chunk:
- await writer.write(chunk)
- chunk = f.read(2**16)
- Then you can use `file_sender` like this:
- async with session.post('http://httpbin.org/post',
- data=file_sender(file_name='huge_file')) as resp:
- print(await resp.text())
- .. note:: Coroutine must accept `writer` as first argument
- """
- import types
- import warnings
- from typing import Any, Awaitable, Callable, Dict, Tuple
- from .abc import AbstractStreamWriter
- from .payload import Payload, payload_type
- __all__ = ("streamer",)
class _stream_wrapper:
    """Pair a streaming coroutine function with the arguments it was called with.

    The instance is itself awaitable-by-call: invoking it with a stream
    writer awaits the stored coroutine function, passing the writer first
    and the captured positional/keyword arguments after it.
    """

    def __init__(
        self,
        coro: Callable[..., Awaitable[None]],
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        """Capture the coroutine function and its deferred call arguments.

        :param coro: callable producing the awaitable that writes the data;
            it is passed through ``types.coroutine`` before being stored.
        :param args: positional arguments forwarded after the writer.
        :param kwargs: keyword arguments forwarded to ``coro``.
        """
        # Wrap first, then publish all three attributes in one statement.
        wrapped = types.coroutine(coro)
        self.coro, self.args, self.kwargs = wrapped, args, kwargs

    async def __call__(self, writer: AbstractStreamWriter) -> None:
        """Await the stored coroutine with ``writer`` as its first argument."""
        await self.coro(writer, *self.args, **self.kwargs)  # type: ignore
class streamer:
    """Deprecated decorator turning a coroutine function into a data provider.

    Constructing an instance (for example by applying ``@streamer``) emits a
    ``DeprecationWarning``.  Calling the instance does not run the coroutine;
    it returns a ``_stream_wrapper`` holding the coroutine function together
    with the arguments of that call.
    """

    def __init__(self, coro: Callable[..., Awaitable[None]]) -> None:
        """Remember ``coro`` and warn that this decorator is deprecated."""
        message = "@streamer is deprecated, use async generators instead"
        # stacklevel=2 attributes the warning to the code applying the
        # decorator rather than to this constructor.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        self.coro = coro

    def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:
        """Return a wrapper binding the stored coroutine to these arguments."""
        bound = _stream_wrapper(self.coro, args, kwargs)
        return bound
@payload_type(_stream_wrapper)
class StreamWrapperPayload(Payload):
    """Payload registered (via ``payload_type``) for ``_stream_wrapper`` values."""

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Call the payload's value with ``writer`` and await the result.

        NOTE(review): ``self._value`` is presumably stored by
        ``Payload.__init__``; that base class is not visible here.
        """
        produce = self._value
        await produce(writer)
@payload_type(streamer)
class StreamPayload(StreamWrapperPayload):
    """Payload registered (via ``payload_type``) for ``streamer`` values."""

    def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
        """Call ``value`` with no arguments and hand the result to the parent.

        :param value: callable invoked as ``value()``; its return value
            becomes the first argument of the parent initializer.
        :param args: extra positional arguments forwarded unchanged.
        :param kwargs: extra keyword arguments forwarded unchanged.
        """
        wrapper = value()
        super().__init__(wrapper, *args, **kwargs)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Call the payload's value with ``writer`` and await the result."""
        produce = self._value
        await produce(writer)
|