uasyncio
since 2021-08-24
http://docs.micropython.org/en/latest/library/uasyncio.html
https://docs.python.org/ja/3.9/library/asyncio.html のサブセットという話
async def と await
>>> import uasyncio >>> async def main(): ... print('hello') ... await uasyncio.sleep(1) ... print('world') ... >>> uasyncio.run(main()) hello world
下記を移植して動作確認していく。
https://docs.python.org/ja/3.9/library/asyncio-task.html
import uasyncio as asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print("started at {}".format(time.ticks_ms())) await say_after(1, 'hello') await say_after(2, 'world') print("finished at {}".format(time.ticks_ms())) asyncio.run(main())
>>> asyncio.run(main()) started at 1830349 hello world finished at 1833354 >>>
下記は1秒ごとに print が実行される
import uasyncio, time async def nested(): time.sleep(1) return 42 async def main(): print(await nested()) print(await nested()) print(await nested()) print(await nested()) uasyncio.run(main())
そもそも async def とは
>>> async def hello(): ... print("hello") ... >>> type(hello) <class 'generator'> >>> g = hello() >>> g.__next__() hello Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration: >>>
await は function の中でしか使えない
>>> await hello() Traceback (most recent call last): File "<stdin>", line 1, in <module> SyntaxError: 'await' outside function
下記のようにすると(意味があるかどうかはともかく)やっぱり generator を作る function になる
>>> def welcome(): ... await hello() ... >>> welcome() <generator object 'welcome' at 2000c4d0> >>> g = welcome() >>> g.__next__() hello Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration:
これらを (u)asyncio.run に渡すと実行してくれる
>>> import uasyncio as asyncio >>> asyncio.run(hello()) hello >>> asyncio.run(welcome()) hello >>>
下記の記事を参考にジェネレーターを作って run してみる
https://qiita.com/tomotaka_ito/items/35f3eb108f587022fa09
>>> def my_generator(): ... yield 1 ... yield 2 ... yield 3 ... >>> g = my_generator() >>> g.__next__() 1 >>> g.__next__() 2 >>> g.__next__() 3 >>> g.__next__() Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration: >>> asyncio.run(my_generator()) >>>
調子に乗って下記をやってみたが、最初の yield しか実行しない
>>> def my_generator(): ... print(1) ... time.sleep(1) ... yield 1 ... print(2) ... time.sleep(1) ... yield 2 ... print(3) ... time.sleep(1) ... yield 3 ... >>> asyncio.run(my_generator()) 1 >>>
generator であれば create_task できるのか?
>>> t = asyncio.create_task(my_generator())
task は作れたが、task は run できない
>>> asyncio.run(t) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "uasyncio/core.py", line 1, in run File "uasyncio/core.py", line 1, in create_task TypeError: coroutine expected
task は await できるので function の中で await してみる
>>> def run_task(): ... await t ...
await が入っていると generator 生成関数になる。
>>> run_task() <generator object 'run_task' at 2000d210>
これは run できる
>>> asyncio.run(run_task()) 1 >>>
そもそもコルーチンかどうかはどうやって判定するのか?
CPython なら inspect.iscoroutinefunction() や asyncio.iscoroutinefunction() で判定するという話
https://stackoverflow.com/questions/36076619/test-if-function-or-method-is-normal-or-asynchronous
https://forum.micropython.org/viewtopic.php?f=15&t=9148
https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/__init__.py
上記を参考にして iscoroutine 関数を自作する
>>> async def _g(): ... pass ... >>> type_coro = type(_g()) >>> type_coro <class 'generator'> >>> def iscoroutine(func): ... return isinstance(func, type_coro) ... >>>
なるほど asyncio.run できるものは True になる
>>> iscoroutine(hello()) True >>> iscoroutine(welcome()) True >>> iscoroutine(my_generator()) True >>> iscoroutine(run_task()) True >>> iscoroutine(nested()) True >>> iscoroutine(run_task) False >>> iscoroutine(t) False >>> iscoroutine(my_generator) False
Generator-based Coroutines というのがあって、3.10 で廃止予定
https://docs.python.org/ja/3.9/library/asyncio-task.html#generator-based-coroutines
ところで下記はできて当たり前
>>> for i in my_generator(): ... print(i) ... 1 1 2 2 3 3
iscoroutine が実質的に generator かどうかの判定なので、 かなり乱暴だが asyncio.run できるものは for in でも実行できるのでは?
>>> for i in hello(): ... print(i) ... hello >>> for i in nested(): ... print(i) ... >>> for i in main(): ... print(i) ... 42 42 42 42 >>> for i in main(): ... assert i is None ... 42 42 42 42 >>>
だが print(i) は何も出さない。hello の中の print は実行されている。 ジェネレーターとしては None を返すだけである。
Task をもうちょっと追及する
https://docs.python.org/ja/3.9/library/asyncio-task.html#task-object
async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): task = asyncio.create_task(cancel_me()) await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main())
このままペーストモードで実行したら動いた
>>> paste mode; Ctrl-C to cancel, Ctrl-D to finish === async def cancel_me(): === print('cancel_me(): before sleep') === try: === await asyncio.sleep(3600) === except asyncio.CancelledError: === print('cancel_me(): cancel sleep') === raise === finally: === print('cancel_me(): after sleep') === async def main(): === task = asyncio.create_task(cancel_me()) === await asyncio.sleep(1) === task.cancel() === try: === await task === except asyncio.CancelledError: === print("main(): cancel_me is cancelled now") === asyncio.run(main()) === === cancel_me(): before sleep cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now
ちなみに asyncio.run で cancel_me は直接実行できる(たぶん1時間かえってこない)
>>> asyncio.run(cancel_me()) cancel_me(): before sleep
ふと asyncio.sleep は何者だろうかと思って試した
>>> asyncio.run(asyncio.sleep(1)) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "uasyncio/core.py", line 1, in run File "uasyncio/core.py", line 1, in create_task TypeError: coroutine expected >>> >>> iscoroutine(asyncio.sleep(1)) False >>> asyncio.sleep(1) <SingletonGenerator object at 20008f20>
こんなこともやってみた。結論を先に言うと sleep1 は1秒待たないで終了する。SingletonGenerator を作っただけで実行していない様子。
>>> async def sleep1(): ... asyncio.sleep(1) ... >>> iscoroutine(sleep1()) True >>> asyncio.run(sleep1()) # すぐ終わる >>>
こちらは期待通りに1秒待つ。CPython では正しくない書き方。
>>> def sleep2(): ... await asyncio.sleep(1) ... >>> iscoroutine(sleep2()) True >>> asyncio.run(sleep2()) # 1秒待つ >>>
下記も sleep2 と同様に動作する。正しい書き方。
>>> async def sleep3(): ... await asyncio.sleep(1) ... >>> iscoroutine(sleep3()) True >>> asyncio.run(sleep3()) # 1秒待つ >>>
CPython では sleep2 は文法エラーになる
>>> import asyncio >>> def sleep2(): ... await asyncio.sleep(1) ... File "<stdin>", line 2 SyntaxError: 'await' outside async function
こちらは CPython で動く
>>> async def sleep3(): ... await asyncio.sleep(1) ... >>> asyncio.run(sleep3()) >>>
https://docs.python.org/ja/3.9/reference/compound_stmts.html#async-def
async with と async for
async with
async for
gather
ここまで見てきて、CPython と内部実装はかなり異なることがわかってきた。。
Python実践入門 10.3節 の asyncio の説明を読みつつ、同じことが uasyncio でできるのか見ていく。
import asyncio async def my_sleep(n): asyncio.sleep(n) return n async def coro(): return await asyncio.gather(my_sleep(1), my_sleep(2)) asyncio.run(coro())
実は間違っているので、CPython 3.9 だとこうなる
<stdin>:2: RuntimeWarning: coroutine 'sleep' was never awaited RuntimeWarning: Enable tracemalloc to get the object allocation traceback [1, 2]
この never awaited 警告は MicroPython 1.16 だと出ない。
本来書きたかったのは下記。
async def my_sleep(n): await asyncio.sleep(n) return n
uasyncio にないもの
- get_running_loop がない (get_event_loopがある)
- run_in_executor がない
aiohttp
ESP32 で WiFi 接続済みの REPL から
>>> import upip >>> upip.install("micropython-uaiohttpclient") Installing to: /lib/ Warning: micropython.org SSL certificate is not validated Installing micropython-uaiohttpclient 0.5.1 from https://micropython.org/pi/uaiohttpclient/uaiohttpclient-0.5.1.tar.gz >>> import uaiohttpclient as aiohttp
やってみると ClientSession がない。
だからこういう example なのか。。
https://github.com/micropython/micropython-lib/blob/master/micropython/uaiohttpclient/example.py
CPython で確認したら、下記はどちらでもよさそう
loop = asyncio.get_event_loop() loop.run_until_complete(main())
asyncio.run(main())
ESPr One (ESP8266)
since 2021-09-06
MicroPython 1.17
オンボードの青色LED (IO14) を点滅させる。
import uasyncio as asyncio from machine import Pin led = Pin(14, Pin.OUT) led.off() async def toggle_led(delay): await asyncio.sleep(delay) led.on() await asyncio.sleep(delay) led.off() async def main(): for _ in range(10): await toggle_led(1) asyncio.run(main())
NeoPixel 8 LED でやってみた。なるほど。
# NeoPixel ESPr One # DIN -- IO4 # VCC -- 5V # GND -- GND import uasyncio as asyncio from machine import Pin from neopixel import NeoPixel np = NeoPixel(Pin(4, Pin.OUT), 8) def reset_np(): for i in range(8): np[i] = (0, 0, 0) np.write() def set_np(pos, r, g, b): np[pos] = (r, g, b) np.write() async def toggle_led(pos, delay): await asyncio.sleep(delay) set_np(pos, 255, 255, 255) await asyncio.sleep(delay) set_np(pos, 0, 0, 0) async def toggle_led_loop(pos, count, delay): for _ in range(count): await toggle_led(pos, delay) async def main(): task = asyncio.gather( toggle_led_loop(0, 60, 1), toggle_led_loop(1, 30, 2), toggle_led_loop(2, 20, 3), toggle_led_loop(3, 15, 4), toggle_led_loop(4, 12, 5), toggle_led_loop(5, 10, 6), ) return await task reset_np() asyncio.run(main())
無限ループにしても gather できる。
async def toggle_led_loop(pos, delay): while True: await toggle_led(pos, delay) async def main(): task = asyncio.gather( toggle_led_loop(0, 1), toggle_led_loop(1, 2), toggle_led_loop(2, 3), toggle_led_loop(3, 4), toggle_led_loop(4, 5), toggle_led_loop(5, 6), toggle_led_loop(6, 7), toggle_led_loop(7, 8), ) return await task
もうちょっと見栄えのするものを考えた。
全体として滑らかに点滅しつつ、個々のランプがそれぞれの周期で点滅。
# NeoPixel ESPr One # DIN -- IO0 # VCC -- 5V # GND -- GND import uasyncio as asyncio from machine import Pin from neopixel import NeoPixel import math np = NeoPixel(Pin(4, Pin.OUT), 8) np_buf = [[0, 0, 0] for _ in range(8)] def reset_np_buf(): for i in range(8): np_buf[i] = [0, 0, 0] async def toggle_led(pos, delay): await asyncio.sleep(delay) np_buf[pos] = [255, 255, 255] await asyncio.sleep(delay) np_buf[pos] = [0, 0, 0] async def blinking_display_loop(): count = 0 while True: a = math.sin(count) for i in range(8): np[i] = ( int(np_buf[i][0] * a), int(np_buf[i][1] * a), int(np_buf[i][2] * a), ) np.write() await asyncio.sleep(0.01) count += 0.05 if count > 3.14: count = 0 async def toggle_led_loop(pos, delay): while True: await toggle_led(pos, delay) async def main(): reset_np_buf() task = asyncio.gather( blinking_display_loop(), toggle_led_loop(0, 0.2), toggle_led_loop(1, 0.4), toggle_led_loop(2, 0.8), toggle_led_loop(3, 1.6), toggle_led_loop(4, 3.2), toggle_led_loop(5, 6.4), toggle_led_loop(6, 12.8), toggle_led_loop(7, 25.6), ) return await task asyncio.run(main())
入力ループと出力ループを非同期で回してみる。
下記は ESPr One の mode スイッチを押す(押して離す)たびに「緑点灯」「緑点滅」「黄色」「赤」の状態を繰り返す。
import uasyncio as asyncio from machine import Pin from neopixel import NeoPixel import math mode_sw = Pin(0, Pin.IN) np = NeoPixel(Pin(4, Pin.OUT), 8) np_buf = [[0, 0, 0] for _ in range(8)] events = [] app_status = 0 def mode_sw_irq_handler(pin): if pin.value() == 0: events.append("pressed") else: events.append("released") def reset_np_buf(): for i in range(8): np_buf[i] = [0, 0, 0] def update_lights(): reset_np_buf() if app_status in (0, 1): np_buf[0] = (0, 255, 0) elif app_status == 2: np_buf[1] = (255, 255, 0) elif app_status == 3: np_buf[2] = (255, 0, 0) async def input_loop(): global app_status while True: if events: event = events.pop(0) if event == "released": app_status += 1 if app_status == 4: app_status = 0 update_lights() await asyncio.sleep(0.01) async def blinking_display_loop(): count = 0 while True: for i in range(8): if app_status == 1: a = math.sin(count) np[i] = ( int(np_buf[i][0] * a), int(np_buf[i][1] * a), int(np_buf[i][2] * a), ) else: np[i] = tuple(np_buf[i]) np.write() await asyncio.sleep(0.01) count += 0.05 if count > 3.14: count = 0 async def main(): reset_np_buf() update_lights() task = asyncio.gather( input_loop(), blinking_display_loop(), ) return await task mode_sw.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=mode_sw_irq_handler) asyncio.run(main())
ボタンを3秒以上長押しすると3つのライトが全部点滅する「裏モード」になるという実装。変更点のみ。
モードの見通しが悪い、テストはどう書けばいいか、など、新しい課題が見えてくる。
import time pressed_at = time.ticks_ms() def mode_sw_irq_handler(pin): event_time = time.ticks_ms() if pin.value() == 0: events.append(("pressed", event_time)) else: events.append(("released", event_time)) def update_lights(): reset_np_buf() if app_status in (0, 1, 4): np_buf[0] = (0, 255, 0) if app_status in (2, 4): np_buf[1] = (255, 255, 0) if app_status in (3, 4): np_buf[2] = (255, 0, 0) async def input_loop(): global app_status, pressed_at while True: while events: event_name, event_time = events.pop(0) if event_name == "pressed": pressed_at = event_time elif event_name == "released": duration = time.ticks_diff(event_time, pressed_at) if duration >= 3000: app_status = 4 else: app_status += 1 if app_status >= 4: app_status = 0 update_lights() await asyncio.sleep(0.01) async def blinking_display_loop(): count = 0 while True: for i in range(8): if app_status in (1, 4): a = math.sin(count) np[i] = ( int(np_buf[i][0] * a), int(np_buf[i][1] * a), int(np_buf[i][2] * a), ) else: np[i] = tuple(np_buf[i]) np.write() await asyncio.sleep(0.01) count += 0.05 if count > 3.14: count = 0