asyncio在异步请求与批处理请求中的应用

批处理api简介

在现代Python应用程序中,使用REST或其他基于web的技术访问远程API是很常见的。批处理api能够用一个调用处理多个请求。您可以使用批处理api来减少对远程服务的网络调用数量。当您必须对远程服务进行大量调用,这些调用可以批处理为单个请求时,这是超级理想的。

假设您有一个返回股票当前价格的REST API。使用一个接受单个股票标识符并返回当前价格的简单API,如果需要获得1000只股票的价格,则需要进行1000个API调用。提供一样功能的批处理API将在请求中接受一组股票标识符,并为所有请求的标识符返回当前价格。使用批处理API,您可以在一个请求中获取所需的所有价格。这减少了网络开销,从而减少了应用程序的延迟。它还可能减少远程服务器上的负载。

asyncio在异步请求与批处理请求中的应用

asyncio在异步请求与批处理请求中的应用

在本文中,您将学习如何将批处理模式与Python的asyncio包一起使用,将许多单独的函数调用批处理为数量更少的请求。

动机:Excel中的异步Python函数

本文来自Python Excel外接程序PyXLL的一个用户,他提出了一个关于如何使用批处理API简化Excel电子表格的问题。

PyXLL将Python嵌入到Excel中,它支持在Excel电子表格中直接调用Python函数。每次单元格使用Python函数计算时,它都会调用该Python函数。在本例中,该函数是一个向REST服务器发出请求的异步函数。

数千个单元格向REST API发出单个请求的工作表花费的时间太长了。解决方案是使用批处理模式!

asyncio在异步请求与批处理请求中的应用

背景:AsyncIO和并发

当向远程服务器发出多个请求时,一般不希望发送一个请求并等待响应后再发送下一个请求。一般,并行发送多个请求(同时)并等待所有响应要快得多。您可以在Python中使用多线程或异步编程来实现这一点。本节概述多线程和异步编程。你也会看到为什么你会选择其中一个而不是另一个。

多线程

多线程是一种同时执行多个任务的方法。在线程模型中,启动多个线程,每个线程同时执行其代码。如果您的问题是CPU限制的,将其分解为使用多线程并行运行的任务会有所协助。当一个程序的主要性能瓶颈是CPU处理时间时,它就被称为CPU限制程序。

对于特定于Python的线程,有一些微妙之处您不会在本文中讨论,但在理论上,这基本上就是它的工作方式!

计算机的操作系统管理所有线程,并确保每个线程都共享CPU时间。这增加了复杂性,由于每次上下文切换都需要花时间做其他事情。这种复杂性随着线程数量的增加而增加。当瓶颈是等待IO(例如网络请求)时,为每个请求运行多个线程并等待网络响应是超级不理想的。它也无法扩展到数千个请求。这就是异步编程的由来。

使用asyncio进行异步编程

Python中的异步编程是一种不同的并发模型,它不使用多线程。相反,一切都运行在一个线程上,Python管理活动任务之间的切换。它超级适合使用大量网络请求或其他io绑定任务(如磁盘或数据库访问)的程序。

事件循环管理一组正在运行的任务。当一个任务在等待诸如网络请求之类的东西完成时,它就会“等待”。当一个任务在等待时,事件循环可以调度其他任务运行。这允许另一个任务发送另一个网络请求,然后等待,允许另一个任务运行,以此类推。当网络请求准备好时,事件循环可以恢复任务。这使得我们可以同时处理多个请求,而不必为每个请求增加一个线程的开销。

Python关键字”async”将一个函数指定为在事件循环中运行的异步函数。关键字“await”应用于事件循环,等待另一个异步函数或任务完成。Python包“asyncio”提供了管理异步任务所需的原语。

批处理API的优点

上面你学到了你可以同时发出多个请求。这比在发送下一个请求之前等待每个请求返回要快得多。如果可以在同一时间发送所需的所有请求,为什么还需要批处理API呢?

发送多个请求比发送单个请求需要更多的网络流量。如果您可以使用一个请求请求所需的所有数据,那么从数据传输的角度来看,这将更加有效。

它还有其他好处。例如,如果远程服务器可以通过一次性获取所有内容来减少所需的工作量,那么它为一个批处理请求提供服务所需的时间实际上会少于为一样的单个请求提供服务所需的总时间。

Python中的批处理模式

