Python - Concurrency and Asynchronous Patterns

ZhangZhihui's Blog / 2024-08-23 / Original article

Concurrency allows your program to manage multiple operations simultaneously, leveraging the full power of modern processors. It’s akin to a chef preparing multiple dishes in parallel, each step orchestrated so that all dishes are ready at the same time. Asynchronous programming, on the other hand, lets your application move on to other tasks while waiting for operations to complete, such as sending a food order to the kitchen and serving other customers until the order is ready.

Technical requirements
• Faker, using pip install faker
• ReactiveX, using pip install reactivex


• The Thread Pool pattern

First, it’s important to understand what a thread is. In computing, a thread is the smallest unit of processing that can be scheduled by an operating system.

Threads are like tracks of execution that can run on a computer at the same time, which enables many activities to be done simultaneously and thus improves performance. They are particularly important in applications that need multitasking, such as serving multiple web requests or carrying out multiple computations.

Now, onto the Thread Pool pattern itself. Imagine you have many tasks to complete, but starting each task (which here means creating a thread) is expensive in terms of resources and time. It’s like hiring a new employee every time you have a job to do and then letting them go when the job is done, which is inefficient and costly. The Thread Pool pattern reduces this inefficiency by maintaining a collection, or pool, of worker threads that are created once and then reused across many jobs. When a thread finishes a task, it does not terminate; it goes back to the pool and waits for another task.
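To make the reuse idea concrete, here is a minimal sketch that runs 20 small jobs on a pool of 3 threads and records which thread handled each one. The names record_thread and run_jobs are made up for this illustration, and the sleep is only a stand-in for real work.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def record_thread(_):
    # Simulate a little work, then report which pool thread ran this job.
    time.sleep(0.01)
    return threading.current_thread().name


def run_jobs(num_jobs=20, pool_size=3):
    # The pool never holds more than pool_size threads, however many jobs arrive.
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        names = list(executor.map(record_thread, range(num_jobs)))
    return set(names)


if __name__ == "__main__":
    used = run_jobs()
    print(f"20 jobs ran on {len(used)} distinct thread(s): {sorted(used)}")
```

The set of thread names never exceeds the pool size, which suggests that the same few threads were reused for all the jobs instead of one thread being created per job.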

What are worker threads?

A worker thread is a thread dedicated to executing a particular task or set of tasks. Worker threads offload processing from the main thread, helping to keep applications responsive by performing time-consuming or resource-intensive tasks asynchronously.

In addition to faster application performance, there are two benefits:

• Reduced overhead: By reusing threads, the application avoids the overhead of creating and destroying threads for each task
• Better resource management: The thread pool limits the number of threads, preventing resource exhaustion that could occur if too many threads were created

Use cases for the Thread Pool pattern

• Batch processing: When you have many tasks that can be performed in parallel, a thread pool can distribute them among its worker threads
• Load balancing: Thread pools can be used to distribute workload evenly among worker threads, ensuring that no single thread takes on too much work
• Resource optimization: By reusing threads, the thread pool minimizes system resource usage, such as memory and CPU time
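As a rough illustration of the batch-processing use case, the sketch below hands a list of items to a pool with executor.map(). The fetch_length function and its short sleep are placeholders for an I/O-bound job such as a network call; they are assumptions for this example only.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def fetch_length(text):
    # Placeholder for an I/O-bound job (for example, a network request).
    time.sleep(0.05)
    return len(text)


def process_batch(items, pool_size=4):
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        # map() yields results in input order, whatever order the jobs finish in.
        return list(executor.map(fetch_length, items))


if __name__ == "__main__":
    print(process_batch(["a", "bb", "ccc"]))
```

Because map() returns results in the order of the inputs, the caller does not need to track which worker thread handled which item.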

Implementing the Thread Pool pattern

First, let’s break down how a thread pool works in a given application:

1. Initialization: When the application starts, the thread pool creates a certain number of worker threads. This number can be fixed or dynamically adjusted based on the application’s needs.

2. Task submission: When there’s a task to be done, it’s submitted to the pool rather than being given a newly created thread. The task can be anything that needs to be executed, such as processing user input, handling network requests, or performing calculations.

3. Task execution: The pool assigns the task to one of the available worker threads. If all threads are busy, the task waits in a queue until a thread becomes available.

4. Return to the pool: Once a thread completes its task, it doesn’t die. Instead, it returns to the pool, ready to be assigned a new task.
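The four steps above can be sketched with a hand-rolled pool built on queue.Queue and threading.Thread. This is a toy written for illustration, not how ThreadPoolExecutor is implemented: the SimpleThreadPool class, its method names, and the sentinel-based shutdown are all assumptions of this sketch, and it does not handle exceptions raised by tasks.

```python
import queue
import threading


class SimpleThreadPool:
    """Toy thread pool illustrating the four steps; not production code."""

    _STOP = object()  # sentinel that tells a worker to exit

    def __init__(self, num_workers):
        # Step 1: initialization. Create the worker threads up front.
        self._tasks = queue.Queue()
        self._workers = [
            threading.Thread(target=self._run, daemon=True)
            for _ in range(num_workers)
        ]
        for w in self._workers:
            w.start()

    def submit(self, func, *args):
        # Step 2: task submission. Queue the job instead of creating a thread.
        self._tasks.put((func, args))

    def _run(self):
        while True:
            # Step 3: task execution. Block until a job is available.
            item = self._tasks.get()
            if item is self._STOP:
                break
            func, args = item
            func(*args)
            # Step 4: the thread loops back and waits for its next job.

    def shutdown(self):
        # Sentinels are queued after all pending jobs, so those jobs run first.
        for _ in self._workers:
            self._tasks.put(self._STOP)
        for w in self._workers:
            w.join()


if __name__ == "__main__":
    pool = SimpleThreadPool(3)
    for i in range(6):
        pool.submit(print, f"job {i} done")
    pool.shutdown()
```

In practice, ThreadPoolExecutor is the better choice; the sketch is only meant to show where each step lives in code.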

For our example, let’s see some code where we create a thread pool with five worker threads to handle a set of tasks. We are going to use the ThreadPoolExecutor class from the concurrent.futures module.

from concurrent.futures import ThreadPoolExecutor
import time


def task(n):
    print(f"Executing task {n}")
    time.sleep(1)
    print(f"Task {n} completed")


with ThreadPoolExecutor(max_workers=5) as executor:
    for i in range(10):
        executor.submit(task, i)


Executing task 0
Executing task 1
Executing task 2
Executing task 3
Executing task 4
Task 0 completed
Task 4 completed
Task 3 completed
Task 1 completed
Executing task 6
Executing task 7
Executing task 8
Task 2 completed
Executing task 5
Executing task 9
Task 8 completed
Task 6 completed
Task 9 completed
Task 5 completed
Task 7 completed

We see that the tasks were completed in an order different from the order of submission. This shows that they were executed concurrently using the threads available in the thread pool.
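If the tasks returned values instead of printing, the same out-of-order completion could be observed through the Future objects that submit() returns. The following is a sketch under that assumption; the delay argument and the run_tasks helper are invented for this example so that tasks submitted later tend to finish earlier.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def task(n, delay):
    # Sleep for the given delay, then return a result for task n.
    time.sleep(delay)
    return n * n


def run_tasks(count=4):
    with ThreadPoolExecutor(max_workers=count) as executor:
        # Map each Future back to the task number it was submitted with.
        futures = {
            executor.submit(task, i, 0.05 * (count - i)): i
            for i in range(count)
        }
        completion_order = []
        # as_completed() yields futures as they finish, not as they were submitted.
        for future in as_completed(futures):
            completion_order.append((futures[future], future.result()))
    return completion_order


if __name__ == "__main__":
    for n, result in run_tasks():
        print(f"Task {n} finished with result {result}")
```

The exact completion order depends on thread scheduling, so it can differ from run to run.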

• The Worker Model pattern

The idea behind the Worker Model pattern is to divide a large task or many tasks into smaller, manageable units of work that can be processed in parallel by independent workers. This approach to concurrency and parallel processing not only shortens processing time but also enhances the application’s performance.

The workers could be threads within a single application (as we have just seen in the Thread Pool pattern), separate processes on the same machine, or even different machines in a distributed system.

The benefits of the Worker Model pattern are the following:

• Scalability: Easily scales with the addition of more workers, which can be particularly beneficial in distributed systems where tasks can be processed on multiple machines
• Efficiency: By distributing tasks across multiple workers, the system can make better use of available computing resources, processing tasks in parallel
• Flexibility: The Worker Model pattern can accommodate a range of processing strategies, from simple thread-based workers to complex distributed systems spanning multiple servers

Real-world examples

Consider a delivery service where packages (tasks) are delivered by a team of couriers (workers). Each courier picks up a package from the distribution center (task queue) and delivers it. The number of couriers can vary depending on demand; more couriers can be added during busy periods and reduced when it’s quieter.

In big data processing, the Worker Model pattern is often employed where each worker is responsible for mapping or reducing a part of the data.

In systems such as RabbitMQ or Kafka, the Worker Model pattern is used to process messages from a queue concurrently.

We can also cite image processing services. Services that need to process multiple images simultaneously often use the Worker Model pattern to distribute the load among multiple workers.

Use cases for the Worker Model pattern

One use case for the Worker Model pattern is data transformation. When you have a large dataset that needs to be transformed, you can distribute the work among multiple workers.

Another one is task parallelism. In applications where different tasks are independent of each other, the Worker Model pattern can be very effective.

A third use case is distributed computing, where the Worker Model pattern can be extended to multiple machines, making it suitable for distributed computing environments.

Implementing the Worker Model pattern

Before discussing an implementation example, let’s understand how the Worker Model pattern works. Three components are involved in the Worker Model pattern: workers, a task queue, and, optionally, a dispatcher:

• The workers: The primary actors in this model. Each worker can perform a piece of the task independently of the others. Depending on the implementation, a worker might process one task at a time or handle multiple tasks concurrently.
• The task queue: A central component where tasks are stored awaiting processing. Workers typically pull tasks from this queue, ensuring that tasks are distributed efficiently among them. The queue acts as a buffer, decoupling task submission from task processing.
• The dispatcher: In some implementations, a dispatcher component assigns tasks to workers based on availability, load, or priority. This can help optimize task distribution and resource utilization.
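To show how a dispatcher fits in, here is a small sketch that uses threads as workers to keep it short. The dispatcher assigns tasks round-robin to one inbox queue per worker; real dispatchers may instead consider load or priority. The names worker, dispatch, and the inbox queues are hypothetical, and doubling a number stands in for real work.

```python
import queue
import threading


def worker(name, inbox, results):
    # Each worker drains its own inbox until it sees the None sentinel.
    while True:
        task = inbox.get()
        if task is None:
            break
        results.put((name, task, task * 2))  # "work": double the number


def dispatch(tasks, num_workers=3):
    results = queue.Queue()
    inboxes = [queue.Queue() for _ in range(num_workers)]
    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}", inbox, results))
        for i, inbox in enumerate(inboxes)
    ]
    for t in threads:
        t.start()

    # Dispatcher: assign tasks to workers in round-robin order.
    for i, task in enumerate(tasks):
        inboxes[i % num_workers].put(task)
    # Tell every worker that no more tasks are coming.
    for inbox in inboxes:
        inbox.put(None)
    for t in threads:
        t.join()

    # All workers have finished, so the results queue can be drained safely.
    collected = []
    while not results.empty():
        collected.append(results.get())
    return collected


if __name__ == "__main__":
    for name, task, result in dispatch(range(6)):
        print(f"{name} turned {task} into {result}")
```

Without a dispatcher, all workers would pull from a single shared queue instead.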

Let’s now see an example where we execute a function in parallel.

We start by importing what we need for the example, as follows:

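from multiprocessing import Process, Queue
from queue import Empty
import time

Then, we create a worker() function that runs the tasks. It takes as a parameter the task_queue object that contains the tasks to execute. Checking task_queue.empty() before each get() is unreliable when several processes share the queue, because another worker can take the last task between the check and the call. Instead, the worker asks for a task with a short timeout and stops when none arrives. The code is as follows:

def worker(task_queue):
    while True:
        try:
            # Wait briefly for a task; Empty means no work is left.
            task = task_queue.get(timeout=1)
        except Empty:
            break
        print(f"Processing task {task}")
        time.sleep(1)
        print(f"Task {task} completed")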

In the main() function, we start by creating a queue of tasks, an instance of multiprocessing.Queue. Then, we create 10 tasks and add them to the queue:

def main():
    task_queue = Queue()

    for i in range(10):
        task_queue.put(i)

We then create five worker processes using the multiprocessing.Process class, each one targeting the worker() function. A first loop starts each process with p.start(), so the workers run concurrently; each worker picks up a task from the queue, executes it, and then picks up another until the queue is empty. A second loop calls each process’s .join() method so that the program waits for all the workers to complete their work. That part of the code is as follows:

    processes = [
        Process(target=worker, args=(task_queue,))
        for _ in range(5)
    ]

    # Start the worker processes
    for p in processes:
        p.start()

    # Wait for all worker processes to finish
    for p in processes:
        p.join()
    print("All tasks completed.")


if __name__ == "__main__":
    main()

This pattern is particularly useful for scenarios where tasks are independent and can be processed in parallel.
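As a sketch of the data transformation use case, the following variant splits a list of records into chunks, lets several workers pull chunks from one shared queue, and stops each worker with a sentinel value. It uses threads and queue.Queue to stay compact; multiprocessing.Queue offers similar put() and get() methods. The names worker, transform, STOP, and the strip-and-uppercase transformation are all assumptions made for this illustration.

```python
import queue
import threading

STOP = None  # sentinel: one per worker, queued after the real tasks


def worker(task_queue, result_queue):
    while True:
        chunk = task_queue.get()
        if chunk is STOP:
            break
        # The "transformation": normalize each record in the chunk.
        result_queue.put([record.strip().upper() for record in chunk])


def transform(records, num_workers=3, chunk_size=2):
    task_queue, result_queue = queue.Queue(), queue.Queue()
    # Split the dataset into chunks; each chunk is one unit of work.
    for start in range(0, len(records), chunk_size):
        task_queue.put(records[start:start + chunk_size])
    for _ in range(num_workers):
        task_queue.put(STOP)

    workers = [
        threading.Thread(target=worker, args=(task_queue, result_queue))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # Chunks may complete in any order, so the output order is not guaranteed.
    transformed = []
    while not result_queue.empty():
        transformed.extend(result_queue.get())
    return transformed


if __name__ == "__main__":
    print(sorted(transform([" a ", "b", "c ", " d", "e"])))
```

Queueing one sentinel per worker after the real tasks lets every worker exit cleanly without having to guess whether the queue is empty.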

• The Future and Promise pattern

 

• The Observer pattern in reactive programming

 

• Other concurrency and asynchronous patterns