Solve Common Asynchronous Scenarios With Python’s “asyncio”

Fire-and-Forget, Pub/Sub, and Data Pipelines

Luk Verhelst
Better Programming


Asyncio is a powerful tool for writing asynchronous code in Python, but just like many developers around me, I struggled to fully grasp its concepts and capabilities.

When I searched the web for asyncio material, the documentation and tutorials I found were either too technical and theoretical, which made it hard to see how to apply them in practice, or too shallow, only scratching the surface of its potential.

That’s why I’m writing this blog post: to provide a clear overview of asyncio, its core concepts, and practical examples of how to use it to improve the performance and responsiveness of your Python applications.

Whether you’re new to asyncio or looking to deepen your understanding, this post will help you get up to speed and start using it to write efficient and scalable Python code.

A Practical Introduction to Asynchronous Programming in Python

Asyncio is a Python library that provides tools for writing asynchronous code. This means that you can write programs that make progress on multiple tasks concurrently: while one task is waiting, for example on a network response, other tasks get to run instead of being blocked.

Here are some real-world examples of how asyncio can greatly improve the performance and responsiveness of your application:

  1. Web scraping: When scraping a website for data, you may need to make multiple HTTP requests to different URLs to retrieve the data you need. By using asyncio together with an asynchronous HTTP client, you can have many requests in flight at once instead of waiting for each response in turn, which makes the scraping process much faster.
  2. File I/O: When reading or writing multiple files, asyncio can overlap these operations. Note that Python's built-in file operations are blocking, so they have to be handed to a worker thread (or to a library built for asynchronous file access) to get this benefit.
  3. Database access: When accessing a database, you may need to run multiple queries at once. With an asynchronous database driver, asyncio lets these queries wait on the database concurrently, allowing you to handle more requests at once.

In more technical terms, asyncio provides a framework for writing non-blocking code that can handle I/O bound tasks, such as network communication or file I/O, by allowing tasks to yield control to other tasks when waiting for I/O operations to complete.

Here is an example to simulate the File I/O use case:

import asyncio

async def fetch_file():
    print("Starting to fetch file")
    await asyncio.sleep(1)  # duration to fetch the file
    print("Fetching file completed")

async def main():
    print("Starting main")
    await asyncio.gather(
        fetch_file(),
        fetch_file(),
        fetch_file()
    )
    print("Main completed")

asyncio.run(main())

Can you guess what the above program prints?

That’s right: because each coroutine waits one second, “Starting to fetch file” is printed three times first, followed by three times “Fetching file completed”.

Starting main
Starting to fetch file
Starting to fetch file
Starting to fetch file
Fetching file completed
Fetching file completed
Fetching file completed
Main completed

If it were handled synchronously, it would have printed:

Starting main
Starting to fetch file
Fetching file completed
Starting to fetch file
Fetching file completed
Starting to fetch file
Fetching file completed
Main completed
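To make the difference measurable rather than just visible in the output order, here is a small sketch that times both approaches. The helper names run_sequentially and run_concurrently are made up for this illustration, and the wait is shortened to 0.1 seconds so it runs quickly.

```python
import asyncio
import time

async def fetch_file(delay: float = 0.1) -> str:
    await asyncio.sleep(delay)  # stand-in for the time spent fetching a file
    return "file contents"

async def run_sequentially(count: int) -> float:
    start = time.perf_counter()
    for _ in range(count):
        await fetch_file()  # each wait finishes before the next one starts
    return time.perf_counter() - start

async def run_concurrently(count: int) -> float:
    start = time.perf_counter()
    # gather schedules all coroutines, so their waits overlap
    await asyncio.gather(*(fetch_file() for _ in range(count)))
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"sequential: {asyncio.run(run_sequentially(3)):.2f}s")
    print(f"concurrent: {asyncio.run(run_concurrently(3)):.2f}s")
```

The sequential run should take roughly three times the single wait, while the concurrent run should take roughly one.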

Coroutines

In the above example, we define a coroutine function fetch_file that prints some text and then uses the await keyword to sleep for one second, simulating the wait to get the file from disk. We also define a main coroutine that passes three fetch_file coroutines to the asyncio.gather function to run them concurrently.

We then use the asyncio.run function to run the main coroutine, which prints some text, runs the three instances of fetch_file concurrently, and then prints some more text when they are all complete.

So the keywords in the above example are async def and await. These keywords distinguish an asynchronous program from a blocking one. In the next sections, we'll delve deeper into both terms. First async...

Declare coroutines with async def

Compared to a “standard” subroutine, a coroutine is a type of subroutine that allows for non-blocking concurrent execution. Like a subroutine, a coroutine is a block of code that can be called from elsewhere in a program, typically a function.

However, unlike a subroutine, a coroutine can pause its execution at each await expression and allow another block of code to execute before resuming where it left off. For example, in a web server, a coroutine can be used to handle an incoming request without blocking the server from handling other requests.

Here’s an example comparison that calculates the sum of all numbers in a list, first as a “classic” subroutine, then as a coroutine:

# As a subroutine
def sum_list(numbers: list[int]) -> int:
    result = 0
    for num in numbers:
        result += num
    return result

And here’s the same function implemented as a coroutine using the async keyword:

# As a coroutine
import asyncio

async def sum_list(numbers: list[int]) -> int:
    result = 0
    for num in numbers:
        result += num
        # Pause execution to allow other coroutines to run
        await asyncio.sleep(0)
    return result

As you can see, the main difference between the two implementations is the addition of the async keyword and the use of the await keyword in the coroutine version. This allows the coroutine to pause its execution at the await statement and allow other coroutines to run before resuming execution.

When calling the subroutine version, it will block the execution of the calling code until the function returns. This means that if you have a large list of numbers to sum, the calling code will be blocked until the function has finished calculating the sum.

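In contrast, calling the coroutine version does not run the function body at all; it returns a coroutine object. The body runs once that object is awaited or scheduled as a task. Each time it reaches the await statement, the coroutine yields control to the event loop, which lets other tasks run before it resumes. A caller that awaits the coroutine still waits for the result, but the rest of the program is not blocked in the meantime. A caller that schedules it with asyncio.create_task() can continue with its own work while the coroutine runs in the background.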
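The following sketch illustrates both points: calling a coroutine function only creates a coroutine object, and another task gets its turn whenever the coroutine awaits. The ticker coroutine and the events list are hypothetical helpers added for this illustration.

```python
import asyncio

async def sum_list(numbers):
    result = 0
    for num in numbers:
        result += num
        await asyncio.sleep(0)  # give other tasks a turn
    return result

async def main():
    events = []

    async def ticker():
        # Hypothetical second task that records when it gets to run
        for i in range(3):
            events.append(f"tick {i}")
            await asyncio.sleep(0)

    coro = sum_list([1, 2, 3])
    # Nothing inside sum_list has run yet; we only hold a coroutine object
    events.append(f"calling returned a {type(coro).__name__}")
    ticker_task = asyncio.create_task(ticker())
    total = await coro  # main waits here, while ticker runs in between
    events.append(f"sum is {total}")
    await ticker_task
    return events

if __name__ == "__main__":
    for event in asyncio.run(main()):
        print(event)
```

The ticks show up in the list before the sum does, which shows that the ticker task ran while sum_list was still summing.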

The await Keyword and Awaitables

The await keyword plays a central role in asynchronous code and can only be used inside asynchronous code blocks, such as the body of an async def function. It is always applied to an awaitable.

So what is an awaitable? An awaitable is any object that can be used in an await expression: a coroutine, a Task, or a Future. Awaiting it pauses the current coroutine until the result of the awaitable is ready and hands control back to the event loop, which can run other tasks in the meantime.

To understand this concept, think of an awaitable as a restaurant order. Just like a restaurant order, an awaitable is something that you request and wait for. When you place an order at a restaurant, the waiter takes your order and tells you that your food will be ready in a certain amount of time. You then wait patiently for your food to be prepared and delivered to your table.

Similarly, when you await an awaitable in a coroutine, the coroutine pauses its execution and waits for the result of the awaitable to be ready. Once the result is ready, the coroutine resumes its execution and continues from where it left off.

Here’s an example of representing the restaurant order metaphor in Python code:

import asyncio

async def place_order():
    print("Placing order...")
    await asyncio.sleep(5)
    print("Order ready!")
    return "Steak"

async def serve_food():
    print("Waiting for order...")
    order = await place_order()
    print(f"Order received: {order}")
    print("Serving food...")

asyncio.run(serve_food())

In this example, the place_order coroutine simulates placing an order at a restaurant. It prints some text to indicate that an order is being placed and then uses the await keyword to pause its execution for 5 seconds using asyncio.sleep. This simulates the time it takes for the food to be prepared.

Once the food is ready, the place_order coroutine resumes its execution and returns the value "Steak", which represents the food order.

The serve_food coroutine simulates waiting for the food to be delivered and then serving it. It prints some text to indicate that it's waiting for an order, and then calls the place_order coroutine using the await keyword to get the order. Once the order is received, the serve_food coroutine prints some more text to indicate that it's serving the food.

Finally, we run the serve_food coroutine using asyncio.run(serve_food()) to see the output:

Waiting for order...
Placing order...
Order ready!
Order received: Steak
Serving food...
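The restaurant example awaits a coroutine, but Tasks and Futures can be awaited in the same way. Here is a minimal sketch of all three kinds of awaitable; the cook coroutine and the dish names are made up for this illustration.

```python
import asyncio

async def cook(dish: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for preparation time
    return dish

async def main() -> list[str]:
    loop = asyncio.get_running_loop()

    # 1. A coroutine object: its body runs when it is awaited
    first = await cook("Soup")

    # 2. A Task: scheduled on the event loop right away, awaited for its result
    task = asyncio.create_task(cook("Steak"))
    second = await task

    # 3. A Future: a placeholder that some other code fills in later
    future = loop.create_future()
    loop.call_later(0.01, future.set_result, "Dessert")
    third = await future

    return [first, second, third]

if __name__ == "__main__":
    print(asyncio.run(main()))
```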

Tasks and the event loop

Consider this example: Grandmaster Judit Polgár is at a chess convention, playing against 24 amateur chess players. She needs 5 seconds to make a move; her opponents need 55 seconds. A game ends after roughly 60 moves, or 30 moves from each side. (Source: https://github.com/fillwithjoy1/async_io_for_beginners)

Synchronous version

import time

def play_game(player_name):
    for move in range(30):
        time.sleep(5)  # the champion takes 5 seconds to make a move
        print(f"Champion made move {move + 1} against {player_name}")
        time.sleep(55)  # the opponent takes 55 seconds to make a move

if __name__ == "__main__":
    # One game per opponent; the champion plays in all of them
    players = [f'Amateur{i+1}' for i in range(24)]
    for player in players:
        play_game(player)

Asynchronous version

import asyncio

async def play_game(player_name):
    for move in range(30):
        await asyncio.sleep(5)  # the champion takes 5 seconds to make a move
        print(f"Champion made move {move + 1} against {player_name}")
        await asyncio.sleep(55)  # the opponent takes 55 seconds to make a move

async def play_all_games(players):
    tasks = [asyncio.create_task(play_game(player)) for player in players]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    # One game per opponent; the champion plays in all of them
    players = [f'Amateur{i+1}' for i in range(24)]
    asyncio.run(play_all_games(players))

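In the synchronous version, the program runs sequentially, playing one game after another. Each game takes 30 * (5 + 55) = 1,800 seconds, so the 24 games of the scenario take 24 * 1,800 = 43,200 seconds (or 12 hours) in total.

In the asynchronous version, all games run concurrently: while one game is waiting, the event loop advances the others. Because every asyncio.sleep call overlaps with those of the other games, the simulation finishes in roughly the time of a single game, 30 * 60 = 1,800 seconds (or 30 minutes). Note that this simulation also lets the champion's 5-second moves overlap. A real champion can only be at one board at a time, which would bring the total to about 24 * 5 * 30 = 3,600 seconds (or 1 hour), still far less than playing the games one after another.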

Observe how we are using asyncio.create_task and asyncio.gather to allow multiple games to be played concurrently. Here's how these functions work:

  • asyncio.create_task is used to schedule a coroutine to run in the event loop as a task. In our case, we are creating a task for each game that needs to be played. By doing this, we are allowing the event loop to run multiple games concurrently.
  • asyncio.gather is used to run multiple coroutines concurrently and wait for them all to complete. In our case, we are using asyncio.gather to run all the game tasks concurrently and wait for all of them to finish before exiting the program.

By using these functions, we are able to run multiple games concurrently, which makes the program faster and more efficient. Without these functions, the program would run sequentially and take a lot longer to complete.
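In the simulation, nothing stops the champion from moving on several boards at once. One way to model a single champion is an asyncio.Lock that a game must hold while she makes her move. The sketch below is an assumption-laden variant, not part of the original example: the names simul, CHAMPION_MOVE, and OPPONENT_MOVE are made up, and all durations are scaled down by a factor of 1,000 so it finishes in a few seconds.

```python
import asyncio
import time

CHAMPION_MOVE = 0.005  # scaled down from 5 seconds
OPPONENT_MOVE = 0.055  # scaled down from 55 seconds

async def play_game(champion: asyncio.Lock, rounds: int) -> None:
    for _ in range(rounds):
        async with champion:  # the champion can only be at one board at a time
            await asyncio.sleep(CHAMPION_MOVE)
        await asyncio.sleep(OPPONENT_MOVE)  # opponents think at the same time

async def simul(games: int = 24, rounds: int = 30) -> float:
    champion = asyncio.Lock()
    start = time.perf_counter()
    await asyncio.gather(*(play_game(champion, rounds) for _ in range(games)))
    return time.perf_counter() - start

if __name__ == "__main__":
    elapsed = asyncio.run(simul())
    print(f"Simultaneous exhibition took {elapsed:.2f} scaled seconds")
```

With 24 boards, the champion needs 24 * 5 = 120 seconds to complete one round, which is longer than the 55 seconds an opponent needs, so she is the bottleneck and the unscaled total comes to roughly 30 * 120 = 3,600 seconds.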

Tasks and the Event Loop

In asyncio, a Task is a unit of work that is scheduled to be executed by the event loop.

When we create a task in an asynchronous program, we are essentially telling the event loop to run that task at some point in the future. The event loop will then manage the execution of that task by scheduling it to run when it’s ready. The keyword await passes function control back to the event loop.

For example, let’s say we have a task that involves making a network request. When we create that task and tell the event loop to run it, the event loop will start executing the task and then pause it when it needs to wait for the network response. While the task is paused, the event loop can continue running other tasks. When the network response arrives, the event loop will resume the task and continue its execution.

By running tasks on the event loop, a single thread can make progress on many I/O-bound tasks: whenever one task has to wait, another task that is ready gets to run. This is what makes asynchronous programming so effective for I/O-bound workloads.
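A small sketch makes this scheduling visible. Two tasks are created, each logs when it starts and finishes, and the one with the shorter wait finishes first even though it was created second. The worker coroutine and the log list are hypothetical names used only for this illustration.

```python
import asyncio

async def worker(name: str, delay: float, log: list[str]) -> None:
    log.append(f"{name} started")
    await asyncio.sleep(delay)  # control returns to the event loop here
    log.append(f"{name} finished")

async def main() -> list[str]:
    log: list[str] = []
    slow = asyncio.create_task(worker("slow", 0.2, log))
    fast = asyncio.create_task(worker("fast", 0.05, log))
    await slow
    await fast
    return log

if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

Both tasks start before either one finishes, because the event loop switches to the second task as soon as the first one reaches its await.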

Common Asynchronous Scenarios


What we have learned so far… Asynchronous programming with asyncio is a powerful technique that can help you write highly concurrent and performant Python code. Here are some common concepts we touched upon:

  1. Coroutines: Coroutines are the building blocks of asyncio. They are functions that can be paused and resumed later, allowing you to write non-blocking code that can switch between tasks quickly.
  2. Event loops: The event loop is the core of asyncio. It schedules and runs coroutines and manages I/O operations. There is at most one running event loop per thread, and the usual way to start one is asyncio.run(), as in the examples above. The lower-level run_until_complete() method is rarely needed in application code.
  3. Tasks: Tasks are a higher-level abstraction than coroutines. A task wraps a coroutine and schedules it on the event loop so that it runs in the background. It lets you check whether the coroutine is done, retrieve its result, or cancel it. You can use the asyncio.create_task() function to create a new task.

What we did not cover:

  • Timeout: The asyncio module provides a way to set a timeout for an operation using the asyncio.wait_for() function. This can be useful when you want to limit the amount of time a coroutine can spend waiting for a result.
  • Futures: Task is a subclass of Future. Futures are objects that represent a result that may not be available yet. Application code rarely creates them directly; when it is needed, loop.create_future() creates a new future, and the await keyword waits for its result.
  • Queue: This will be covered in the upcoming paragraphs… A Queue is a data structure that allows multiple coroutines to communicate with each other. It can be used to pass messages between coroutines or to coordinate the execution of multiple coroutines.
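As a brief taste of the timeout item above, here is a minimal sketch of asyncio.wait_for(). The slow_operation and fetch_with_timeout coroutines are hypothetical stand-ins for a real network call and its caller.

```python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(0.2)  # stand-in for a slow network call
    return "done"

async def fetch_with_timeout(timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_operation(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels slow_operation() before raising the timeout
        return "timed out"

if __name__ == "__main__":
    print(asyncio.run(fetch_with_timeout(0.05)))
    print(asyncio.run(fetch_with_timeout(1.0)))
```

With a timeout shorter than the operation, the call gives up early; with a longer one, the result comes back as usual.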

Pattern 1: Fire-and-Forget

The “Fire-and-Forget” design pattern allows you to execute a task asynchronously without waiting for its result. This means that you can start the task and immediately continue with other work, without blocking or waiting for the task to complete.

Fire-and-Forget is useful when you have a task that needs to be executed in the background, but the result of that task isn’t needed immediately. For example, sending an email or logging a message are both tasks that don’t require an immediate response, and can be executed asynchronously using the Fire-and-Forget pattern.

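This is nothing new; we did this in the previous section. To implement the Fire-and-Forget pattern with Python’s asyncio library, we create a coroutine function that performs the task and schedule it with asyncio.create_task(). This function starts the coroutine in the background and immediately returns a Task object that represents the running task. Two caveats apply: create_task() must be called while an event loop is running, and the event loop only keeps a weak reference to the task, so hold on to the Task object until it has finished.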

Here’s an example of using the Fire-and-Forget pattern in Python’s asyncio library to send an email:

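import asyncio
import smtplib
from email.message import EmailMessage

def send_email_blocking(subject, body, to):
    # Create a new email message
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = 'me@example.com'
    msg['To'] = to
    msg.set_content(body)
    # Connect to the SMTP server and send the message
    # (smtplib is synchronous, so these calls cannot be awaited)
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.login('my_email@gmail.com', 'my_password')
        smtp.send_message(msg)

async def send_email(subject, body, to):
    # Run the blocking smtplib calls in a worker thread
    await asyncio.to_thread(send_email_blocking, subject, body, to)

async def main():
    # Fire-and-forget sending the email; keep a reference to the task
    task = asyncio.create_task(send_email('Test Email', 'This is a test email', 'you@example.com'))
    # Continue with other work without waiting for the email to send
    print('Email is being sent in the background')
    # asyncio.run() cancels unfinished tasks on exit, so wait before leaving
    await task

asyncio.run(main())

In this example, the regular function send_email_blocking() sends an email using the smtplib library. Because smtplib is a blocking library, the coroutine send_email() hands that function to a worker thread with asyncio.to_thread() so that the event loop stays free.

We then call asyncio.create_task() inside main() to start the send_email() coroutine in the background. It immediately returns a Task object that represents the running task.

Finally, we continue with other work without waiting for the email to send. Only at the very end do we await the task, because asyncio.run() would otherwise cancel it when main() returns.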
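When many background tasks are started from different places, a common approach is to keep them in a set and let each task remove itself when it is done. The sketch below assumes a helper named fire_and_forget and a toy log_message coroutine, both made up for this illustration.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()

def fire_and_forget(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)  # keep a strong reference while it runs
    task.add_done_callback(background_tasks.discard)  # drop it when finished
    return task

async def log_message(message: str, sink: list[str]) -> None:
    await asyncio.sleep(0.01)  # stand-in for writing to a remote log service
    sink.append(message)

async def main() -> list[str]:
    sink: list[str] = []
    fire_and_forget(log_message("user signed in", sink))
    # The caller carries on immediately with its own work
    sink.append("request handled")
    # On shutdown, give the remaining background tasks a chance to finish
    await asyncio.gather(*background_tasks)
    return sink

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The caller's own entry lands in the list first, because the background task is still waiting at that point.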

Pattern 2: Pub/Sub

To design a pub/sub pattern with asyncio and the Queue class, you can follow these steps:

  1. Create a shared asyncio.Queue object that will act as a buffer between publishers and subscribers. Note that asyncio.Queue is not thread-safe. It is designed for coroutines running in the same event loop, where only one coroutine executes at a time, so they can share the queue without additional locking.
  2. Define a coroutine function that represents the publisher. It should accept the shared Queue object, obtain the message to be published, and put it into the queue using the put method.
  3. Define a coroutine function that represents the subscriber. This coroutine function should continuously loop and wait for messages to be available in the shared Queue object using the get method. When a message is available, the function should process it according to the subscriber's needs.
  4. Define a main function that creates a shared Queue object and starts multiple instances of the publisher and subscriber coroutines as needed.

Here is a sample implementation:

import asyncio

async def publisher(queue):
    while True:
        # input() blocks, so run it in a worker thread to keep the event loop free
        message = await asyncio.to_thread(input, "Enter message: ")
        await queue.put(message)

async def subscriber(queue, name):
    while True:
        message = await queue.get()
        print(f"Subscriber {name} received message: {message}")

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        publisher(queue),
        subscriber(queue, "A"),
        subscriber(queue, "B"),
        subscriber(queue, "C")
    )

asyncio.run(main())

In this implementation, the publisher coroutine reads input from the user and puts it into the shared Queue object. The subscriber coroutine continuously waits for messages to be available in the shared Queue object and prints them to the console. The main function creates the shared Queue object and starts one publisher and three subscribers. Keep in mind that a message taken from a queue is gone, so with one shared queue each message reaches only one of the three subscribers.
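A non-interactive variant makes it easier to see how a shared queue distributes messages, and it also shows one way to shut the subscribers down. In this sketch, the STOP sentinel and the received list are assumptions added for illustration: the publisher sends a fixed list of messages followed by one sentinel per subscriber.

```python
import asyncio

STOP = None  # hypothetical sentinel telling a subscriber to exit

async def publisher(queue, messages, subscriber_count):
    for message in messages:
        await queue.put(message)
    for _ in range(subscriber_count):
        await queue.put(STOP)  # one sentinel per subscriber

async def subscriber(queue, name, received):
    while True:
        message = await queue.get()
        if message is STOP:
            break
        received.append((name, message))
        await asyncio.sleep(0)  # let other subscribers take a turn

async def main():
    queue = asyncio.Queue()
    received = []
    names = ["A", "B", "C"]
    await asyncio.gather(
        publisher(queue, ["one", "two", "three"], len(names)),
        *(subscriber(queue, name, received) for name in names),
    )
    return received

if __name__ == "__main__":
    for name, message in asyncio.run(main()):
        print(f"Subscriber {name} received message: {message}")
```

Every message appears exactly once in the result: a shared queue hands each item to a single consumer rather than to all of them.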

Let’s build a chat application as a practical example. In a chat room, multiple users can send messages to a channel or a group of users, and those messages need to be delivered to all the users who are subscribed to the channel or group.

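By using the pub/sub pattern with asyncio and the Queue class, you can give every subscriber its own message queue that acts as a buffer between the publishers and that subscriber. When a user sends a message, the publisher puts a copy into each subscriber's queue. Each subscriber then retrieves the messages from its own queue and displays them to its user.

import asyncio

class ChatRoom:
    def __init__(self):
        # One queue per subscriber, so every user receives every message
        self.subscriber_queues = {}

    def join(self, user_name):
        self.subscriber_queues[user_name] = asyncio.Queue()

    async def publish(self, message):
        for queue in self.subscriber_queues.values():
            await queue.put(message)

    async def subscribe(self, user_name):
        queue = self.subscriber_queues[user_name]
        while True:
            message = await queue.get()
            print(f"{user_name} received message: {message}")

async def main():
    chat_room = ChatRoom()
    chat_room.join("Alice")
    chat_room.join("Bob")
    subscribers = [
        asyncio.create_task(chat_room.subscribe("Alice")),
        asyncio.create_task(chat_room.subscribe("Bob"))
    ]
    await chat_room.publish("Hello, world!")
    await asyncio.sleep(0.1)  # give the subscribers time to print
    # The subscribers loop forever, so stop them explicitly
    for task in subscribers:
        task.cancel()
    await asyncio.gather(*subscribers, return_exceptions=True)

asyncio.run(main())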

Pattern 3: Data pipelines

A pipeline is a sequence of routines that are executed in a specific order, where the output of one routine becomes the input of the next. By using coroutines we break down a complex task into smaller, more manageable steps, and execute each step asynchronously.

We will be using the asyncio.Queue class to pass data between the coroutines. Each coroutine reads data from a queue, processes it, and puts the output on a queue for the next coroutine to process. Because a queue is first-in, first-out, each coroutine processes data in the order in which it was received.

In the following program, we define three functions to perform the different steps of a data pipeline: remove_duplicates, get_coordinates, and add_import_timestamp.

We apply these functions to two batches of data, each held in a Pandas dataframe. We then define the main function main, which processes both batches asynchronously using the asyncio.gather function. Finally, we run the main function using the asyncio.run function.

When the program is run, it will output the processed dataframes for both batches, with the added Latitude, Longitude, and Import Timestamp columns.

import asyncio
import pandas as pd
from datetime import datetime
import requests

# Define the coroutine to remove duplicates
async def remove_duplicates(in_queue, out_queue):
    while True:
        data = await in_queue.get()
        data = data.drop_duplicates()
        # Hand the result to the next stage
        await out_queue.put(data)
        in_queue.task_done()

# Define the coroutine to call the 3rd party API to get coordinates
async def get_coordinates(in_queue, out_queue):
    while True:
        data = await in_queue.get()
        for index, row in data.iterrows():
            address = row['Address']
            url = f'https://api.geocode.xyz/{address}?json=1'
            # requests is blocking, so run the call in a worker thread
            response = await asyncio.to_thread(requests.get, url)
            json_response = response.json()
            data.loc[index, 'Latitude'] = json_response['latt']
            data.loc[index, 'Longitude'] = json_response['longt']
        await out_queue.put(data)
        in_queue.task_done()

# Define the function to add the import timestamp column
def add_import_timestamp(data):
    timestamp = datetime.now()
    data['Import Timestamp'] = timestamp
    return data

# Define the coroutine to process each batch of data
async def process_data(data, first_queue, last_queue):
    # Put the batch into the first stage of the pipeline
    await first_queue.put(data)
    # Wait for a processed batch to come out of the last stage
    data = await last_queue.get()
    # Add the import timestamp column
    data = add_import_timestamp(data)
    return data

# Define the main function to process both batches of data asynchronously
async def main():
    # Define the two batches of data
    batch1 = pd.DataFrame({
        'Address': ['123 Main St', '456 Oak Ave', '789 Maple St', '456 Oak Ave', '234 Pine Rd']
    })
    batch2 = pd.DataFrame({
        'Address': ['345 Elm St', '678 Birch Rd', '234 Pine Rd', '910 Cedar Ln', '345 Elm St']
    })
    # Define one queue per hand-off between stages
    raw_queue = asyncio.Queue()
    deduplicated_queue = asyncio.Queue()
    geocoded_queue = asyncio.Queue()
    # Start one long-running worker per pipeline stage
    workers = [
        asyncio.create_task(remove_duplicates(raw_queue, deduplicated_queue)),
        asyncio.create_task(get_coordinates(deduplicated_queue, geocoded_queue))
    ]
    # Process both batches and wait for their results
    results = await asyncio.gather(
        process_data(batch1, raw_queue, geocoded_queue),
        process_data(batch2, raw_queue, geocoded_queue)
    )
    # The workers loop forever, so cancel them once both batches are done
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    # Print the results
    for result in results:
        print(result)

# Run the main function
asyncio.run(main())

We use queues to ensure that the steps are executed in the specific order of remove_duplicates, get_coordinates, and add_import_timestamp: each stage reads from its own input queue and hands its result to the queue of the next stage.

The two worker coroutines, remove_duplicates and get_coordinates, loop forever: each gets data from its input queue, processes it, and puts the result on its output queue. The process_data coroutine puts a batch into the first queue and waits for a processed batch to come out of the last one.

Once a batch has passed through both stages, the add_import_timestamp function is called to add the import timestamp column. Because the workers never finish on their own, main cancels them after both batches have been processed.
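To experiment with the pipeline idea without Pandas or a network connection, here is a stripped-down sketch that uses only the standard library. The stage coroutine, the STOP sentinel, and the choice of str.strip and str.upper as processing steps are all assumptions made for this illustration.

```python
import asyncio

STOP = object()  # hypothetical sentinel marking the end of the stream

async def stage(transform, in_queue, out_queue):
    while True:
        item = await in_queue.get()
        if item is STOP:
            await out_queue.put(STOP)  # pass the shutdown signal downstream
            break
        await out_queue.put(transform(item))

async def run_pipeline(items):
    # One queue per hand-off: input -> stripped -> uppercased
    q1, q2, q3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    stages = [
        asyncio.create_task(stage(str.strip, q1, q2)),
        asyncio.create_task(stage(str.upper, q2, q3)),
    ]
    for item in items:
        await q1.put(item)
    await q1.put(STOP)
    # Collect results until the sentinel comes out of the last stage
    results = []
    while (item := await q3.get()) is not STOP:
        results.append(item)
    await asyncio.gather(*stages)
    return results

if __name__ == "__main__":
    print(asyncio.run(run_pipeline(["  main st ", "oak ave  "])))
```

Passing a sentinel through the stages is one alternative to cancelling the workers: each stage exits on its own once the stream has ended.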

There is still quite some ground to cover but I hope you learned something useful today. Please shout out if you think of other practical applications to share. Thanks for reading.


Data engineer with a drive to use his skills for the greater good. All in all the life of the party, as long as the party ends by 9 PM.