Notch Up Producer-Consumer Paradigm with Python

The producer-consumer problem is one of the concepts used in implementing message queues.

Recently, we dealt with asynchronous programming using processes and threads.

Here at LinuxAPT, as part of our Server Management Services, we regularly help our customers with related Python queries.

In this context, we shall briefly look at when to use processes versus threads, and then implement the producer-consumer paradigm.


What are Processes?

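Processes are suitable for CPU-bound applications, where most of the time is spent computing, such as crunching huge numbers or processing large data sets. These tasks are best spread across multiple processes, and therefore multiple CPU cores. In standard CPython builds, the global interpreter lock (GIL) prevents threads from executing Python code in parallel, which is why processes are the usual choice for this kind of work.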
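As a rough illustration of the CPU-bound case, the sketch below spreads a deliberately slow prime count across worker processes using concurrent.futures.ProcessPoolExecutor from the standard library. The function names count_primes and count_primes_in_parallel, and the limits used, are hypothetical and chosen only for the demo; treat it as a minimal sketch rather than a tuned implementation.

```python
import math
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """Count the primes below `limit` by trial division (deliberately CPU-heavy)."""
    count = 0
    for n in range(2, limit):
        # n is prime if no divisor from 2 up to sqrt(n) divides it evenly.
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count


def count_primes_in_parallel(limits: list[int]) -> list[int]:
    """Run count_primes for each limit in a separate worker process."""
    # Each worker process has its own interpreter (and its own GIL),
    # so the computations can run on several CPU cores at once.
    with ProcessPoolExecutor() as executor:
        return list(executor.map(count_primes, limits))


if __name__ == "__main__":
    # The __main__ guard is required where workers are started with "spawn",
    # because each worker re-imports the main module.
    print(count_primes_in_parallel([50_000, 60_000, 70_000, 80_000]))
```

The guard is the design point worth copying: without it, a worker that re-imports the main module would try to create a new pool of its own.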


What are Threads?

Threads are suitable for scenarios with time-consuming I/O operations, e.g. file I/O, HTTP calls or socket connections. The thread making such an I/O call waits for the response while other threads keep running.
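A minimal sketch of the I/O-bound case, assuming a hypothetical fake_io_call that simulates a slow network call with time.sleep(). Because a waiting thread lets the others run, five 0.5-second calls issued through a ThreadPoolExecutor should finish in roughly 0.5 seconds instead of 2.5.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(request_id: int) -> str:
    """Hypothetical stand-in for a slow I/O call such as an HTTP request."""
    time.sleep(0.5)  # While this thread waits, the other threads keep running.
    return f"response {request_id}"


def fetch_all(request_ids, max_workers: int = 5) -> list[str]:
    """Issue all calls concurrently from a pool of threads; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fake_io_call, request_ids))


if __name__ == "__main__":
    start = time.perf_counter()
    print(fetch_all(range(5)))
    # The five waits overlap, so the elapsed time is close to a single call.
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```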


Producer-Consumer Pattern

The producer-consumer design pattern is widely used today in an array of applications, and many integration interfaces rely on it. Message queues (MQs) use this paradigm extensively and extend it into the publisher/subscriber, point-to-point (P2P) or push/pull patterns.

The basic idea behind this pattern is to distribute a collection of tasks to multiple threads running asynchronously. Producers and consumers can also be chained, with intermediary nodes acting as both producers and consumers.

Take Android notifications as an example: servers push (produce) notifications to Firebase Cloud Messaging (FCM), which consumes them and in turn pushes them to individual devices.

Another real-life example is the postal service: people hand over (produce) mail to the postal service, which consumes it and in turn delivers (produces) it to the recipients, who act as consumers.


Queue

We need a queue when dealing with producers and consumers: producers need a place to push messages, and consumers need a place to read them from. Queues are the usual choice because we typically want to process messages in FIFO (first-in, first-out) order. Other data structures such as lists or arrays can also work, but then you must add the locking and waiting yourself, whereas Python's queue.Queue is already thread-safe and lets consumers block until a message arrives. It is very important that the producers and consumers share the same Queue object.
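The sketch below (function names are illustrative, not from the original article) passes one Queue object to both a producer thread and a consumer thread. The consumer is started first and simply blocks in get() until items arrive, and it receives them in the same FIFO order in which they were put. If each side created its own Queue(), the consumer would wait forever on a queue nobody writes to.

```python
import threading
from queue import Queue


def producer(q: Queue, items: list) -> None:
    for item in items:
        q.put(item)  # Append to the tail of the shared queue.


def consumer(q: Queue, count: int, received: list) -> None:
    for _ in range(count):
        received.append(q.get())  # Blocks until the producer has put something.


def run_pipeline(items: list) -> list:
    shared_queue = Queue()  # ONE Queue object, handed to both threads.
    received: list = []
    threads = [
        threading.Thread(target=consumer, args=(shared_queue, len(items), received)),
        threading.Thread(target=producer, args=(shared_queue, items)),
    ]
    for t in threads:
        t.start()  # The consumer starts first and waits for data.
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    print(run_pipeline(["msg-1", "msg-2", "msg-3"]))  # → ['msg-1', 'msg-2', 'msg-3']
```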


Producers

Producers run asynchronously and add messages to a queue. Messages pushed into the queue then become available to consumers. Producers do not care whether the messages are being consumed; their job is limited to pushing messages. Multiple producers can produce messages to the same queue.
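To illustrate several producers feeding one queue, here is a minimal sketch with hypothetical producer threads that push (producer_id, n) tuples and return without ever checking whether anything consumed them. Queue.put() is thread-safe, so no extra locking is needed; messages from different producers may interleave, but each producer's own messages keep their order.

```python
import threading
from queue import Queue


def producer(q: Queue, producer_id: int, count: int) -> None:
    """Push `count` messages and return; the producer never checks who consumes them."""
    for n in range(count):
        q.put((producer_id, n))


def produce_concurrently(num_producers: int, per_producer: int) -> Queue:
    q = Queue()  # Shared by every producer thread.
    producers = [
        threading.Thread(target=producer, args=(q, pid, per_producer))
        for pid in range(num_producers)
    ]
    for t in producers:
        t.start()
    for t in producers:
        t.join()
    return q


if __name__ == "__main__":
    q = produce_concurrently(num_producers=3, per_producer=4)
    print(q.qsize())  # → 12
```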


Consumers

Consumers keep polling the queue, waiting for messages to arrive. They start processing messages as soon as they become available. Multiple consumers can poll the same queue.
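A minimal sketch of several consumers polling one shared queue, assuming the queue is filled up front. Each hypothetical consumer waits up to 0.2 seconds per get(timeout=...) call and treats a queue.Empty exception as "no more work". Every message is handed to exactly one consumer. This timeout-based stop is only a simplification for the demo; the poison approach described in the next section is the more reliable way to end consumers.

```python
import threading
from queue import Empty, Queue


def consumer(name: str, q: Queue, processed: dict, lock: threading.Lock) -> None:
    """Poll the shared queue; give up once it has stayed empty for 0.2 s."""
    while True:
        try:
            message = q.get(timeout=0.2)  # Wait up to 0.2 s for a message.
        except Empty:
            return  # Nothing arrived in time: assume the work is finished.
        with lock:  # Protect the shared results dictionary.
            processed.setdefault(name, []).append(message)


def consume_with(num_consumers: int, messages: list) -> dict:
    q = Queue()
    for m in messages:
        q.put(m)
    processed: dict = {}
    lock = threading.Lock()
    workers = [
        threading.Thread(target=consumer, args=(f"consumer-{i}", q, processed, lock))
        for i in range(num_consumers)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return processed


if __name__ == "__main__":
    # Which consumer gets which message depends on thread scheduling.
    for name, items in sorted(consume_with(3, list(range(10))).items()):
        print(name, items)
```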


Poison (Optional)

A poison (often called a poison pill) is a special message that tells a running thread when to return. When a producer or consumer receives a poison, it is a signal for it to stop. This ensures that no threads are left hanging at the end of processing.
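A minimal poison-pill sketch with several consumers. It uses a unique object() sentinel rather than the article's Poison = -1, so a real message can never be mistaken for the signal, and it puts one poison per consumer instead of having consumers re-queue a single poison as the TimeConsumingNode class below does. Both are common variants; the names here are illustrative.

```python
import threading
from queue import Queue

POISON = object()  # Unique sentinel: cannot be confused with a real message.


def consumer(q: Queue, results: list, lock: threading.Lock) -> None:
    while True:
        message = q.get()
        if message is POISON:
            return  # Poison received: leave the loop so the thread ends cleanly.
        with lock:
            results.append(message * message)


def run(messages: list, num_consumers: int = 3) -> list:
    q = Queue()
    results: list = []
    lock = threading.Lock()
    consumers = [threading.Thread(target=consumer, args=(q, results, lock))
                 for _ in range(num_consumers)]
    for c in consumers:
        c.start()
    for m in messages:
        q.put(m)
    # One poison per consumer, queued after all real messages (FIFO),
    # so each consumer stops only once the real work has been taken.
    for _ in consumers:
        q.put(POISON)
    for c in consumers:
        c.join()  # Returns because every consumer received its poison.
    return sorted(results)


if __name__ == "__main__":
    print(run([1, 2, 3, 4]))  # → [1, 4, 9, 16]
```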


Do It Yourself:

Most available examples use a single producer and a single consumer. Let us take it a notch up.


Assumptions:

Some tasks (viz. initial_task and final_task) can be done relatively quickly, while others (viz. intermediary_task) take a long time to process.


Design:

There is a single thread for the initial_task (ProducerNode) and another for the final_task (ConsumerNode), but three threads (TimeConsumingNode instances) are dedicated to the intermediary_task. As soon as an item's initial_task is done, it is picked up by whichever intermediary thread is available, which then performs the intermediary_task. Once that is complete, the result is produced into the final queue, which is consumed to perform the final_task:

import time
from queue import Queue
from threading import Thread

# Sentinel ("poison") value that tells a node to stop.
POISON = -1


class TimeConsumingNode(Thread):
    """Intermediary node: consumes from one queue and produces into another."""

    def __init__(self, consumer_queue: Queue, producer_queue: Queue,
                 task_time: int = 0):
        super().__init__()
        self.consumer_queue = consumer_queue
        self.producer_queue = producer_queue
        self.task_time = task_time

    def run(self) -> None:
        while True:
            consumed_message = self.consumer_queue.get()
            print(self, f"consumed {consumed_message}")
            if consumed_message == POISON:
                print(self, "got poison")
                # Put the poison back so the other intermediary nodes also stop.
                self.consumer_queue.put(consumed_message)
                # Pass the poison on to the final consumer.
                self.producer_queue.put(consumed_message)
                break
            # Mock processing the consumed message.
            time.sleep(self.task_time)
            # Mock some heavy work.
            time.sleep(self.task_time)
            print(self,
                  f"took at least {self.task_time * 2}s to consume and "
                  f"produce {consumed_message}")
            self.producer_queue.put(consumed_message)


# Comparatively faster node that feeds the intermediary queue.
class ProducerNode(Thread):
    def __init__(self, consumed_queue: Queue, produced_queue: Queue,
                 task_time: int = 0):
        super().__init__()
        self.consumed_queue = consumed_queue
        self.produced_queue = produced_queue
        self.task_time = task_time

    def run(self) -> None:
        # Checking empty() is safe here only because this node is the sole
        # reader of a queue that was filled before the threads started.
        while not self.consumed_queue.empty():
            some_number = self.consumed_queue.get()
            time.sleep(self.task_time)
            print(self, f"Producing {some_number}")
            self.produced_queue.put(some_number)
        # Signal the intermediary nodes that no more work is coming.
        print(self, "Adding poison")
        self.produced_queue.put(POISON)


# Last node, again comparatively fast.
class ConsumerNode(Thread):
    def __init__(self, consumed_queue: Queue, expected_poisons: int,
                 task_time: int = 0):
        super().__init__()
        self.consumed_queue = consumed_queue
        self.expected_poisons = expected_poisons
        self.task_time = task_time
        self.poisons = 0

    def run(self) -> None:
        while True:
            consumed_message = self.consumed_queue.get()
            if consumed_message == POISON:
                self.poisons += 1
                print(self, f"got {self.poisons} poisons")
                # Stop only after every intermediary node has finished.
                if self.poisons == self.expected_poisons:
                    break
                continue  # A poison is not a real message; do not process it.
            # Mock some time-consuming work.
            time.sleep(self.task_time / 2)
            print(self, f"took {self.task_time / 2}s to consume {consumed_message}")


if __name__ == "__main__":
    my_queue = Queue()
    for number in (2, 3, 4, 5, 6):
        my_queue.put(number)
    intermediary_queue = Queue()
    final_queue = Queue()

    first_prod_node = ProducerNode(consumed_queue=my_queue,
                                   produced_queue=intermediary_queue,
                                   task_time=3)
    # Three intermediary threads with different (mock) processing times.
    tc_nodes = [
        TimeConsumingNode(consumer_queue=intermediary_queue,
                          producer_queue=final_queue, task_time=t)
        for t in (4, 5, 2)
    ]
    # The final node expects one poison from each intermediary node.
    final_node = ConsumerNode(consumed_queue=final_queue,
                              expected_poisons=len(tc_nodes), task_time=1)

    nodes = [first_prod_node, *tc_nodes, final_node]
    for node in nodes:
        node.start()
    for node in nodes:
        node.join()


Risks of the Producer-Consumer Paradigm in Python:

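If not implemented properly, producer-consumer code can lead to:

1. Hanging threads, e.g. a consumer blocked forever in get() because no poison ever arrives

2. Deadlocks, e.g. threads each waiting on a queue or lock that the other is supposed to fill or release

3. Exceptions while reading from an empty queue (in Python, queue.Empty from get_nowait() or get(timeout=...), or IndexError when popping from an empty list)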
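For the third risk, one defensive option is to wrap non-blocking reads so that an empty queue (or list) returns a default value instead of raising. The helper names read_next and pop_next are hypothetical. Catching queue.Empty is also safer than checking q.empty() before reading: with several consumers, another thread can take the last item between the check and the get().

```python
from queue import Empty, Queue


def read_next(q: Queue, default=None):
    """Non-blocking read that returns `default` instead of raising on an empty queue."""
    try:
        return q.get_nowait()  # Raises queue.Empty when there is nothing to read.
    except Empty:
        return default


def pop_next(items: list, default=None):
    """Same idea for a plain list, whose pop(0) raises IndexError when empty."""
    try:
        return items.pop(0)
    except IndexError:
        return default


if __name__ == "__main__":
    q = Queue()
    print(read_next(q))  # → None
    q.put("job-1")
    print(read_next(q))  # → job-1
    print(pop_next([]))  # → None
```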



This article covered how the producer-consumer pattern is a very useful design that can be leveraged in many ways to enable asynchronous processing of multiple time-consuming tasks. The concept has been widely incorporated into modern-day message queues, viz. Kafka, RabbitMQ, and the cloud message queues provided by AWS, GCP and others.

Python provides the Queue class (in the queue module), which implements a queue data structure. We can put an item into the queue and get an item from it. By default, it works in FIFO (First In, First Out) order.
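A quick way to see the default FIFO behaviour is to compare Queue with the two other classes in the same standard-library module, LifoQueue and PriorityQueue. The helpers fill and drain are illustrative, and the drain loop relies on q.empty() only because a single thread is reading.

```python
from queue import LifoQueue, PriorityQueue, Queue


def fill(q, values) -> None:
    for value in values:
        q.put(value)


def drain(q) -> list:
    """Read every item back out, in the order the queue hands them over."""
    items = []
    while not q.empty():  # Safe here: a single thread is the only reader.
        items.append(q.get())
    return items


if __name__ == "__main__":
    for queue_class in (Queue, LifoQueue, PriorityQueue):
        q = queue_class()
        fill(q, [3, 1, 2])
        print(queue_class.__name__, drain(q))
    # → Queue [3, 1, 2]
    # → LifoQueue [2, 1, 3]
    # → PriorityQueue [1, 2, 3]
```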


In the simplest setup, a producer function puts items into the queue and a consumer function gets items from it. After instantiating a queue object with q = Queue(), we use the following methods of the Queue class:


Queue Methods in Python:

q.put(item): Puts an item into the queue.

q.get(): Removes and returns an item from the queue, blocking until one is available.

q.join(): Blocks until every item that has been put into the queue has been marked as processed with task_done(), which keeps the program from moving on (or exiting) too early. It should therefore always be used together with task_done().

q.task_done(): Should be called by the consumer once an item obtained with q.get() has been completely processed. When task_done() has been called for every item, q.join() unblocks and the program can continue or exit.
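A minimal sketch of task_done() and join() working together, with a hypothetical consumer that upper-cases strings. The consumer loops forever, so it is started as a daemon thread; q.join() is what tells the main thread that every item has been processed. Note that calling task_done() more times than there were items raises ValueError.

```python
import threading
from queue import Queue


def producer(q: Queue, items) -> None:
    for item in items:
        q.put(item)


def consumer(q: Queue, results: list) -> None:
    while True:
        item = q.get()
        results.append(item.upper())  # "Process" the item.
        q.task_done()  # Mark this item as fully processed.


def run(items) -> list:
    q = Queue()
    results: list = []
    # daemon=True: the endless consumer loop will not keep the program alive.
    threading.Thread(target=consumer, args=(q, results), daemon=True).start()
    producer(q, items)
    q.join()  # Blocks until task_done() has been called once for every put().
    return results


if __name__ == "__main__":
    print(run(["a", "b", "c"]))  # → ['A', 'B', 'C']
```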


Thread Class in Python:

Python supports multi-threaded programs through the Thread class from the threading module. We instantiate Thread objects and use the following methods to process (consume) multiple items concurrently:

t = Thread(target=consumer): Creates a thread object that will call the function consumer.

t.start(): Starts the thread, which then calls consumer in a separate thread of execution.
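A short sketch of creating several threads with Thread(target=..., args=...), starting them, and waiting for them with t.join(), which is also part of the Thread API. The consumer function and the batch data are hypothetical.

```python
import threading


def consumer(name: str, items: list, results: dict) -> None:
    """Hypothetical consumer: records how many items it handled."""
    results[name] = len(items)


def start_consumers(batches: list) -> dict:
    results: dict = {}
    threads = []
    for i, batch in enumerate(batches):
        # target is the function to call; args are passed to it positionally.
        t = threading.Thread(target=consumer, args=(f"consumer-{i}", batch, results))
        t.start()  # Runs consumer(...) in a new thread.
        threads.append(t)
    for t in threads:
        t.join()  # Wait until that thread's consumer() call has returned.
    return results


if __name__ == "__main__":
    # Key order may vary with thread scheduling.
    print(start_consumers([[1, 2], [3], [4, 5, 6]]))
```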
