Thursday, January 5, 2017

Python - a Queue delivering to all consumers

The standard Python Queue is great. But we needed a Queue that would replicate all items to all consumers.

The use case: you push data into a single queue, but several consumer threads each need to receive every item that has been queued. Here is a quick (and dirty) implementation I came up with.


import Queue
import threading
import time


class ReplicatingQueue(Queue.Queue):
    """Queue that copies every item put on it to a set of subqueues.

    Producers ``put()`` items on this queue as usual.  A background daemon
    thread takes each item off and places it, without blocking, on every
    registered subqueue, so a consumer reading its own subqueue sees every
    item.  A bounded subqueue that is full at delivery time misses that
    item; delivery to the other subqueues is unaffected.

    The copier thread is started by the constructor.  Its target is a bound
    method of this object, so the object stays referenced while the thread
    runs; call ``stop()`` to let the thread finish.
    """

    # Private marker that stop() puts on the queue to tell the copier thread
    # to exit.  Being private, it cannot collide with a value a producer
    # legitimately wants to queue.
    _STOP = object()

    def __init__(self, *subqueues):
        """Create the queue, register ``subqueues`` and start the copier.

        Raises:
            TypeError: if any of ``subqueues`` is not a ``Queue.Queue``.
        """
        # RLock guards both the subqueue set and the copier-thread handle.
        self._lock = threading.RLock()
        self._subqueues = ()
        self.add_subqueue(*subqueues)
        self._thread = None
        Queue.Queue.__init__(self)
        self.start()

    def add_subqueue(self, *subqueues):
        """Add Queues to receive objects from the ReplicatingQueue.

        All arguments are validated before any is registered.  Adding a
        subqueue that is already registered has no effect.

        Raises:
            TypeError: if any argument is not a ``Queue.Queue``.
        """
        for subqueue in subqueues:
            if not isinstance(subqueue, Queue.Queue):
                raise TypeError('Subqueue in not a Queue')
        with self._lock:
            # Rebuild as a fresh tuple of unique members; the copier iterates
            # over whichever tuple is current when it takes the lock.
            self._subqueues = tuple(set(self._subqueues).union(subqueues))

    def add_new_subqueue(self):
        """Create a new unbounded Queue, register it and return it."""
        queue = Queue.Queue()
        self.add_subqueue(queue)
        return queue

    def remove_subqueue(self, *subqueues):
        """Remove Queues from receiving objects from the ReplicatingQueue.

        Arguments that are not currently registered are ignored.
        """
        with self._lock:
            self._subqueues = tuple(
                set(self._subqueues).difference(subqueues))

    def stop(self):
        """Ask the copier thread to exit after delivering queued items.

        Does nothing if no copier thread is registered.  The stop marker is
        appended behind the items already queued, so those are delivered
        first.  This call does not wait for the thread to finish.
        """
        with self._lock:
            if not self._thread:
                return
            self._thread = None
            self.put(self._STOP)

    def start(self):
        """Start the replicating thread unless one is already registered."""
        with self._lock:
            if self._thread:
                return
            self._thread = threading.Thread(target=self._copier)
            # Daemon, so a forgotten stop() does not keep the process alive.
            self._thread.daemon = True
            self._thread.start()

    def _copier(self):
        """Thread body: move items from this queue to every subqueue."""
        stop_marker = self._STOP
        while True:
            try:
                next_object = self.get()
            except Exception:
                # A blocking get() is not expected to raise; if it does,
                # leave the loop instead of spinning on the error.
                break
            if next_object is stop_marker:
                break
            with self._lock:
                for subqueue in self._subqueues:
                    try:
                        subqueue.put_nowait(next_object)
                    except Queue.Full:
                        # One full bounded subqueue must not kill the
                        # copier and starve the other consumers.
                        continue
            # Drop the reference so the item is not kept alive while the
            # thread blocks in the next get().
            next_object = None


This class is a queue, but it also starts a Thread that copies all items to the consumer queues you specify. You can start and stop the copying process. Keep in mind that the copying thread holds references to the queue and the objects passing through it, so for garbage collection to work you must eventually call stop(): this ends the copying thread and drops those references, allowing the objects to be GCed.

Here is also some naive test code to see it in action:

def __main__():
    """Naive demo: replicate items to eleven printing consumer threads.

    Also exercises adding/removing a subqueue and a stop/start/stop cycle
    of the replicating thread.  Output goes to stdout.
    """

    class TestConsumer(threading.Thread):
        """Thread that prints, prefixed by its name, each item it receives."""

        def __init__(self, name, queue):
            self.queue = queue
            threading.Thread.__init__(self, name=str(name))

        def run(self):
            # Never returns; the caller marks the thread as daemon.
            while True:
                print(self.name + ': ' + str(self.queue.get()))

    replica = ReplicatingQueue()

    # One fresh subqueue and one daemon consumer thread per index.
    for consumer_id in range(11):
        consumer = TestConsumer(consumer_id, replica.add_new_subqueue())
        consumer.daemon = True
        consumer.start()

    # Register and immediately unregister a subqueue nobody reads from.
    unused = replica.add_new_subqueue()
    replica.remove_subqueue(unused)

    for value in range(10, 11):
        time.sleep(1)
        replica.put(value)
    time.sleep(2)

    print('Stopping...')
    replica.stop()
    time.sleep(2)

    # Queue more items after the stop request, then restart the copier.
    for value in range(90, 99):
        replica.put(value)
    print('Starting...')
    replica.start()
    print('Stopping...')
    replica.stop()
    print('Stopped')
    time.sleep(2)