|
from __future__ import annotations |
|
|
|
import asyncio |
|
from asyncio import AbstractEventLoop, runners |
|
from typing import Union, Callable, AsyncGenerator, Generator |
|
|
|
from ..errors import NestAsyncioError |
|
|
|
try: |
|
import nest_asyncio |
|
has_nest_asyncio = True |
|
except ImportError: |
|
has_nest_asyncio = False |
|
try: |
|
import uvloop |
|
has_uvloop = True |
|
except ImportError: |
|
has_uvloop = False |
|
|
|
def get_running_loop(check_nested: bool) -> Union[AbstractEventLoop, None]: |
|
try: |
|
loop = asyncio.get_running_loop() |
|
|
|
if has_uvloop: |
|
if isinstance(loop, uvloop.Loop): |
|
return loop |
|
if not hasattr(loop.__class__, "_nest_patched"): |
|
if has_nest_asyncio: |
|
nest_asyncio.apply(loop) |
|
elif check_nested: |
|
raise NestAsyncioError('Install "nest_asyncio" package | pip install -U nest_asyncio') |
|
return loop |
|
except RuntimeError: |
|
pass |
|
|
|
|
|
async def await_callback(callback: Callable): |
|
return await callback() |
|
|
|
async def async_generator_to_list(generator: AsyncGenerator) -> list: |
|
return [item async for item in generator] |
|
|
|
def to_sync_generator(generator: AsyncGenerator) -> Generator: |
|
loop = get_running_loop(check_nested=False) |
|
new_loop = False |
|
if loop is None: |
|
loop = asyncio.new_event_loop() |
|
asyncio.set_event_loop(loop) |
|
new_loop = True |
|
gen = generator.__aiter__() |
|
try: |
|
while True: |
|
yield loop.run_until_complete(await_callback(gen.__anext__)) |
|
except StopAsyncIteration: |
|
pass |
|
finally: |
|
if new_loop: |
|
try: |
|
runners._cancel_all_tasks(loop) |
|
loop.run_until_complete(loop.shutdown_asyncgens()) |
|
if hasattr(loop, "shutdown_default_executor"): |
|
loop.run_until_complete(loop.shutdown_default_executor()) |
|
finally: |
|
asyncio.set_event_loop(None) |
|
loop.close() |