uasyncio

async def と await

MicroPython 1.16 (RP2)

>>> import uasyncio
>>> async def main():
...     print('hello')
...     await uasyncio.sleep(1)
...     print('world')
...
>>> uasyncio.run(main())
hello
world

下記を移植して動作確認していく。

https://docs.python.org/ja/3.9/library/asyncio-task.html

import uasyncio as asyncio
import time
 
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
 
async def main():
    print("started at {}".format(time.ticks_ms()))
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print("finished at {}".format(time.ticks_ms()))
 
asyncio.run(main())
>>> asyncio.run(main())
started at 1830349
hello
world
finished at 1833354
>>>

下記は1秒ごとに print が実行される

import uasyncio, time
 
async def nested():
    time.sleep(1)
    return 42
 
async def main():
    print(await nested())
    print(await nested())
    print(await nested())
    print(await nested())
 
uasyncio.run(main())

そもそも async def とは

>>> async def hello():
...     print("hello")
...
>>> type(hello)
<class 'generator'>
>>> g = hello()
>>> g.__next__()
hello
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration:
>>>

await は function の中でしか使えない

>>> await hello()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
SyntaxError: 'await' outside function

下記のようにすると(意味があるかどうかはともかく)やっぱり generator を作る function になる

>>> def welcome():
...     await hello()
...
>>> welcome()
<generator object 'welcome' at 2000c4d0>
>>> g = welcome()
>>> g.__next__()
hello
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration:

これらを (u)asyncio.run に渡すと実行してくれる

>>> import uasyncio as asyncio
>>> asyncio.run(hello())
hello
>>> asyncio.run(welcome())
hello
>>>

下記の記事を参考にジェネレーターを作って run してみる

https://qiita.com/tomotaka_ito/items/35f3eb108f587022fa09

>>> def my_generator():
...     yield 1
...     yield 2
...     yield 3
...
>>> g = my_generator()
>>> g.__next__()
1
>>> g.__next__()
2
>>> g.__next__()
3
>>> g.__next__()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration:
>>> asyncio.run(my_generator())
>>>

調子に乗って下記をやってみたが、最初の yield しか実行しない

>>> def my_generator():
...     print(1)
...     time.sleep(1)
...     yield 1
...     print(2)
...     time.sleep(1)
...     yield 2
...     print(3)
...     time.sleep(1)
...     yield 3
...
>>> asyncio.run(my_generator())
1
>>>

generator であれば create_task できるのか?

>>> t = asyncio.create_task(my_generator())

task は作れたが、task は run できない

>>> asyncio.run(t)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "uasyncio/core.py", line 1, in run
  File "uasyncio/core.py", line 1, in create_task
TypeError: coroutine expected

task は await できるので function の中で await してみる

>>> def run_task():
...     await t
...

await が入っていると generator 生成関数になる。

>>> run_task()
<generator object 'run_task' at 2000d210>

これは run できる

>>> asyncio.run(run_task())
1
>>>

そもそもコルーチンかどうかはどうやって判定するのか?

CPython なら inspect.iscoroutinefunction() や asyncio.iscoroutinefunction() で判定するという話

https://stackoverflow.com/questions/36076619/test-if-function-or-method-is-normal-or-asynchronous

https://forum.micropython.org/viewtopic.php?f=15&t=9148

https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/__init__.py

上記を参考にして iscoroutine 関数を自作する

>>> async def _g():
...     pass
...
>>> type_coro = type(_g())
>>> type_coro
<class 'generator'>
>>> def iscoroutine(func):
...     return isinstance(func, type_coro)
...
>>>

なるほど asyncio.run できるものは True になる

>>> iscoroutine(hello())
True
>>> iscoroutine(welcome())
True
>>> iscoroutine(my_generator())
True
>>> iscoroutine(run_task())
True
>>> iscoroutine(nested())
True

>>> iscoroutine(run_task)
False
>>> iscoroutine(t)
False
>>> iscoroutine(my_generator)
False

Generator-based Coroutines というのがあって、3.10 で廃止予定

https://docs.python.org/ja/3.9/library/asyncio-task.html#generator-based-coroutines

ところで下記はできて当たり前

>>> for i in my_generator():
...     print(i)
...
1
1
2
2
3
3

