Add draining feature to asyncio.Queue by nhumrich · Pull Request #415 · python/asyncio
This feature adds the ability to drain a Queue. This is useful for cleanup steps, especially in the simple websocket case.
Task was destroyed but it is pending!
task: <Task pending coro=<wait_for_it() running at test.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>
Exception ignored in: <coroutine object wait_for_it at 0x7f594abb7830>
Traceback (most recent call last):
File "test.py", line 6, in wait_for_it
File "/usr/lib/python3.5/asyncio/queues.py", line 170, in get
File "/usr/lib/python3.5/asyncio/futures.py", line 227, in cancel
File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
import asyncio q = asyncio.Queue() async def wait_for_it(): while True: t = await q.get() q.task_done() loop = asyncio.get_event_loop() task = loop.create_task(wait_for_it()) try: loop.run_forever() except KeyboardInterrupt: pass finally: task.cancel() pending = asyncio.Task.all_tasks() try: loop.run_until_complete(asyncio.gather(*pending)) except asyncio.CancelledError: print('expected') loop.close()
which is very unwieldy and not obvious.
import asyncio q = asyncio.Queue() async def wait_for_it(): try: while True: t = await q.get() q.task_done() except asyncio.QueueClosed: print('closed') loop = asyncio.get_event_loop() asyncio.ensure_future(wait_for_it()) asyncio.ensure_future(wait_for_it()) try: loop.run_forever() except KeyboardInterrupt: pass finally: loop.run_until_complete(q.drain()) loop.close()
which is much cleaner and simple.