比如开发需求是请求一个 http API,得到数据,解析一下返回,那么一般的做法是封装一个方法,比如
import httpx
def get_sth(p1, default=MY_VAL):
# network
r = httpx.get(API_URL, params={'t1': p1})
# parsing
res = r.json().get('my_key') or MY_VAL
但是如果想在 async/await 里用这段代码,就得改成
import httpx
async def get_sth(p1, default=MY_VAL):
# network
with http.AsyncClient() as client:
r = await client.get(API_URL, params={'t1': p1})
# parsing
res = r.json().get('my_key') or MY_VAL
注意其中 def get_sth()
也必须改成 async def get_sth
。这就是所谓 async/await 传染性
这个时候,如果你想把这块代码抽象出来,让同步/异步的库都能调用,我在最近重构里找到一个最佳实践:
import httpx
def get_sth(p1, default=MY_VAL):
r = httpx.Request('GET', API_URL, params={'t1': p1})
r.parse = lambda x: (x.get('my_key') or MY_VAL)
# 同步调用:
with httpx.Client() as client:
r = client.send(get_sth())
r.request.parse(r.json())
# 异步调用:
async with httpx.AsyncClient() as client:
r = await client.send(get_sth())
r.request.parse(r.json())
思路是 逻辑和 transport解耦,也是某种意义上的 Sans-IO 了吧?