iscoroutine が実質的に generator かどうかの判定なので、 かなり乱暴だが asyncio.run できるものは for in でも実行できるのでは?

>>> for i in hello():
...     print(i)
...
hello

>>> for i in nested():
...     print(i)
...

>>> for i in main():
...     print(i)
...
42
42
42
42

>>> for i in main():
...     assert i is None
...
42
42
42
42
>>>

だが print(i) は何も出さない。hello の中の print は実行されている。 ジェネレーターとしては None を返すだけである。

Task をもうちょっと追及する

https://docs.python.org/ja/3.9/library/asyncio-task.html#task-object

async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')
async def main():
    task = asyncio.create_task(cancel_me())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")
asyncio.run(main())

このままペーストモードで実行したら動いた

>>>
paste mode; Ctrl-C to cancel, Ctrl-D to finish
=== async def cancel_me():
===     print('cancel_me(): before sleep')
===     try:
===         await asyncio.sleep(3600)
===     except asyncio.CancelledError:
===         print('cancel_me(): cancel sleep')
===         raise
===     finally:
===         print('cancel_me(): after sleep')
=== async def main():
===     task = asyncio.create_task(cancel_me())
===     await asyncio.sleep(1)
===     task.cancel()
===     try:
===         await task
===     except asyncio.CancelledError:
===         print("main(): cancel_me is cancelled now")
=== asyncio.run(main())
===
===
cancel_me(): before sleep
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now

ちなみに asyncio.run で cancel_me は直接実行できる(たぶん1時間かえってこない)

>>> asyncio.run(cancel_me())
cancel_me(): before sleep

ふと asyncio.sleep は何者だろうかと思って試した

>>> asyncio.run(asyncio.sleep(1))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "uasyncio/core.py", line 1, in run
  File "uasyncio/core.py", line 1, in create_task
TypeError: coroutine expected
>>>

>>> iscoroutine(asyncio.sleep(1))
False

>>> asyncio.sleep(1)
<SingletonGenerator object at 20008f20>

こんなこともやってみた。結論を先に言うと sleep1 は1秒待たないで終了する。SingletonGenerator を作っただけで実行していない様子。

>>> async def sleep1():
...     asyncio.sleep(1)
...
>>> iscoroutine(sleep1())
True
>>> asyncio.run(sleep1())  # すぐ終わる
>>>

こちらは期待通りに1秒待つ。CPython では正しくない書き方。

>>> def sleep2():
...     await asyncio.sleep(1)
...
>>> iscoroutine(sleep2())
True
>>> asyncio.run(sleep2())  # 1秒待つ
>>>

下記も sleep2 と同様に動作する。正しい書き方。

>>> async def sleep3():
...     await asyncio.sleep(1)
...
>>> iscoroutine(sleep3())
True
>>> asyncio.run(sleep3())  # 1秒待つ
>>>

CPython では sleep2 は文法エラーになる

>>> import asyncio
>>> def sleep2():
...     await asyncio.sleep(1)
...
  File "<stdin>", line 2
SyntaxError: 'await' outside async function

こちらは CPython で動く

>>> async def sleep3():
...     await asyncio.sleep(1)
...
>>> asyncio.run(sleep3())
>>>

https://docs.python.org/ja/3.9/reference/compound_stmts.html#async-def

async with と async for

gather

ここまで見てきて、CPython と内部実装はかなり異なることがわかってきた。。

Python実践入門 10.3節 の asyncio の説明を読みつつ、同じことが uasyncio でできるのか見ていく。

import asyncio
async def my_sleep(n):
  asyncio.sleep(n)
  return n
async def coro():
  return await asyncio.gather(my_sleep(1), my_sleep(2))
asyncio.run(coro())

実は間違っているので、CPython 3.9 だとこうなる

<stdin>:2: RuntimeWarning: coroutine 'sleep' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
[1, 2]

この never awaited 警告は MicroPython 1.16 だと出ない。

本来書きたかったのは下記。

async def my_sleep(n):
  await asyncio.sleep(n)
  return n

uasyncio にないもの

  • get_running_loop がない (get_event_loopがある)
  • run_in_executor がない

aiohttp

ESP32 で WiFi 接続済みの REPL から

