Add draining feature to asyncio.Queue by nhumrich · Pull Request #415 · python/asyncio

This feature adds the ability to drain a Queue. This is useful for cleanup steps, especially in the simple websocket case.

Task was destroyed but it is pending!
task: <Task pending coro=<wait_for_it() running at test.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>
Exception ignored in: <coroutine object wait_for_it at 0x7f594abb7830>
Traceback (most recent call last):
  File "test.py", line 6, in wait_for_it
  File "/usr/lib/python3.5/asyncio/queues.py", line 170, in get
  File "/usr/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
import asyncio

q = asyncio.Queue()

async def wait_for_it():
        while True:
            t = await q.get()
            q.task_done()


loop = asyncio.get_event_loop()
task = loop.create_task(wait_for_it())

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    task.cancel()
    pending = asyncio.Task.all_tasks()
    try:
        loop.run_until_complete(asyncio.gather(*pending))
    except asyncio.CancelledError:
        print('expected')
    loop.close()

which is very unwieldy and not obvious.

import asyncio

q = asyncio.Queue()

async def wait_for_it():
    try:
        while True:
            t = await q.get()
            q.task_done()
    except asyncio.QueueClosed:
        print('closed')


loop = asyncio.get_event_loop()
asyncio.ensure_future(wait_for_it())
asyncio.ensure_future(wait_for_it())

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.run_until_complete(q.drain())
    loop.close()

which is much cleaner and simple.