目前您了解了什么是批处理API,以及您可以使用Python中的异步编程并发地发出多个请求,什么是批处理模式以及为什么需要它?

简单地说,批处理模式将多个请求收集到一个请求中,并一次性分发该请求。在本文的其余部分中,您将看到如何使用该特性将一个使用大量独立请求的实现转换为一个将请求批量处理在一起的实现,从而减少对远程服务器的调用。

示例:获取街道地址的位置

您将使用获取街道地址的位置作为示例。为此,您可以使用来自https://www.geoapify.com/的REST API。你可以注册一个免费的测试层,它支持批量获取多个位置。要使用下面的代码,您需要注册并获得API密钥。

下面是第一次尝试一些代码来获取一些街道地址的位置:

# Import the modules you're going to use.
# You may need to 'pip install aiohttp'
from urllib.parse import urlencode
import asyncio
import aiohttp
import json
# Test data
STREET_ADDRESSES = [
    "1600 Pennsylvania Avenue, Washington DC, USA",
    "11 Wall Street New York, NY",
    "350 Fifth Avenue New York, NY 10118",
    "221B Baker St, London, England",
    "Tour Eiffel Champ de Mars, Paris",
    "4059 Mt Lee Dr.Hollywood, CA 90068",
    "Buckingham Palace, London, England",
    "Statue of Liberty, Liberty Island New York, NY 10004",
    "Manger Square, Bethlehem, West Bank",
    "2 Macquarie Street, Sydney"
]
# Constants for accessing the Geoapify API
GEOCODING_API = "https://api.geoapify.com/v1/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"
async def get_location(address):
    """Return (latitude, longitude) from an address."""
    # Construct the URL to do the lookup for a single address
    query_string = urlencode({
        "apiKey": YOUR_API_KEY,
        "text": address,
        "limit": 1,
        "format": "json"
    })
    url = f"{GEOCODING_API}?{query_string}"
    # Make the request to the API
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.read()
            # Read the json string and return the latitude and longitude
            # from the first result (there will only be one)
            results = json.loads(data.decode())["results"]
            return results[0]["lat"], results[0]["lon"]
async def main():
    # Print the city for each IP address
    tasks = []
    for address in STREET_ADDRESSES:
        location = await get_location(address)
        print(f"{address} -> {location}")
# Because it's an async function you need to run it using the asyncio event loop
loop = asyncio.new_event_loop()
loop.run_until_complete(main())

您可能已经注意到,上面的代码依旧按顺序为每个地址调用API。尽管使用了异步,但for循环目前在移动到下一个地址之前等待每个请求完成。要解决这个问题,你可以使用asyncio函数”gather”。通过将任务集合在一起并在最后等待它们,你不需要单独等待它们。

你更新的主函数目前看起来像这样:

async def main():
   # Get the location for each address
   tasks = []
   for address in STREET_ADDRESSES:
       tasks.append(get_location(address))
   # Wait for all tasks to complete
   locations = await asyncio.gather(*tasks)
   # Print them all once all requests have completed
   for address, location in zip(STREET_ADDRESSES, locations):
       print(f"{address} -> {location}")

您依旧在向服务器发送多个请求。接下来,您将看到批处理模式如何在不修改主函数的情况下对这些请求进行批处理,以减少请求的数量。

示例:使用批处理API获取多个位置

使用批处理API,您可以一次性提交多个请求。如果处理请求的服务器能够比单个请求更有效地处理批处理,那么在处理多个查询时使用批处理请求会更快。

您将使用上面使用的地理编码API的批处理版本。这有点复杂。而不是提交一个单独的地址作为URL的一部分,你必须作出一个POST请求。由于处理一个批处理可能需要一点时间,而不是立即返回结果,服务器将第一响应一个请求id,然后您查询该请求id以检查结果是否准备好了。这是实现批处理API时常用的模式。

下面的函数向API查询地址列表的位置。它使用对批处理API的单个请求来实现这一点。

