In "Concurrent Programming in Python is not what you think it is", I wrote about the notorious GIL in Python and ran a simple experiment on Python's existing concurrency mechanisms. Today, I'll describe another common scenario in concurrent programming, the Producer/Consumer problem, with Python.
What is the producer/consumer scenario?
Imagine you are at a fast-food chain that has an order line and a serving line. The staff in the serving line prepare the meals while their coworkers in the order line take orders from customers. In this scenario, the staff in the order line are known as producers, who produce order chits, whereas their coworkers in the serving line are consumers, who consume order chits and prepare the meals. You may see producers and consumers as two distinct groups of entities, connected by the list of orders (or, to use a better term from software engineering, a queue).
For humans, coordinating these groups of workers (consumers and producers) seems relatively straightforward and intuitive. For computers, unfortunately, that is not the case.
So, what's the catch of the Producer/Consumer problem?
Let's put our fast-food chain scenario into code:
"""
Implementation 1: Infinite Loop in Consumers
"""
import queue
import threading

orders = queue.Queue()

def serving_line_or_consumer():
    while True:
        new_order = orders.get()
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed

def order_line_or_producer():
    # Each staff in the order line produces 200 orders
    for _ in range(200):
        orders.put("Order")

# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Let's assign 6 staff into the serving line
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]

# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]

# "join" the order, block until all orders are cleared
orders.join()

# "join" the threads, ending all threads
[t.join() for t in order_line]
[t.join() for t in serving_line]  # NOTE: never returns, because the consumers loop forever
Let's focus on the consumer logic in serving_line_or_consumer():
def serving_line_or_consumer():
    while True:  # PROBLEM: infinite loop, the consumer never exits
        new_order = orders.get()
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed
The consumer is implemented with an infinite while loop, and that loop never exits. Once every order has been prepared, the consumers remain blocked in orders.get() forever, so the final join on the serving line never returns and the program hangs. Note that this is not busy waiting: orders.get() blocks by default, so an idle consumer sleeps inside the call. Busy waiting would occur if the loop polled the queue instead (for example with orders.get_nowait()), spinning while it waits for an event to occur; that is an anti-pattern because, simple as it is to implement, it wastes CPU cycles. A comprehensive explanation of busy waiting can be found here.
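To make the term concrete, here is a minimal sketch that contrasts a polling loop, which does busy-wait, with a blocking get() call. The helper names (poll_until_item, block_until_item, put_later) and the 0.05 second delay are my own assumptions for illustration and are not part of the implementations in this article.

```python
import queue
import threading
import time


def poll_until_item(q):
    """Busy-wait: keep asking the queue until something shows up."""
    empty_polls = 0
    while True:
        try:
            return q.get_nowait(), empty_polls
        except queue.Empty:
            empty_polls += 1  # wasted work: the CPU spins here


def block_until_item(q):
    """Blocking wait: the thread sleeps inside get() until an item arrives."""
    return q.get()


def put_later(q, item, delay=0.05):
    """Hypothetical helper: put `item` on `q` from another thread after `delay` seconds."""
    def _put():
        time.sleep(delay)
        q.put(item)

    t = threading.Thread(target=_put)
    t.start()
    return t


polled_q = queue.Queue()
t1 = put_later(polled_q, "Order")
polled_item, empty_polls = poll_until_item(polled_q)
t1.join()

blocking_q = queue.Queue()
t2 = put_later(blocking_q, "Order")
blocked_item = block_until_item(blocking_q)
t2.join()

print(f"polling: got {polled_item!r} after {empty_polls} empty polls")
print(f"blocking: got {blocked_item!r} with a single get() call")
```

The exact number of empty polls depends on the machine, but it is typically very large, and every one of them is CPU time spent on nothing. The blocking version does the same job with a single call.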
What if we terminate consumers once they have finished preparing all orders?
Let's put it into code again:
"""
Implementation 2: Use a Sentinel Value to Terminate Consumers
"""
import queue
import threading

orders = queue.Queue()

def serving_line_or_consumer():
    has_order = True
    while has_order:
        new_order = orders.get()
        if new_order is None:  # Check for Sentinel Value
            has_order = False
        # prepare meals from `new_order` (skipped for the sentinel), assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed

def order_line_or_producer():
    # Each staff in the order line produces 200 orders
    for _ in range(200):
        orders.put("Order")

# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Let's assign 6 staff into the serving line
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]

# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]

# "join" the order line first, so that every order is queued before any sentinel
[t.join() for t in order_line]

# ADDED THIS: Inform serving line no more orders
[orders.put(None) for _ in range(len(serving_line))]

# "join" the order, block until all orders are cleared
orders.join()

# "join" the serving line, ending all threads
[t.join() for t in serving_line]
In the second implementation, we use a sentinel value None to inform the consumers when there aren't any new orders. We added a line to put None into orders:
[orders.put(None) for _ in range(len(serving_line))]
We then updated the consumer logic to regard the sentinel value:
def serving_line_or_consumer():
    has_order = True
    while has_order:
        new_order = orders.get()
        if new_order is None:  # Check for Sentinel Value
            has_order = False
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed
Okay, with the new logic, whenever a consumer encounters the sentinel value None, it exits its loop and the thread ends. However, the caveat is that we have to explicitly recreate the consumers whenever the system gets new orders after they have terminated. This isn't really what we want in our scenario. You wouldn't want to dismiss the staff in the serving line just because there aren't any new orders.
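To check that the sentinel approach really processes every order and then lets every consumer exit, here is a scaled-down, instrumented sketch. The thread counts, the prepared list and its lock are assumptions added for inspection only. The sketch also waits for the producers to finish before queuing the sentinels, so that no sentinel can overtake a real order in the queue.

```python
import queue
import threading

NUM_PRODUCERS = 2          # assumption: scaled down from the article's 4
NUM_CONSUMERS = 3          # assumption: scaled down from the article's 6
ORDERS_PER_PRODUCER = 50   # assumption: scaled down from the article's 200

orders = queue.Queue()
prepared = []                      # meals prepared, kept only for inspection
prepared_lock = threading.Lock()


def consumer():
    while True:
        new_order = orders.get()
        if new_order is None:      # sentinel: no more orders are coming
            orders.task_done()
            break
        with prepared_lock:
            prepared.append(new_order)
        orders.task_done()


def producer(producer_id):
    for n in range(ORDERS_PER_PRODUCER):
        orders.put(f"order-{producer_id}-{n}")


producers = [threading.Thread(target=producer, args=(i,)) for i in range(NUM_PRODUCERS)]
consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in producers + consumers:
    t.start()

for t in producers:                # wait until every order is queued
    t.join()
for _ in consumers:                # one sentinel per consumer
    orders.put(None)
for t in consumers:                # returns, because each consumer saw a sentinel
    t.join()

still_running = sum(t.is_alive() for t in consumers)
print(len(prepared), "orders prepared;", still_running, "consumers still running")
# prints: 100 orders prepared; 0 consumers still running
```

Because the queue is first-in, first-out and the sentinels are queued last, every real order is taken before any consumer sees None.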
A Much Better Solution: Use a Semaphore
In a producer/consumer problem, it is best to use a concurrency control object: Lock, Mutex, Semaphore, etc. If you find these terms foreign, let's just focus on the semaphore for now.
What is a Semaphore?
A semaphore is a variable used to control access to a critical section in a concurrent environment (a program that involves more than 1 thread/process). A critical section is a piece of code that accesses resources (variables, data, etc.) shared among threads/processes, and which therefore should not be executed by too many of them at the same time.
In Python, Semaphore is a class with an internal counter, whose value is always a non-negative integer, and at least 2 methods: acquire() and release(). I will explain what acquire and release do later on.
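As a quick illustration of the internal counter, here is a small sketch using non-blocking acquire calls, so that nothing in it can hang. The variable name tables and the starting value of 2 are arbitrary choices for this example.

```python
import threading

tables = threading.Semaphore(value=2)    # internal counter starts at 2

first = tables.acquire(blocking=False)   # counter 2 -> 1, returns True
second = tables.acquire(blocking=False)  # counter 1 -> 0, returns True
third = tables.acquire(blocking=False)   # counter is 0: would block, so returns False

tables.release()                         # counter 0 -> 1
fourth = tables.acquire(blocking=False)  # counter 1 -> 0, returns True

print(first, second, third, fourth)      # True True False True
```

With the default blocking=True, the third call would not return False; it would put the calling thread to sleep until another thread calls release().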
In our fast-food chain scenario, the semaphore is not used to control access to shared resources, but to let consumers sleep while they wait for an event (queue.Queue relies on similar primitives internally; here we make the signalling explicit). Let's put a semaphore into our scenario:
"""
Implementation 3: Use Semaphores
"""
import queue
import threading

orders = queue.Queue()
has_order = threading.Semaphore(value=0)  # ADDED THIS

def serving_line_or_consumer():
    while has_order.acquire():  # ADDED THIS: Acquire a Semaphore, or sleep until the counter of semaphore is larger than zero
        new_order = orders.get()
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()

def order_line_or_producer():
    # Each staff in the order line produces 200 orders
    for _ in range(200):
        orders.put("Order")
        has_order.release()  # ADDED THIS: Release the Semaphore, increment the internal counter by 1

# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]
# Let's assign 6 staff into the serving line (daemon threads, so sleeping consumers don't keep the program alive)
serving_line = [threading.Thread(target=serving_line_or_consumer, daemon=True) for _ in range(6)]

# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]

# "join" the order line, wait until every order is queued
[t.join() for t in order_line]

# "join" the order, block until all orders are cleared
orders.join()

# The serving line is not joined: its consumers stay asleep in acquire(), ready for new orders
In the latest implementation, the consumers wait to acquire a semaphore. The simplified logic of acquire() would be:
If the internal counter is zero:
    allow the thread to sleep while waiting for it to be larger than zero
    once the internal counter is more than zero, decrement it by 1
    return True
If the internal counter is larger than zero:
    decrement the internal counter by 1
    return True
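The simplified logic above can be sketched as a toy class built on threading.Condition. This is only an illustration of the idea under my own assumptions (the name ToySemaphore is hypothetical); it is not how you should implement a semaphore in real code, where threading.Semaphore is the right tool.

```python
import threading


class ToySemaphore:
    """Illustrative only; real code should use threading.Semaphore."""

    def __init__(self, value=0):
        self._value = value
        self._cond = threading.Condition()

    def acquire(self):
        with self._cond:
            while self._value == 0:
                self._cond.wait()   # sleep; no CPU is spent while waiting
            self._value -= 1
            return True

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()     # wake one sleeping acquirer, if any


sem = ToySemaphore(0)
woken = []


def waiter():
    sem.acquire()                   # counter is 0, so this thread goes to sleep
    woken.append("awake")


t = threading.Thread(target=waiter)
t.start()
t.join(timeout=0.1)                 # give the waiter a moment; it cannot finish yet
still_waiting = t.is_alive()        # True: nobody has released the semaphore

sem.release()                       # counter 0 -> 1 and the waiter is woken up
t.join()
print(still_waiting, woken)         # True ['awake']
```

The while loop around wait() matters: a woken thread re-checks the counter before decrementing it, so two threads can never both consume the same release.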
With acquire(), our consumers will be idle when there isn't any order (the internal counter of the semaphore is zero).
In our case, we updated our order_line_or_producer() to make use of the semaphore as well. When a producer receives an order, it releases the semaphore. In short, release() increments the internal counter of the semaphore and wakes up one thread that is sleeping in acquire(), if there is one.
With the semaphore incremented, a waiting acquire() call returns and the consumer picks up the order. This is the best of the three solutions: consumers sleep while they wait for new orders, and producers notify them by incrementing the semaphore.
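To see the whole flow end to end, here is an instrumented sketch of the semaphore version that counts the prepared orders. The counter, its lock, and the use of daemon threads for the consumers are assumptions on my part: the daemon flag lets the program exit while the consumers are still asleep waiting for more orders.

```python
import queue
import threading

orders = queue.Queue()
has_order = threading.Semaphore(value=0)
prepared_count = 0                 # for inspection only
count_lock = threading.Lock()


def consumer():
    global prepared_count
    while has_order.acquire():     # sleeps while the counter is zero
        orders.get()               # an order is guaranteed to be there
        with count_lock:
            prepared_count += 1
        orders.task_done()


def producer():
    for _ in range(200):
        orders.put("Order")
        has_order.release()        # signal: one more order is available


order_line = [threading.Thread(target=producer) for _ in range(4)]
serving_line = [threading.Thread(target=consumer, daemon=True) for _ in range(6)]
for t in order_line + serving_line:
    t.start()

for t in order_line:               # wait until every order is queued
    t.join()
orders.join()                      # wait until every order is prepared

print(prepared_count)              # 800
```

Each producer puts the order on the queue before releasing the semaphore, so a consumer that wakes up always finds an order waiting for it.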
Conclusion
In this article, we tackled the producer/consumer problem with 3 different implementations:
- using an infinite loop to wait for events
- using a sentinel value to terminate consumers when they are not needed temporarily
- (the better solution) using a concurrency control object: a semaphore to allow consumers to sleep while waiting
Among these, a concurrency control object is much preferred. In this scenario, we used a semaphore, but there are other control objects besides semaphores. I highly recommend you explore them on your own.
Anecdote: In Private File Saver, I implemented a desktop client in Python to sync local files to an AWS S3 bucket. The performance was satisfactory until I had to sync more than 12,000 files. That called for rewriting the logic to be more efficient. One of the obvious improvements would be implementing a producer/consumer solution. (The details of this problem can be found here).