You may have multiple OpenAI requests that you want to run concurrently. For example, you may want to process several documents at the same time so that you finish processing them faster. You can do so with Python’s asyncio library.
Prerequisites
To run the code in this article, you need Python 3.11 or later installed, which is what provides the asyncio.TaskGroup functionality. You also need the openai Python package installed, version 1.0 or higher.
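If you want to check that your environment meets these requirements, a quick sanity check along these lines should work (the exact version string printed will depend on your installation):

import sys
import openai

# asyncio.TaskGroup was added in Python 3.11
assert sys.version_info >= (3, 11), "Python 3.11 or later is required for asyncio.TaskGroup"
# The AsyncAzureOpenAI client used below is part of the 1.x line of the openai package
print(openai.__version__)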
In my case, I have two Azure OpenAI subscriptions, one in the US East region and another in the US West region. Therefore, I have two API keys, one for each region. To make things more interesting, I’ll assume that each region has different models, gpt-4
in US East and gpt-4-turbo
in US West.
This code also works if you have only one subscription, and running the requests concurrently is still faster. One potential issue with that approach is that the OpenAI API enforces rate limits per account, so if you run multiple requests concurrently, you may hit the rate limit sooner.
To test concurrency, I will ask GPT to perform a task that takes about 30 seconds to complete, such as generating a five-paragraph story about a theme. I have four themes: “dog”, “cat”, “chicken”, and “tiger”. I will ask GPT to generate the stories about “dog” and “cat” concurrently, and then the stories about “chicken” and “tiger” concurrently. I will time how long it takes to generate the stories concurrently, and I will also measure the time it takes to generate them sequentially, so that I can compare the results.
The code
First, let’s import the required libraries.
import os
from openai import AsyncAzureOpenAI
from dotenv import load_dotenv
import asyncio
from timeit import default_timer as timer
Note that I’m importing the AsyncAzureOpenAI
class from the openai
package. This class is used to call the OpenAI API asynchronously. I’m also importing the load_dotenv
function from the dotenv
package, which is used to load environment variables from a .env
file, where my subscription keys and endpoints are stored.
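For reference, the .env file holds the keys and endpoints under the variable names used later in the code; it might look something like this, with placeholder values:

USEAST_KEY=<your-us-east-api-key>
USEAST_ENDPOINT=https://<your-us-east-resource>.openai.azure.com/
USWEST_KEY=<your-us-west-api-key>
USWEST_ENDPOINT=https://<your-us-west-resource>.openai.azure.com/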
The concurrency is managed by the asyncio library. I’m also importing the default_timer function (aliased as timer) from the timeit module, to measure how long it takes to generate the stories.
Now, let’s define the function that will call the OpenAI API to generate the stories.
async def call_openai(client, id, model, theme, answers):
    print(f"Generating a story about {theme} using {id}.")
    response = await client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Generate a five-paragraph story about a {theme}."},
        ]
    )
    answers.append(response.choices[0].message.content)
    print(f"Generated a story about {theme} using {id}.")
The function is a straightforward call to the chat.completions.create method of the AsyncAzureOpenAI client. Note that I’m requesting a five-paragraph story about a theme. I chose that prompt because it takes about 30 seconds to complete, so even if GPT randomly generates a shorter story, we should still be able to see the differences. The response is appended to the answers list, which is created by the caller and passed in as an argument.
Let’s now see the main body of the function.
async def main():
    load_dotenv()

    client1 = AsyncAzureOpenAI(
        api_key=os.getenv("USEAST_KEY"),
        api_version="2023-12-01-preview",
        azure_endpoint=os.getenv("USEAST_ENDPOINT")
    )
    client2 = AsyncAzureOpenAI(
        api_key=os.getenv("USWEST_KEY"),
        api_version="2023-12-01-preview",
        azure_endpoint=os.getenv("USWEST_ENDPOINT")
    )

    themes = ["dog", "cat", "chicken", "tiger"]
    i = 0
    model1 = "gpt-4"
    model2 = "gpt-4-turbo"
    id1 = "client-1"
    id2 = "client-2"
In the code above, I create two separate OpenAI clients, one for each region. I also define the themes for which I want to generate stories and the models that I want to use. Finally, I define an ID for each client, so that I can identify which client generated each story.
Calling the functions sequentially
Now let’s call the functions sequentially, so that I can establish the baseline.
The code below calls the call_openai
function (that we defined above) sequentially, and measures the time it takes to generate the stories. The code is still inside the main
function, which is called at the end of the script.
    answers_s = []
    start = timer()
    # Run sequentially
    for i in range(0, len(themes), 2):
        start_step = timer()
        await call_openai(client1, id1, model1, themes[i], answers_s)
        await call_openai(client2, id2, model2, themes[i+1], answers_s)
        end_step = timer()
        print(f"Finished generating stories about a {themes[i]} and a {themes[i+1]} sequentially in {end_step - start_step:.2f} seconds.")
    end = timer()
    print(f"Generated stories sequentially in {end - start:.2f} seconds.")
Sequential results
The output of the code above is as follows.
Generating a story about dog using client-1.
Generated a story about dog using client-1.
Generating a story about cat using client-2.
Generated a story about cat using client-2.
Finished generating stories about a dog and a cat sequentially in 85.34 seconds.
Generating a story about chicken using client-1.
Generated a story about chicken using client-1.
Generating a story about tiger using client-2.
Generated a story about tiger using client-2.
Finished generating stories about a chicken and a tiger sequentially in 57.66 seconds.
Generated stories sequentially in 143.00 seconds.
It took 143 seconds to generate the stories sequentially. If you run this code, you will see that it starts the “dog” story, then it finishes, then it starts the “cat” story, then it finishes, and so on sequentially, as we would expect.
Now let’s see what happens when we run these functions concurrently.
Calling the functions concurrently
Here, I will use the asyncio.TaskGroup
class to run the call_openai
function concurrently. This is still inside the main
function.
    # Run concurrently
    answers_c = []
    start = timer()
    for i in range(0, len(themes), 2):
        start_step = timer()
        async with asyncio.TaskGroup() as tg:
            print(f"Started generating stories about {themes[i]} and {themes[i+1]} concurrently.")
            tg.create_task(call_openai(client1, id1, model1, themes[i], answers_c))
            tg.create_task(call_openai(client2, id2, model2, themes[i+1], answers_c))
        end_step = timer()
        print(f"Finished generating stories about a {themes[i]} and a {themes[i+1]} concurrently in {end_step - start_step:.2f} seconds.")
    end = timer()
    print(f"Generated stories concurrently in {end - start:.2f} seconds.\n\n")
The TaskGroup
class is a context manager that allows you to run multiple tasks concurrently. You can start asynchronous tasks with the create_task
method, and the context manager will wait for all tasks to complete before exiting. In my case, since I have two clients, I will start two tasks in each iteration of the loop.
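If you haven’t used TaskGroup before, here is a minimal, self-contained sketch of the pattern on its own, unrelated to OpenAI, with asyncio.sleep standing in for a slow API call:

import asyncio

async def work(name, delay):
    # Simulate a slow I/O-bound operation, such as an API call
    await asyncio.sleep(delay)
    print(f"{name} finished after {delay} seconds")

async def demo():
    # Both tasks run concurrently; the block exits only when both are done
    async with asyncio.TaskGroup() as tg:
        tg.create_task(work("task-1", 2))
        tg.create_task(work("task-2", 3))
    print("All tasks completed")

asyncio.run(demo())

On Python versions older than 3.11, asyncio.gather offers a similar way to await multiple tasks, but TaskGroup also cancels the remaining tasks and surfaces the errors if one of them fails.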
Let’s see the results.
Concurrent results
The output of the code above is as follows.
Started generating stories about dog and cat concurrently.
Generating a story about dog using client-1.
Generating a story about cat using client-2.
Generated a story about cat using client-2.
Generated a story about dog using client-1.
Finished generating stories about a dog and a cat concurrently in 34.89 seconds.
Started generating stories about chicken and tiger concurrently.
Generating a story about chicken using client-1.
Generating a story about tiger using client-2.
Generated a story about chicken using client-1.
Generated a story about tiger using client-2.
Finished generating stories about a chicken and a tiger concurrently in 30.25 seconds.
Generated stories concurrently in 65.14 seconds.
You can now see that it took 65 seconds to generate the stories concurrently. This is about half the time it took to generate the stories sequentially. You can also see that the “dog” and “cat” stories started at the same time, and the “chicken” and “tiger” stories also started at the same time, as we would expect.
The code below is required just to start the whole process.
if __name__ == "__main__":
    asyncio.run(main())
What if you have just one subscription?
The code above also works if you only have one subscription. For example, if you were to replace the client1
with client2
everywhere in the code above, you would still be able to run the tasks concurrently. The only difference is that you would be using the same subscription for both tasks, so you would be consuming your rate limits faster. However, you would still be able to run the tasks concurrently, and you would still see a significant improvement in the time it takes to generate the results.
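To make that concrete, here is what one iteration of the concurrent loop would look like with a single client; this is just a sketch of the substitution described above:

        # Hypothetical single-subscription variant: both tasks share the same client and model.
        # The requests still run concurrently, but they count against one account's rate limits.
        async with asyncio.TaskGroup() as tg:
            tg.create_task(call_openai(client2, id2, model2, themes[i], answers_c))
            tg.create_task(call_openai(client2, id2, model2, themes[i+1], answers_c))

If you do start hitting rate limits, you can cap how many requests run at once, for example with an asyncio.Semaphore, at the cost of some of the speedup.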
Conclusion
Running AI models concurrently can significantly reduce the time it takes to generate the results. This is especially important when you have to generate a large number of results, or when the results take a long time to generate. In this case, we saw that running the models concurrently with double the resources, as expected, took about half the time. This is a significant improvement, and it can make a big difference in practice.