# Constants for accessing the Geoapify batch API
GEOCODING_BATCH_API = "https://api.geoapify.com/v1/batch/geocode/search"
YOUR_API_KEY = "xxxx-xxxx-xxxx-xxxx"
async def get_locations(addresses):
    """Return a dictionary of address -> (lat, lon)."""
    # Construct the URL to do the batch request
    query_string = urlencode({"apiKey": YOUR_API_KEY})
    url = f"{GEOCODING_BATCH_API}?{query_string}"
    # Build the JSON payload for the batch POST request
    data = json.dumps(addresses)
    # And use Content-Type: application/json in the headers
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    # Make the POST request to the API
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=data, headers=headers) as response:
            response_json = await response.read()
            response_data = json.loads(response_json)
    # The API can return a dict with a pending status if it needs more
    # time to complete. Poll the API until the result is ready.
    while isinstance(response_data, dict) 
    and response_data.get("status") == "pending":
        # Wait a bit before calling the API
        await asyncio.sleep(0.1)
        # Query the result to see if it's ready yet
        request_id = response_data.get("id")
        async with aiohttp.ClientSession() as session:
            async with session.get(url + f"&id={request_id}") as response:
                response_json = await response.read()
                response_data = json.loads(response_json)
    # Gather the results into a dictionary of address -> (lat, lon)
    locations = {}
    for result in response_data:
        address = result["query"]["text"]
        coords = result["lat"], result["lon"]
        locations[address] = coords
    return locations

把它们放在一起:批处理模式

目前,您有了一个函数,可以调用批处理API来批量查找地址列表的位置。您的下一个任务是重构“get_location”,以便它可以利用批处理API,而不必更改“main”函数。

为什么不改变“main”功能呢?在这个简单的示例中,将main函数更改为调用get_locations是很简单的。在现实的项目中,这种重构一般不是那么简单。其他时候,甚至不希望更改函数的输入,您一般希望屏蔽函数的最终用户,不让他们了解实现细节。

回到激发本文灵感的最初问题,那是关于使用Excel外接程序PyXLL从Excel调用Python函数的。在这种情况下,最终用户是一个可能对Python一无所知的Excel用户。使用一个函数接受一个输入并返回一个输出符合他们作为Excel用户的期望。让他们接触批量的概念会不必要地混淆问题。这也意味着他们必须构建自己的电子表格,以有效的方式调用它。在这种情况下,在保持终端用户看到的界面的同时,在幕后处理请求的批处理绝对是一种优势。

它是如何工作的

在伪代码中,我们想要写的是:

async def get_location(address)
    1. Put the address on a queue of requests.
    2. Start a background task that:
        i. Waits a short time for other requests to be enqueued.
        ii. Processes all queued requests as a batch.
        iii. Notifies the waiting 'get_location' functions.
    3. Wait for the result and return it.

批处理请求

你可以在Python中使用asyncio来实现这一点。你的”get_location()”函数可以启动一个后台任务来处理任何排队请求。它将一直等待,直到后台任务处理完包含您请求的批处理,然后返回它。后台任务应该只启动一次,所以在启动它之前需要检查它是否已经在运行。如果多次调用“get_location”,由于它是一个异步函数,它可以在其他函数等待时运行。每个后续调用将向当前队列添加一个请求。

为了将结果从后台任务返回给等待的get_location函数,你将使用asyncio原语”Future”。Future是一个可等待对象,当被等待时,它将阻塞,直到设置结果。

你的”get_location()”函数被重写成批处理请求,使用future将结果传回,如下所示:

# State for batching requests
ADDRESSES_BATCH = []
BATCH_LOOP_RUNNING = False
async def get_location(address):
    """Return (latitude, longitude) from an address."""
    global BATCH_LOOP_RUNNING
    # Create a Future that will be set with the location once the
    # request has completed.
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    # Add the ip address and future to the batch
    ADDRESSES_BATCH.append((address, future))
    # Start 'process_batches' running on the asyncio event loop if it's
    # not already running.
    # We've not written 'process_batches_loop' yet!
    if not BATCH_LOOP_RUNNING:
        BATCH_LOOP_RUNNING = True
        asyncio.create_task(process_batches_loop())
    # Wait for the batch your address is in to return
    await future
    # And return the result
    return future.result()

上面的代码创建了一个asyncio。对象,并将其和地址添加到将作为批处理的列表中。如果处理批处理的循环没有运行,则使用“asyncio.create_task”启动它。asyncio函数”。Create_task在asyncio事件循环上调度“processes_batched_loop”,以便在其他正在运行的任务等待完成时调用。您还没有定义函数“process_batches_loop”,但是接下来您将这样做。等待future,允许在asyncio事件循环上运行的其他任务运行,一旦设置了结果,就返回它。

