Fastapi系列-同步和异步相互转换处理实践

前言

我们知道Fastapi本身是一个混搭的框架,它既可以使用同步的方式也可以使用的异步的方式来运行我们的服务。

  • 同步的方式主要是使用多线程的方式来实现并发
  • 异步的方式则主要是基于协程的方式来实现并发

混搭的框架中主要遇到的问题点有:

  • 在同步函数里面如果有涉及到调用异步的函数(或协程对象的)
  • 在异步的函数里如果遇到的同步的代码(耗时的会引发阻塞的或部分的库不支持异步协程的)

以上两种场景情况都需要我们进行处理转化。

基础知识点

1 转换的说明

不管是从同步到异步,还是异步到同步,他们涉及的问题相对复杂,甚至转化过程其实会有相关的性能损耗,因为在转换处理中,我们需要考虑的是线程安全或协程安全等问题。

  • 同步到异步的转化:

    • 问题处理过程是需要把同步运行的代码丢进一个额外的线程池里面去执行,为避免拥堵阻塞,并且在单独线程执行完成我们的同步函数的逻辑处理之后需要把结果返回异步协程中。
  • 异步到同步的转化:

    • 问题处理过程是需要在把在运行在主线程上的是协程函数的调用转化为一个在子线程中运行的普通函数,并在异步函数处理完成的逻辑之后返回到同步运行的子线程中。

2 涉及一些函数

2.1 run_in_executor(同步到异步)

  • 用于在异步函数中(被async修饰的异步函数)里面,调用同步函数(如果此时同步函数是阻塞的话,那么有可能引起整个单线程模式的整个事件循环进行阻塞)的时候把同步函数的调用放到线程池里面进行执行,让其他任务也可以继续在事件循环中保持执行
  • 简而言之就是:一个是主线程,直接新开线程,运行阻塞函数

2.2 asyncio.run_coroutine_threadsafe(异步到同步)

  • 用于在同步函数里面调用协程对象的处理方案(非协程函数中不能使用await进行调用异步对象)。它最终的处理机制值:这个是将asyncio包的future对象转化返回一个concurrent.futures包的future对象。
  • 简而言之就是:动态的加入协程,参数为一个回调函数和一个loop对象,返回值为future对象,通过future.result()获取回调函数返回值

2.3 asyncio.to_thread

该函数的在py3.9版本中才提供的,它是对asyncio的操作多线程一种方案,使用它可以直接进行函数直接开线程。

转化处理实际案例

下面我们展开具体的案例来进行说明。

1、异步到同步的转化

这场景是比较场景的,比如:你需要的多线程的模式下读取对应提交过来的body的时候。由于本身Fastapi提供的reques.body()是一个协程函数,也就是需要使用awite reques.body()所以就遇到转换问题。

1.1 问题表现

@app.post("/get/access_token")
def access_token(reques:Request,name=Body(...)):
     print(reques.body())
    await reques.body()
    return 'pk'

在上面中await reques.body()明显是会错,在普通函数里面嵌套使用await调用携程函数是不可以。所以我们需要进行转换。问题解决,引入django中提供一个关于转换包asgiref。这个包独立的,他可以独立安装。框架安装的时候本身其实就默认的自带安装asgiref。

1.2 asgiref包介绍

asgiref提供了两个非常方便的转换函数,通过asgiref可以把异步或同步函数进行相互的转换。

  • AsyncToSync把异步转同步 :就是在异步函数(协程函数)转换普通的函数,方便在同步线程函数中调用
  • SynctoAsync把同步转异步:就是把一个同线程调用函数使用线程的模式转换为多线程的调用并包装为一个(协程函数)返回

但是如果你查看asgiref说明,其中有需要注意的点就是:

默认情况下, sync_to_async出于安全原因,将在同一线程中运行程序中的所有同步代码;您可以使用 禁用此功能以获得更高的性能 @sync_to_async(thread_sensitive=False),但请确保您的代码在执行此操作时不依赖任何绑定到线程(如数据库连接)的内容。

