The standard Python Queue is great. But we needed a Queue that would replicate all items to all consumers.
So, the case is that you have a queue where you push data, but you have multiple threads needing all items that have been queued. Here is a quick (and dirty) implementation I came up with.
import Queue
import threading
import time
class ReplicatingQueue(Queue.Queue):
def __init__(self, *subqueues):
self._lock = threading.RLock()
self._subqueues = ()
self.add_subqueue(*subqueues)
self._thread = None
Queue.Queue.__init__(self)
self.start()
def add_subqueue(self, *subqueues):
"Add Queues to receive objects from the ReplicatingQueue."
for subqueue in subqueues:
if not isinstance(subqueue, Queue.Queue):
raise TypeError('Subqueue in not a Queue')
with self._lock:
self._subqueues = tuple(set(self._subqueues).union(set(subqueues)))
def add_new_subqueue(self):
"Shorthand for creating a new Queue and adding it."
queue = Queue.Queue()
self.add_subqueue(queue)
return queue
def remove_subqueue(self, *subqueues):
"Remove Queues from receiving objects from the ReplicatingQueue."
with self._lock:
self._subqueues = tuple(set(self._subqueues).difference(set(subqueues)))
def stop(self):
"Kill the replicating thread, so references are gone and GC can work, delivering queued items first."
with self._lock:
if not self._thread:
return
thread_ref = self._thread
self._thread = None
self.put(ReplicatingQueue)
def start(self):
"Start the replicating thread."
with self._lock:
if self._thread:
return
self._thread = threading.Thread(target=self._copier)
self._thread.daemon = True
self._thread.start()
def _copier(self):
"Copying routine."
while True:
try:
next_object = self.get()
except:
break
with self._lock:
if next_object is ReplicatingQueue:
break
for subqueue in self._subqueues:
subqueue.put_nowait(next_object)
next_object = None
So, the case is that you have a queue where you push data, but you have multiple threads needing all items that have been queued. Here is a quick (and dirty) implementation I came up with.
import Queue
import threading
import time
class ReplicatingQueue(Queue.Queue):
def __init__(self, *subqueues):
self._lock = threading.RLock()
self._subqueues = ()
self.add_subqueue(*subqueues)
self._thread = None
Queue.Queue.__init__(self)
self.start()
def add_subqueue(self, *subqueues):
"Add Queues to receive objects from the ReplicatingQueue."
for subqueue in subqueues:
if not isinstance(subqueue, Queue.Queue):
raise TypeError('Subqueue in not a Queue')
with self._lock:
self._subqueues = tuple(set(self._subqueues).union(set(subqueues)))
def add_new_subqueue(self):
"Shorthand for creating a new Queue and adding it."
queue = Queue.Queue()
self.add_subqueue(queue)
return queue
def remove_subqueue(self, *subqueues):
"Remove Queues from receiving objects from the ReplicatingQueue."
with self._lock:
self._subqueues = tuple(set(self._subqueues).difference(set(subqueues)))
def stop(self):
"Kill the replicating thread, so references are gone and GC can work, delivering queued items first."
with self._lock:
if not self._thread:
return
thread_ref = self._thread
self._thread = None
self.put(ReplicatingQueue)
def start(self):
"Start the replicating thread."
with self._lock:
if self._thread:
return
self._thread = threading.Thread(target=self._copier)
self._thread.daemon = True
self._thread.start()
def _copier(self):
"Copying routine."
while True:
try:
next_object = self.get()
except:
break
with self._lock:
if next_object is ReplicatingQueue:
break
for subqueue in self._subqueues:
subqueue.put_nowait(next_object)
next_object = None
This class is a queue, but it also starts a Thread that copies all items to consumer queues you specify. You can start and stop the copying process. Keep in mind that, for the garbage collection to work, at some point, you will need to call stop() in order to stop the copying thread, thus deleting the references to objects allowing them to be GCed.
Here is also some naive test code to see it in action:
def __main__():
class TestConsumer(threading.Thread):
def __init__(self, name, queue):
self.queue = queue
threading.Thread.__init__(self, name=str(name))
def run(self):
while True:
print self.name + ': ' + str(self.queue.get())
replica = ReplicatingQueue()
for i in range(11):
queue = replica.add_new_subqueue()
t = TestConsumer(i, queue)
t.daemon = True
t.start()
queue = replica.add_new_subqueue()
replica.remove_subqueue(queue)
for i in range(10, 11):
time.sleep(1)
replica.put(i)
time.sleep(2)
print 'Stopping...'
replica.stop()
time.sleep(2)
for i in range(90, 99):
replica.put(i)
print 'Starting...'
replica.start()
print 'Stopping...'
replica.stop()
print 'Stopped'
time.sleep(2)
No comments:
Post a Comment