处理批处理

“process_batches_loop”函数等待很短的时间,以允许其他函数向”ADDRESSES_BATCH”列表中添加请求。然后,它将所有排队的请求作为单个调用提交给REST API。一旦从REST API返回结果,它将解包结果并在期货上设置结果,允许每个等待的“get_location”函数完成。

async def process_batches_loop():
    global ADDRESSES_BATCH, BATCH_LOOP_RUNNING
    # Loop while BATCH_LOOP_RUNNING is True
    while BATCH_LOOP_RUNNING:
        # Wait for more to be added to the batch
        await asyncio.sleep(0.1)
        # If nothing has been added to the batch then continue
        # to the start of the loop as there's nothing to do.
        if not ADDRESSES_BATCH:
            continue
        # Get the current items from the batch and reset the batch
        batch = ADDRESSES_BATCH
        ADDRESSES_BATCH = []
        # Get the locations of the current batch
        addresses = [address for (address, future) in batch]
        locations = await get_locations(addresses)
        # Set the results on the futures from this batch.
        # This allows each awaiting 'get_location' function to continue.
        for address, future in batch:
            coords = locations.get(address)
            future.set_result(coords)

你目前已经实现了最初的目标。您有一个函数“get_location”,对于调用者来说它看起来像原始函数。它接受单个地址并返回单个位置。在幕后,它将这些单独的请求批处理在一起,并将它们提交给批处理API。与只处理单个请求的api相比,批处理api可以提供更好的性能,目前您的函数可以利用这一点,而无需对函数的调用方式进行任何更改。

应该调整等待将请求添加到批处理中的时间,以匹配函数的使用方式。如果函数可能在几乎同一时间被调用多次,例如,在Excel中同时计算多个单元格,那么可以使用短延迟。在其他情况下,例如,如果调用来自一些可能需要几秒钟的用户输入,则需要更长的延迟。记录每件物品被添加到批处理中的时间,以及每批处理的时间,将协助我们了解最佳等待时间。

改善空间

这里提供的代码有很大的改善空间。我希望这能给你一些想法,让你可以在自己的项目中使用它!代码是用一种相对简单的方式编写的,以尝试清楚地说明其背后的意图,但是在在实际应用程序中使用它之前,有一些事情需要思考。

  1. 错误检查。这可能是需要添加的最重大的内容。如果循环处理批失败会发生什么?您的代码应该处理可能发生的任何错误,或者至少记录它们,以便跟踪发生了什么。
  1. 不必要的循环。按照所写的方式处理批的循环继续循环,即使没有任何事情要做。您可以将其修改为等待一个“asyncio。事件”对象,直到您已将至少一个项排队。或者,您也可以在没有更多的项目要处理时退出循环,并在需要时重新启动它。
  1. 当程序结束时停止循环。只要BATCH_LOOP_RUNNING为True,循环就会继续循环。当程序结束时,应该思考如何优雅地结束循环。这可以简单地将BATCH_LOOP_RUNNING设置为False,然后等待任务完成。asyncio函数”。create_task返回一个Task对象,你可以将其存储为一个全局变量。

结束

在本文中,您了解了什么是批处理API,以及为什么使用批处理API会有好处。您了解了Python中的并发性,比较了多线程和异步编程。最后,您演示了如何使用批处理模式将处理单个请求的函数转换为使用批处理API的函数。

REST API是批处理API的一个例子。您可以将一样的技术应用于数据库查询,或任何其他类型的函数,在这些函数中,批量排队数据的效率更高。

当您希望为函数的最终用户保持简单时,在幕后批处理请求并对面向用户的函数或API隐藏细节是很有用的。它也是一种将批处理API改善到现有代码库的方法,在现有代码库中重构很困难。

本文所基于的用例的动机是从Excel调用Python函数,而不让Excel用户了解管理批处理调用的细节。用户调用一个简单的函数来执行单个请求。如果他们构建一个跨不同单元发出多个请求的表,那么该解决方案会在幕后自动将所有内容批量处理在一起。Excel外接程序PyXLL支持将Python集成到Excel中,从而可以将Python函数作为Excel工作表函数调用。

有关PyXLL插件的更多信息,请访问https://www.pyxll.com。

© 版权声明

相关文章

1 条评论

  • 头像
    梁华荣 读者

    收藏了,感谢分享

    无记录
    回复