上面的说明是来自官网的说明,其实就是如果不考虑线程安全问题的,建议设置thread_sensitive=false,这样可以获得更好的性能。但是如果存在上下文依赖绑定到线程的时候,也就是通常所说的本地线程之类的的时候,则不能关闭。

1.3 asgiref 异步到同步的转换

from fastapi import FastAPI,Request
from fastapi.responses import PlainTextResponse
from fastapi.params import Body
from fastapi.background import  BackgroundTasks

# 定义我们的APP服务对象
app = FastAPI()


@app.post("/get/access_token")
def access_token(reques:Request,name=Body(...)):
    # print(reques.body())
    from asgiref.sync import async_to_sync
    body = async_to_sync(reques.body)()
    print(body)
    return PlainTextResponse(body.decode(encoding='utf-8'))


if __name__ == '__main__':
    import uvicorn
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, debug=True, reload=True)

上述代码中,关键代码部分是:

from asgiref.sync import async_to_sync
body = async_to_sync(reques.body)()

需要注意的是:async_to_sync需要传入的是一个awaitable,返回是一个函数,所以我们的还需要在函数的后面进行()进行一个函数的调用处理。最终如下图所示结果:

image.png

2、同步到异步的转化

在这里我们需要说一下,我们为什么需要把同步转为异步呢?因为目前大部分的库有可能还没有对应的aio的实现,假设比如我们的场景的http请求库(目前异步的已经有了,如aiohttp,httpx等,这里仅仅是假设)requests,如果我们的项目整个过程都是使用异步的方式来进行处理,那么我们如何在异步的逻辑中使用同步的,而又不因为产生阻塞的呢?毕竟我们的知道协程是单进程单线程的,如果调用同步的话,不用怀疑,肯定是会阻塞的,所以我们的就需要做转换了。

2.1 BackgroundTasks案例代码演示:

其实关于异步到同步的转换在FastApi框架比较明显的地方就是后台任务,通过后台任务的源码我们可以分析到,它对对我们的同步的函数如何转换已经有明显的示例,如BackgroundTasks源码:

import asyncio
import typing

from starlette.concurrency import run_in_threadpool


class BackgroundTask:
    def __init__(
        self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
    ) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            await self.func(*self.args, **self.kwargs)
        else:
            await run_in_threadpool(self.func, *self.args, **self.kwargs)

在上述的代码中,我们可以看到他们是通过对函数判断是否是is_async来处理,如果是协程函数则直接的

 await self.func(*self.args, **self.kwargs)

如果是普通函数那么他就进行使用

await run_in_threadpool(self.func, *self.args, **self.kwargs)

进行包裹之后返回具体一个协程对象。

然后继续顺着run_in_threadpool的源码查看,


async def run_in_threadpool(
    func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
    if contextvars is not None:  # pragma: no cover
        # Ensure we run in the same context
        child = functools.partial(func, *args, **kwargs)
        context = contextvars.copy_context()
        func = context.run
        args = (child,)
    elif kwargs:  # pragma: no cover
        # run_sync doesn't accept 'kwargs', so bind them in here
        func = functools.partial(func, **kwargs)
    return await anyio.to_thread.run_sync(func, *args)

我们可以看到他是通过

await anyio.to_thread.run_sync(func*args)

来完成最终的转换。如果有需要,我们可以参考这个来做自己的转换库。

2.2 asgiref 同步到异步的转换

我们上面提到asgiref的转换是相互,同样的asgiref也提供了对应的转换。

asgiref 官网文档地址为:

asgi.readthedocs.io/en/latest/ 具体案例:


from fastapi import FastAPI,Request
from fastapi.responses import PlainTextResponse,HTMLResponse
from asgiref.sync import sync_to_async
import requests
# 定义我们的APP服务对象
app = FastAPI()

def getdata():
    return requests.get('http://www.baidu.com').text

@app.get("/get/access_token")
async def access_token(reques:Request):
    asds= await sync_to_async(func=getdata)()
    return HTMLResponse(asds)


if __name__ == '__main__':
    import uvicorn
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, debug=True, reload=True)

需要注意点是sync_to_async返回的是携程的函数,所以后面需要转换为携程对象,需要加()