>>> import upip
>>> upip.install("micropython-uaiohttpclient")
Installing to: /lib/
Warning: micropython.org SSL certificate is not validated
Installing micropython-uaiohttpclient 0.5.1 from https://micropython.org/pi/uaiohttpclient/uaiohttpclient-0.5.1.tar.gz
>>> import uaiohttpclient as aiohttp

やってみると ClientSession がない。

だからこういう example なのか。。

https://github.com/micropython/micropython-lib/blob/master/micropython/uaiohttpclient/example.py

CPython で確認したら、下記はどちらでもよさそう

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
asyncio.run(main())

ESPr One (ESP8266)

since 2021-09-06

MicroPython 1.17

オンボードの青色LED (IO14) を点滅させる。

import uasyncio as asyncio
from machine import Pin
led = Pin(14, Pin.OUT)
led.off()
 
async def toggle_led(delay):
    await asyncio.sleep(delay)
    led.on()
    await asyncio.sleep(delay)
    led.off()
 
async def main():
    for _ in range(10):
        await toggle_led(1)
 
asyncio.run(main())

NeoPixel 8 LED でやってみた。なるほど。

# NeoPixel      ESPr One
# DIN       --  IO4
# VCC       --  5V
# GND       --  GND
 
import uasyncio as asyncio
from machine import Pin
from neopixel import NeoPixel
 
np = NeoPixel(Pin(4, Pin.OUT), 8)
 
 
def reset_np():
    for i in range(8):
        np[i] = (0, 0, 0)
    np.write()
 
 
def set_np(pos, r, g, b):
    np[pos] = (r, g, b)
    np.write()
 
 
async def toggle_led(pos, delay):
    await asyncio.sleep(delay)
    set_np(pos, 255, 255, 255)
    await asyncio.sleep(delay)
    set_np(pos, 0, 0, 0)
 
 
async def toggle_led_loop(pos, count, delay):
    for _ in range(count):
        await toggle_led(pos, delay)
 
 
async def main():
    task = asyncio.gather(
        toggle_led_loop(0, 60, 1),
        toggle_led_loop(1, 30, 2),
        toggle_led_loop(2, 20, 3),
        toggle_led_loop(3, 15, 4),
        toggle_led_loop(4, 12, 5),
        toggle_led_loop(5, 10, 6),
    )
    return await task
 
 
reset_np()
asyncio.run(main())

無限ループにしても gather できる。

async def toggle_led_loop(pos, delay):
    while True:
        await toggle_led(pos, delay)
 
 
async def main():
    task = asyncio.gather(
        toggle_led_loop(0, 1),
        toggle_led_loop(1, 2),
        toggle_led_loop(2, 3),
        toggle_led_loop(3, 4),
        toggle_led_loop(4, 5),
        toggle_led_loop(5, 6),
        toggle_led_loop(6, 7),
        toggle_led_loop(7, 8),
    )
    return await task

もうちょっと見栄えのするものを考えた。

全体として滑らかに点滅しつつ、個々のランプがそれぞれの周期で点滅。

# NeoPixel      ESPr One
# DIN       --  IO0
# VCC       --  5V
# GND       --  GND
 
 
import uasyncio as asyncio
from machine import Pin
from neopixel import NeoPixel
import math
 
np = NeoPixel(Pin(4, Pin.OUT), 8)
np_buf = [[0, 0, 0] for _ in range(8)]
 
 
def reset_np_buf():
    for i in range(8):
        np_buf[i] = [0, 0, 0]
 
 
async def toggle_led(pos, delay):
    await asyncio.sleep(delay)
    np_buf[pos] = [255, 255, 255]
    await asyncio.sleep(delay)
    np_buf[pos] = [0, 0, 0]
 
 
async def blinking_display_loop():
    count = 0
    while True:
        a = math.sin(count)
        for i in range(8):
            np[i] = (
                int(np_buf[i][0] * a),
                int(np_buf[i][1] * a),
                int(np_buf[i][2] * a),
            )
        np.write()
        await asyncio.sleep(0.01)
        count += 0.05
        if count > 3.14:
            count = 0
 
 
async def toggle_led_loop(pos, delay):
    while True:
        await toggle_led(pos, delay)
 
 
