比如开发需求是请求一个 http API,得到数据,解析一下返回,那么一般的做法是封装一个方法,比如

import httpx


def get_sth(p1, default=MY_VAL):
    # network
    r = httpx.get(API_URL, params={'t1': p1})
    # parsing
    res = r.json().get('my_key') or MY_VAL

但是如果想在 async/await 里用这段代码,就得改成

import httpx


async def get_sth(p1, default=MY_VAL):
    # network
    with http.AsyncClient() as client:
        r = await client.get(API_URL, params={'t1': p1})
    # parsing
    res = r.json().get('my_key') or MY_VAL

注意其中 def get_sth() 也必须改成 async def get_sth 。这就是所谓 async/await 传染性

这个时候,如果你想把这块代码抽象出来,让同步/异步的库都能调用,我在最近重构里找到一个最佳实践:

import httpx


def get_sth(p1, default=MY_VAL):
    r = httpx.Request('GET', API_URL, params={'t1': p1})
    r.parse = lambda x: (x.get('my_key') or MY_VAL)

# 同步调用:
with httpx.Client() as client:
    r = client.send(get_sth())
r.request.parse(r.json())

# 异步调用:
async with httpx.AsyncClient() as client:
    r = await client.send(get_sth())
r.request.parse(r.json())

思路是 逻辑和 transport解耦,也是某种意义上的 Sans-IO 了吧?