await说明科普:

  • await后面跟的是一个协程对象或Awaitable,它主要的目的声明程序在此处需要挂起且等待协程控制流的返回。

2.3 asyncer Fastapi框架大佬自己写转换库

asyncer和asgiref一样,它不仅可以把同步代码异步化,还可以把异步代码同步化。我们上面提到asgiref的转换是相互,同样的asyncer也提供了对应的转换。

asyncer 官网文档地址为:

asyncer.tiangolo.com/

库的安装:

pip isntall asyncer

通过官网示例,我们使用起来也很方便,这里我列举官方的示例,如下示例是同步转换异步的示例:

from fastapi import FastAPI,Request
from fastapi.responses import PlainTextResponse,HTMLResponse
from asyncer import asyncify
import requests
# 定义我们的APP服务对象
app = FastAPI()

def do_sync_work(name):
    return requests.get(f'http://www.baidu.com?name={name}').text

@app.get("/get/access_token")
async def access_token(reques:Request):
    # asds= await sync_to_async(func=getdata)()
    message = await asyncify(do_sync_work)(name="World")
    return HTMLResponse(message)


if __name__ == '__main__':
    import uvicorn
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, debug=True, reload=True)

需要注意点是asyncify返回的是携程的函数,所以后面需要转换为携程对象,需要加()

await说明科普:

  • await后面跟的是一个协程对象或Awaitable,它主要的目的声明程序在此处需要挂起且等待协程控制流的返回。

2.4 awaits库,通过装饰器形式

还有一个比较有点意思的库awaits,通过装饰器转换

awaits 官网文档地址为:

github.com/pomponchik/…

感兴趣的可以直接的查阅官网地址进行使用,这个是库感觉是值得研究一下的。

2.5 aioify 让每个函数都可以异步且等待的库

aioify库按他自己说法是:让每个函数都可以异步且等待。

库的安装:

pip isntall aioify

具体示例:

from fastapi import FastAPI,Request
from fastapi.responses import PlainTextResponse,HTMLResponse
from aioify import aioify
import requests
# 定义我们的APP服务对象
app = FastAPI()

def do_sync_work(name):
    return requests.get(f'http://www.baidu.com?name={name}').text

@app.get("/get/access_token")
async def access_token(reques:Request):
    # asds= await sync_to_async(func=getdata)()
    message = await aioify(do_sync_work)(name="World")
    print(message)
    return HTMLResponse(message)


if __name__ == '__main__':
    import uvicorn
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, debug=True, reload=True)

其实多数的库是一样的。直接进行转换即可使用。它还可以当做装饰器直接的同步函数装饰之后,进行使用,如下示例代码:

from fastapi import FastAPI,Request
from fastapi.responses import PlainTextResponse,HTMLResponse
from aioify import aioify
import requests
# 定义我们的APP服务对象
app = FastAPI()

@aioify
def do_sync_work(name):
    return requests.get(f'http://www.baidu.com?name={name}').text

@app.get("/get/access_token")
async def access_token(reques:Request):
    # asds= await sync_to_async(func=getdata)()
    message = await do_sync_work(name="World")
    print(message)
    return HTMLResponse(message)


if __name__ == '__main__':
    import uvicorn
    uvicorn.run('kkk:app', host="127.0.0.1", port=8100, debug=True, reload=True)

2.6 总结-同步到异步转换的本质

各库的实现各有千秋,但本质内部源码殊途同归!同步到异步转换的本质其实就是把同步的函数通过线程的方式进行调用,如下的代码示例:

 def asyncify(
    function: Callable[T_ParamSpec, T_Retval],
    *,
    cancellable: bool = False,
    limiter: Optional[anyio.CapacityLimiter] = None
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:

    async def wrapper(
        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
    ) -> T_Retval:
        partial_f = functools.partial(function, *args, **kwargs)
        return await anyio.to_thread.run_sync(
            partial_f, cancellable=cancellable, limiter=limiter
        )

    return wrapper

其实都是通过await anyio.to_thread.run_sync来进行处理。