18.5.5. ストリーム (コルーチンベースの API) — Python 3.6.5 ドキュメント
ソースコード: Lib/asyncio/streams.py
18.5.5.1. ストリーム関数¶
注釈
このモジュール内のトップレベル関数は、便利なラッパーとしてのみ意図されています。特別なことは何もありませんし、それらが思い通りに動作しない場合は、ご自由にコードをコピーしてください。
-
coroutine
asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, **kwds)¶ create_connection()のラッパーで (reader, writer) ペアを返します。返されたリーダーは
StreamReaderのインスタンスで、ライターはStreamWriterのインスタンスです。引数は protocol_factory を除き、
AbstractEventLoop.create_connection()の通常の引数です; 最も一般的なものは位置引数のホストとポートで、その後にオプションのキーワード引数が続きます。追加のキーワード引数 loop には使用するイベントループインスタンスを、limit には
StreamReaderに渡すバッファーリミットを設定します。この関数は コルーチン です。
-
coroutine
asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)¶ 各クライアントが接続するコールバックでソケットサーバーを開始します。戻り値は
create_server()と同じです。client_connected_cb 引数は client_reader と client_writer という 2 個の引数で呼び出されます。 client_reader は
StreamReaderオブジェクトで、client_writer はStreamWriterオブジェクトです。 client_connected_cb 引数には単純なコールバック関数か コルーチン関数 のどちらかを指定できます; コルーチン関数の場合、自動的にTaskに変換されます。残りの引数は protocol_factory を除きすべて
create_server()の通常の引数です; 最も一般的なのは位置引数 host と port で、さまざまなオプションのキーワード引数が続きます。追加のキーワード引数 loop には使用するイベントループインスタンスを、limit には
StreamReaderに渡すバッファーリミットを設定します。この関数は コルーチン です。
-
coroutine
asyncio.open_unix_connection(path=None, *, loop=None, limit=None, **kwds)¶ create_unix_connection()のラッパーで (reader, writer) ペアを返します。戻り値やその他詳細については
open_connection()を参照してください。この関数は コルーチン です。
利用できる環境: UNIX。
-
coroutine
asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)¶ 接続された各クライアントごとのコールバックとともに UNIX ドメインソケットサーバーを開始します。
戻り値やその他詳細については
start_server()を参照してください。この関数は コルーチン です。
利用できる環境: UNIX。
18.5.5.2. StreamReader¶
-
class
asyncio.StreamReader(limit=None, loop=None)¶ このクラスは スレッド安全ではありません。
-
exception()¶ 例外を取得します。
-
feed_eof()¶ EOF の肯定応答を行います。
-
feed_data(data)¶ バイト列 data を内部バッファーに取り込みます。データを待っているあらゆる処理が再開されます。
-
set_exception(exc)¶ 例外を設定します。
-
set_transport(transport)¶ トランスポートを設定します。
-
coroutine
read(n=-1)¶ n バイト読み込みます。n が指定されないか
-1が指定されていた場合 EOF になるまで読み込み、全データを返します。EOF に達しており内部バッファーが空であれば、空の
bytesオブジェクトを返します。このメソッドは コルーチン です。
-
coroutine
readline()¶ 1 行読み込みます。 "行" とは、
\nで終了するバイト列のシーケンスです。EOF を受信し、かつ
\nが見つからない場合、このメソッドは読み込んだ分の不完全なバイト列を返します。EOF に達しており内部バッファーが空であれば、空の
bytesオブジェクトを返します。このメソッドは コルーチン です。
-
coroutine
readexactly(n)¶ 厳密に n バイト読み込みます。n バイト読み込む前にストリームの終端に達したとき、
IncompleteReadErrorを送出します。例外のIncompleteReadError.partial属性に、読み込んだ分の不完全なバイト列が格納されます。このメソッドは コルーチン です。
-
coroutine
readuntil(separator=b'\n')¶ separatorが見つかるまでストリームからデータを読み込みます。成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。
設定したストリームの制限は、結果を検証するために使用されます。制限により、返すことのできるデータの最大長さ (区切り文字を含まず) が設定されます。
EOF が発生し、完全な区切り文字が見つからない場合には、
IncompleteReadError例外が創出されるとともに、内部バッファはリセットされます。IncompleteReadError.partial属性に、区切り文字が部分的に含まれる場合があります。制限超過のためにデータを読み取ることができない場合、
LimitOverrunError例外が送出され、データは内部バッファ内に残ります。そのため、データを再度読み取ることができます。バージョン 3.5.2 で追加.
-
at_eof()¶ バッファーが空で
feed_eof()が呼ばれていた場合Trueを返します。
-
18.5.5.3. StreamWriter¶
-
class
asyncio.StreamWriter(transport, protocol, reader, loop)¶ トランスポートをラップします。
これは
write()、writelines()、can_write_eof()、write_eof()、get_extra_info()およびclose()メソッドを提供します。フロー制御を待機できる任意のFutureを返すdrain()メソッドを追加します。また、Transportを直接参照する transport 属性も追加します。このクラスは スレッド安全ではありません。
-
transport¶ トランスポートです。
-
can_write_eof()¶ トランスポートが
write_eof()をサポートしている場合はTrueを、していない場合はFalseを返します。WriteTransport.can_write_eof()を参照してください。
-
close()¶ トランスポートを閉じます:
BaseTransport.close()を参照してください。
-
coroutine
drain()¶ 下層のトランスポートの書き込みバッファーがフラッシュされる機会を与えます。
意図されている用途は書き込みです:
w.write(data) yield from w.drain()
トランスポートバッファのサイズが最高水位点に達した場合 (プロトコルが一時停止された場合)、バッファサイズが最低水位点まで引き出されて、プロトコルが再開されるまで、ブロックします。待機するものがなくなると、直ちに yield-from を続行します。
drain()から yield することにより、ループは書き込み操作をスケジュールし、バッファのフラッシュを行うことができます。このことは、トランスポートに大量のデータが書き込まれる可能性がある場合に非常に有用で、コルーチンはwrite()呼び出しの間に yield-from を行いません。このメソッドは コルーチン です。
オプションのトランスポート情報を返します:
BaseTransport.get_extra_info()を参照してください。
-
write(data)¶ トランスポートにバイト列 data を書き込みます:
WriteTransport.write()を参照してください。
-
writelines(data)¶ バイト列のデータのリスト (またはリテラブル) をトランスポートに書き込みます:
WriteTransport.writelines()を参照してください。
-
write_eof()¶ バッファーされたデータをフラッシュした後送信側のトランスポートをクローズします:
WriteTransport.write_eof()を参照してください。
-
18.5.5.4. StreamReaderProtocol¶
-
class
asyncio.StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)¶ ProtocolとStreamReaderを適合させる些末なヘルパークラスです。Protocolのサブクラスです。stream_reader は
StreamReaderのインスタンスです。client_connected_cb は接続されたときに (stream_reader, stream_writer) を引数として呼び出されるオプションの関数です。loop は使用するイベントループのインスタンスです。(これは
StreamReader自身をProtocolのサブクラスとする代わりのヘルパークラスです。StreamReaderはその他の潜在的な用途を持つため、そしてStreamReaderの利用者が誤って不適切なプロトコルのメソッドを呼び出すことを回避するためこのように実装されています)
18.5.5.5. IncompleteReadError¶
18.5.5.6. LimitOverrunError¶
18.5.5.7. ストリームの例¶
18.5.5.7.1. ストリームを使った TCP Echo クライアント¶
asyncio.open_connection() 関数を使った TCP Echo クライアントです:
import asyncio @asyncio.coroutine def tcp_echo_client(message, loop): reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888, loop=loop) print('Send: %r' % message) writer.write(message.encode()) data = yield from reader.read(100) print('Received: %r' % data.decode()) print('Close the socket') writer.close() message = 'Hello World!' loop = asyncio.get_event_loop() loop.run_until_complete(tcp_echo_client(message, loop)) loop.close()
18.5.5.7.2. ストリームを使った TCP Echo サーバー¶
asyncio.start_server() 関数を使った TCP Echo サーバーです:
import asyncio @asyncio.coroutine def handle_echo(reader, writer): data = yield from reader.read(100) message = data.decode() addr = writer.get_extra_info('peername') print("Received %r from %r" % (message, addr)) print("Send: %r" % message) writer.write(data) yield from writer.drain() print("Close the client socket") writer.close() loop = asyncio.get_event_loop() coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop) server = loop.run_until_complete(coro) # Serve requests until Ctrl+C is pressed print('Serving on {}'.format(server.sockets[0].getsockname())) try: loop.run_forever() except KeyboardInterrupt: pass # Close the server server.close() loop.run_until_complete(server.wait_closed()) loop.close()
18.5.5.7.4. ストリームを使ってデータを待つオープンソケットの登録¶
open_connection() 関数を使ってソケットがデータを受信するまで待つコルーチンです:
import asyncio try: from socket import socketpair except ImportError: from asyncio.windows_utils import socketpair @asyncio.coroutine def wait_for_data(loop): # Create a pair of connected sockets rsock, wsock = socketpair() # Register the open socket to wait for data reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop) # Simulate the reception of data from the network loop.call_soon(wsock.send, 'abc'.encode()) # Wait for data data = yield from reader.read(100) # Got data, we are done: close the socket print("Received:", data.decode()) writer.close() # Close the second socket wsock.close() loop = asyncio.get_event_loop() loop.run_until_complete(wait_for_data(loop)) loop.close()