async def main():
    reset_np_buf()
    task = asyncio.gather(
        blinking_display_loop(),
        toggle_led_loop(0, 0.2),
        toggle_led_loop(1, 0.4),
        toggle_led_loop(2, 0.8),
        toggle_led_loop(3, 1.6),
        toggle_led_loop(4, 3.2),
        toggle_led_loop(5, 6.4),
        toggle_led_loop(6, 12.8),
        toggle_led_loop(7, 25.6),
    )
    return await task
 
 
asyncio.run(main())

入力ループと出力ループを非同期で回してみる。

下記は ESPr One の mode スイッチを押す(押して離す)たびに「緑点灯」「緑点滅」「黄色」「赤」の状態を繰り返す。

import uasyncio as asyncio
from machine import Pin
from neopixel import NeoPixel
import math
 
mode_sw = Pin(0, Pin.IN)
np = NeoPixel(Pin(4, Pin.OUT), 8)
np_buf = [[0, 0, 0] for _ in range(8)]
events = []
app_status = 0
 
 
def mode_sw_irq_handler(pin):
    if pin.value() == 0:
        events.append("pressed")
    else:
        events.append("released")
 
 
def reset_np_buf():
    for i in range(8):
        np_buf[i] = [0, 0, 0]
 
 
def update_lights():
    reset_np_buf()
    if app_status in (0, 1):
        np_buf[0] = (0, 255, 0)
    elif app_status == 2:
        np_buf[1] = (255, 255, 0)
    elif app_status == 3:
        np_buf[2] = (255, 0, 0)
 
 
async def input_loop():
    global app_status
    while True:
        if events:
            event = events.pop(0)
            if event == "released":
                app_status += 1
                if app_status == 4:
                    app_status = 0
                update_lights()
        await asyncio.sleep(0.01)
 
 
async def blinking_display_loop():
    count = 0
    while True:
        for i in range(8):
            if app_status == 1:
                a = math.sin(count)
                np[i] = (
                    int(np_buf[i][0] * a),
                    int(np_buf[i][1] * a),
                    int(np_buf[i][2] * a),
                )
            else:
                np[i] = tuple(np_buf[i])
        np.write()
        await asyncio.sleep(0.01)
        count += 0.05
        if count > 3.14:
            count = 0
 
 
async def main():
    reset_np_buf()
    update_lights()
    task = asyncio.gather(
        input_loop(),
        blinking_display_loop(),
    )
    return await task
 
 
mode_sw.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=mode_sw_irq_handler)
asyncio.run(main())

ボタンを3秒以上長押しすると3つのライトが全部点滅する「裏モード」になるという実装。変更点のみ。

モードの見通しが悪い、テストはどう書けばいいか、など、新しい課題が見えてくる。

import time
 
pressed_at = time.ticks_ms()
 
 
def mode_sw_irq_handler(pin):
    event_time = time.ticks_ms()
    if pin.value() == 0:
        events.append(("pressed", event_time))
    else:
        events.append(("released", event_time))
 
 
def update_lights():
    reset_np_buf()
    if app_status in (0, 1, 4):
        np_buf[0] = (0, 255, 0)
    if app_status in (2, 4):
        np_buf[1] = (255, 255, 0)
    if app_status in (3, 4):
        np_buf[2] = (255, 0, 0)
 
 
async def input_loop():
    global app_status, pressed_at
    while True:
        while events:
            event_name, event_time = events.pop(0)
            if event_name == "pressed":
                pressed_at = event_time
            elif event_name == "released":
                duration = time.ticks_diff(event_time, pressed_at)
                if duration >= 3000:
                    app_status = 4
                else:
                    app_status += 1
                    if app_status >= 4:
                        app_status = 0
                update_lights()
        await asyncio.sleep(0.01)
 
 
async def blinking_display_loop():
    count = 0
    while True:
        for i in range(8):
            if app_status in (1, 4):
                a = math.sin(count)
                np[i] = (
                    int(np_buf[i][0] * a),
                    int(np_buf[i][1] * a),
                    int(np_buf[i][2] * a),
                )
            else:
                np[i] = tuple(np_buf[i])
        np.write()
        await asyncio.sleep(0.01)
        count += 0.05
        if count > 3.14:
            count = 0
uasyncio.txt · 最終更新: 2021/09/07 12:29 by Takuya Nishimoto
www.chimeric.de Valid CSS Driven by DokuWiki do yourself a favour and use a real browser - get firefox!! Recent changes RSS feed Valid XHTML 1.0