Thursday, January 5, 2017

Python - a Queue delivering to all consumers

The standard Python Queue is great, but we needed a queue that replicates every item to all consumers.

The scenario: you push data into a single queue, but several threads each need every item that has been queued. Here is a quick (and dirty) implementation I came up with:


import Queue
import threading
import time


class ReplicatingQueue(Queue.Queue):

    def __init__(self, *subqueues):
        self._lock = threading.RLock()
        self._subqueues = ()
        self.add_subqueue(*subqueues)
        self._thread = None
        Queue.Queue.__init__(self)
        self.start()

    def add_subqueue(self, *subqueues):
        "Add Queues to receive objects from the ReplicatingQueue."
        for subqueue in subqueues:
            if not isinstance(subqueue, Queue.Queue):
                raise TypeError('Subqueue is not a Queue')
        with self._lock:
            self._subqueues = tuple(set(self._subqueues).union(set(subqueues)))

    def add_new_subqueue(self):
        "Shorthand for creating a new Queue and adding it."
        queue = Queue.Queue()
        self.add_subqueue(queue)
        return queue

    def remove_subqueue(self, *subqueues):
        "Remove Queues from receiving objects from the ReplicatingQueue."
        with self._lock:
            self._subqueues = tuple(set(self._subqueues).difference(set(subqueues)))

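    def stop(self):
        "Stop the replicating thread after it delivers queued items, so references are dropped and GC can work."
        with self._lock:
            if not self._thread:
                return
            thread_ref = self._thread
            self._thread = None
            # The class object itself serves as the end-of-stream sentinel
            self.put(ReplicatingQueue)
        # Wait outside the lock, since the copier needs it to drain the queue
        thread_ref.join()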

    def start(self):
        "Start the replicating thread."
        with self._lock:
            if self._thread:
                return
            self._thread = threading.Thread(target=self._copier)
            self._thread.daemon = True
            self._thread.start()

    def _copier(self):
        "Copying routine."
        while True:
            try:
                next_object = self.get()
            except:
                break
            with self._lock:
                if next_object is ReplicatingQueue:
                    break
                for subqueue in self._subqueues:
                    subqueue.put_nowait(next_object)
            next_object = None


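This class is a queue that also starts a thread copying every item to the consumer queues you register. You can start and stop the copying process. Keep in mind that, for garbage collection to work, you will eventually need to call stop(): the copying thread keeps a reference to the ReplicatingQueue (through its bound _copier method), so neither the queue nor the items it holds can be collected while the thread runs. Also, use unbounded subqueues: the copier calls put_nowait(), which raises Queue.Full on a full bounded queue and would end the copying thread.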

Here is also some naive test code to see it in action:

def __main__():
    class TestConsumer(threading.Thread):
        def __init__(self, name, queue):
            self.queue = queue
            threading.Thread.__init__(self, name=str(name))

        def run(self):
            while True:
                print self.name + ': ' + str(self.queue.get())

    replica = ReplicatingQueue()
    for i in range(11):
        queue = replica.add_new_subqueue()
        t = TestConsumer(i, queue)
        t.daemon = True
        t.start()
    queue = replica.add_new_subqueue()
    replica.remove_subqueue(queue)
    for i in range(10):
        time.sleep(1)
        replica.put(i)
    time.sleep(2)
    print 'Stopping...'
    replica.stop()
    time.sleep(2)
    for i in range(90, 99):
        replica.put(i)
    print 'Starting...'
    replica.start()
    print 'Stopping...'
    replica.stop()
    print 'Stopped'
    time.sleep(2)

if __name__ == '__main__':
    __main__()
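For Python 3 (where the Queue module is named queue), here is a minimal sketch of the same fan-out idea without a background thread: put() copies each item into every registered subqueue under a lock. FanOutQueue is a hypothetical name and this is a simplification, not the class above; it trades the copier thread (and the need to call stop()) for doing the copying in the producer's thread.

```python
import queue
import threading


class FanOutQueue:
    """Hypothetical thread-less variant: put() copies to every subqueue."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subqueues = []

    def add_new_subqueue(self):
        subqueue = queue.Queue()  # unbounded, so put_nowait() never raises Full
        with self._lock:
            self._subqueues.append(subqueue)
        return subqueue

    def remove_subqueue(self, subqueue):
        with self._lock:
            self._subqueues.remove(subqueue)

    def put(self, item):
        # Holding the lock gives every consumer the items in the same order,
        # even with several producer threads.
        with self._lock:
            for subqueue in self._subqueues:
                subqueue.put_nowait(item)


fan_out = FanOutQueue()
consumers = [fan_out.add_new_subqueue() for _ in range(3)]
for i in range(5):
    fan_out.put(i)
# Each consumer queue received every item, in order.
received = [[c.get_nowait() for _ in range(5)] for c in consumers]
print(received)  # [[0, 1, 2, 3, 4], [0, 1, 2, 3, 4], [0, 1, 2, 3, 4]]
```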


Monday, December 1, 2014

Python datetime and DST timezones

The datetime implementation in Python has an inherent problem: it does not handle Daylight Saving Time (DST) transitions well.

We were already using the dateutil package, but it did not help. The problem lies in datetime.datetime itself, and since dateutil builds upon the datetime package, it cannot work around it.

Consider this code:

import datetime
import dateutil.tz
local = dateutil.tz.gettz('Europe/Athens')
d1 = datetime.datetime(2014, 3, 30, 2, 30, tzinfo=local)
d2 = datetime.datetime(2014, 3, 30, 4, 30, tzinfo=local)

02:30 on 2014-03-30 is 30 minutes before the DST change. At 03:00, the time "jumps" to 04:00 and the timezone changes from GMT+2 to GMT+3.

>>> print d1
2014-03-30 02:30:00+02:00
>>> print d2
2014-03-30 04:30:00+03:00

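Normally, you would expect adding one hour to d1 to give 04:30 in the new timezone, i.e. d1 + datetime.timedelta(hours=1) == d2.

Instead, the addition produces a wall-clock time that does not exist, and the difference d2 - d1 is reported as two hours (when both operands share the same tzinfo object, Python subtracts their wall-clock values and ignores the UTC offsets):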

>>> print d1 + datetime.timedelta(hours=1)
2014-03-30 03:30:00+03:00
>>> print d2-d1
2:00:00

Even worse:

>>> print datetime.datetime(2014, 3, 30, 3, 30, tzinfo=local)
2014-03-30 03:30:00+03:00

This local time does not exist in the specified timezone!

So, we needed a way to cope with these transitions: a way to tell whether a date is possible or not. We ended up using the following snippet:

import datetime
import dateutil.tz

_utc = dateutil.tz.tzutc()

def localize_datetime(x, tz=_utc):
    # Interpret x's wall-clock time in tz, then round-trip it through UTC
    tmp = x.replace(tzinfo=tz)
    x = tmp.astimezone(_utc).astimezone(tz)
    if x != tmp:
        raise ValueError('Provided date is invalid in specified timezone!')
    return x

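With this function, we check whether a naive datetime (or an aware one, whose tzinfo is replaced) is a valid wall-clock time in a specific timezone: a time inside the DST "gap" does not survive the round trip through UTC, so the comparison fails. Note that we do not check the corresponding issue in October (when one wall-clock hour occurs twice), since we only care about the "gap" created by the DST change in March.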

Not the best solution, but one that works for us.
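To experiment with the round-trip check without dateutil, here is a Python 3 sketch. ToyAthens2014 is a hypothetical tzinfo that only models the 2014-03-30 spring-forward transition (03:00 EET becomes 04:00 EEST), so it is for illustration only; real code should take its rules from the tz database. localize_datetime mirrors the snippet above, with datetime.timezone.utc in place of dateutil's tzutc(), and the hypothetical add_elapsed shows the usual remedy for the arithmetic problem: do the arithmetic in UTC and convert back.

```python
import datetime

UTC = datetime.timezone.utc
_EET = datetime.timedelta(hours=2)    # winter offset
_EEST = datetime.timedelta(hours=3)   # summer offset
_SWITCH_UTC = datetime.datetime(2014, 3, 30, 1, 0)  # 03:00 EET in UTC


class ToyAthens2014(datetime.tzinfo):
    """Hypothetical zone modelling only the 2014-03-30 spring-forward gap."""

    def utcoffset(self, dt):
        # Local wall-clock times from 03:00 on are treated as summer time.
        if dt.replace(tzinfo=None) >= _SWITCH_UTC + _EET:
            return _EEST
        return _EET

    def dst(self, dt):
        return self.utcoffset(dt) - _EET

    def tzname(self, dt):
        return 'EEST' if self.dst(dt) else 'EET'

    def fromutc(self, dt):
        # dt holds a UTC wall-clock value tagged with tzinfo=self.
        naive = dt.replace(tzinfo=None)
        offset = _EEST if naive >= _SWITCH_UTC else _EET
        return (naive + offset).replace(tzinfo=self)


def localize_datetime(x, tz):
    """Attach tz to x, rejecting wall-clock times that fall in a DST gap."""
    tmp = x.replace(tzinfo=tz)
    result = tmp.astimezone(UTC).astimezone(tz)
    if result != tmp:  # same tzinfo: compares wall-clock values
        raise ValueError('Provided date is invalid in specified timezone!')
    return result


def add_elapsed(dt, delta):
    """Add real elapsed time by doing the arithmetic in UTC."""
    return (dt.astimezone(UTC) + delta).astimezone(dt.tzinfo)


athens = ToyAthens2014()
d1 = localize_datetime(datetime.datetime(2014, 3, 30, 2, 30), athens)
print(add_elapsed(d1, datetime.timedelta(hours=1)))  # 2014-03-30 04:30:00+03:00
try:
    localize_datetime(datetime.datetime(2014, 3, 30, 3, 30), athens)
except ValueError as exc:
    print(exc)  # Provided date is invalid in specified timezone!
```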

Thursday, May 22, 2014

Cross-platform way to use subprocess.Popen without inherited file descriptors

As mentioned in my previous post on how subprocess and file descriptors work in Python, there are significant differences in the way subprocess.Popen works in Linux and Windows. After the previous post, I tried to find a better way to handle this issue in a cross-platform way.

So, I ended up with the following module, "ingenuously" named file_safe_subprocess.py :

import subprocess
import threading
import platform
import __builtin__

_system = platform.system()

if _system=='Windows':
    import msvcrt
    import win32api
    import win32con
    import win32file
else:
    import fcntl
    import resource

_lock = threading.RLock()
_std_Popen = subprocess.Popen
_std_open = open

def _set_file_descriptor_as_non_inheritable(fd):
    if _system=='Windows':
        win32api.SetHandleInformation(msvcrt.get_osfhandle(fd),
                                      win32con.HANDLE_FLAG_INHERIT,
                                      0)
    else:
        # Set FD_CLOEXEC on fd itself, so it is closed in exec()'d children
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

def _max_opened_files():
    if _system=='Windows':
        return win32file._getmaxstdio()
    return resource.getrlimit(resource.RLIMIT_NOFILE)[0]

if _system=='Windows':
    def _safe_open(*args, **kwargs):
        with _lock:
            f = _std_open(*args, **kwargs)
            _set_file_descriptor_as_non_inheritable(f.fileno())
        return f

    __builtin__.open = _safe_open

    def _safe_Popen(*args, **kwargs):
        with _lock:
            p = _std_Popen(*args, **kwargs)
            for f in (p.stdin, p.stdout, p.stderr):
                if f!=None and f.fileno()>2:
                    try:
                        _set_file_descriptor_as_non_inheritable(f.fileno())
                    except:
                        pass
            return p
else:
    def _safe_Popen(*args, **kwargs):
        with _lock:
            kwargs['close_fds'] = True
            return _std_Popen(*args, **kwargs)

subprocess.Popen = _safe_Popen

def set_fds_as_non_inheritable(fds=None):
    if fds is None:
        fds = range(3, _max_opened_files())
    for fd in fds:
        try:
            _set_file_descriptor_as_non_inheritable(fd)
        except:
            # Not an open descriptor (or cannot be marked): skip it
            pass

def wrap_process(func, *args, **kwargs):
    with _lock:
        set_fds_as_non_inheritable()
        return func(*args, **kwargs)

def wrap_fd_open(func, *args, **kwargs):
    with _lock:
        return func(*args, **kwargs)

if _system=='Windows':
    set_fds_as_non_inheritable()

What does it do?

By importing this module at an early stage in a script, you essentially do the following:

  1. [Windows-only] change the built-in open() function to make sure that, after opening a file, the file descriptor is set as non-inheritable.
  2. override subprocess.Popen with a factory function that makes sure that a) in Windows, any new pipes are marked as non-inheritable, and b) on other platforms, close_fds=True is always passed, so that all file descriptors besides 0, 1 and 2 are closed in the child process.
  3. [Windows-only] run the set_fds_as_non_inheritable() function once, to make sure that any already opened file descriptors are properly marked as non-inheritable.
Note that I use a lock, since my application is multi-threaded and I do not want to leak file descriptors opened from some other thread.
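On POSIX systems, marking a descriptor as non-inheritable means setting its close-on-exec flag (FD_CLOEXEC), so the descriptor is closed automatically when a child process calls exec(). A minimal Python 3 sketch for setting and checking that flag with the fcntl module (POSIX only; set_cloexec and is_cloexec are hypothetical helper names):

```python
import fcntl
import os


def set_cloexec(fd):
    """Mark fd as non-inheritable across exec() by setting FD_CLOEXEC."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)


def is_cloexec(fd):
    """Return True if fd will be closed when a child calls exec()."""
    return bool(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)


read_fd, write_fd = os.pipe()
os.set_inheritable(read_fd, True)  # Python 3.4+: undo the PEP 446 default
print(is_cloexec(read_fd))  # False
set_cloexec(read_fd)
print(is_cloexec(read_fd))  # True
os.close(read_fd)
os.close(write_fd)
```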

Alternative approach

Since I only use Popen for spawning sub-processes and mainly open logging files using the open() function, this module will do just fine. Of course, there are many other cases in which a file descriptor is created. For example,
  • opening up a socket
  • creating some other pipe through non-Popen calls
  • etc.
If you use some other function and you want the same (more or less) protection against file descriptor leaking, you can use the wrap_process() and wrap_fd_open() functions:
  • To call any callable (function or class constructor) that internally creates a new process, use wrap_process(): it tries to mark ALL possible file descriptors as non-inheritable and then calls the callable with the arguments you specify.
  • To call any callable (function or class constructor) that internally opens a new file descriptor (file, socket, pipe, whatever), use wrap_fd_open(): it synchronizes against wrap_process() calls, so that no file descriptors are created while wrap_process() is running.
Use wrap_process() and wrap_fd_open() with caution: since both hold the same lock, a callable passed to wrap_process() that does not return promptly blocks every thread calling wrap_fd_open(), and it deadlocks if it waits on such a thread.

Alternatively, you could write dedicated wrappers for your preferred ways of invoking processes and creating file descriptors, which can be faster and more efficient.
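A self-contained Python 3 sketch of how the two wrappers are meant to be called. It reimplements them in simplified form using os.set_inheritable() (available since Python 3.4, on both Windows and POSIX) instead of the platform-specific code above; MAX_FD = 256 is an arbitrary assumption replacing the module's _max_opened_files().

```python
import os
import subprocess
import sys
import threading

_lock = threading.RLock()
MAX_FD = 256  # assumption: only probe the first 256 descriptors


def set_fds_as_non_inheritable(fds=None):
    """Mark every open descriptor in fds (default 3..MAX_FD-1) non-inheritable."""
    if fds is None:
        fds = range(3, MAX_FD)
    for fd in fds:
        try:
            os.set_inheritable(fd, False)
        except OSError:
            pass  # not an open descriptor


def wrap_process(func, *args, **kwargs):
    """Call a process-spawning callable after marking descriptors non-inheritable."""
    with _lock:
        set_fds_as_non_inheritable()
        return func(*args, **kwargs)


def wrap_fd_open(func, *args, **kwargs):
    """Call a descriptor-creating callable, serialized against wrap_process()."""
    with _lock:
        return func(*args, **kwargs)


read_fd, write_fd = wrap_fd_open(os.pipe)
os.set_inheritable(read_fd, True)  # simulate a leaked, inheritable descriptor
child = wrap_process(subprocess.Popen, [sys.executable, '-c', 'pass'])
print(child.wait())                 # 0
print(os.get_inheritable(read_fd))  # False
```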

Tuesday, May 20, 2014

How subprocess and file descriptors work in Python

Frequently, I use subprocess in Python to spawn other tools to do some tasks I need. Recently, we moved a large piece of software from Linux-based to Windows-based servers. And the nightmare began.

There are many differences between Linux and Windows in how file descriptors (or file handles) are handled by the Python interpreter. Let's see.

Subprocess

It is fairly easy to use subprocess in Python. You just use the Popen constructor. Here is an example calling the ls command and retrieving the output.
import subprocess
p = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
print stdout

File Descriptors (or File Handles)

File descriptors are the integers that are used to identify the open files (or pipes, fifos, tty, anything) that any running program has currently open. If you do not know what a file descriptor is, have a look at the file descriptor Wikipedia article.

When any program is "forked", the new process (usually) inherits the file descriptors of its parent.

Python and File Descriptors

Python works in much the same way. File descriptors in Python are usually not handled directly. Instead, they are managed through higher-level objects, like Python file objects, which automatically create and destroy file descriptors using the underlying C libraries.

How to Get Opened File Descriptors in Python

In order for us to troubleshoot our issues, we needed a way to get the valid file descriptors in a Python script. So, we crafted the following script. Note that we only check file descriptors from 0 to 100, since we do not open so many files concurrently.

fd_table_status.py :

import os
import stat

_fd_types = (
    ('REG', stat.S_ISREG),
    ('FIFO', stat.S_ISFIFO),
    ('DIR', stat.S_ISDIR),
    ('CHR', stat.S_ISCHR),
    ('BLK', stat.S_ISBLK),
    ('LNK', stat.S_ISLNK),
    ('SOCK', stat.S_ISSOCK)
)

def fd_table_status():
    result = []
    for fd in range(100):
        try:
            s = os.fstat(fd)
        except:
            continue
        for fd_type, func in _fd_types:
            if func(s.st_mode):
                break
        else:
            fd_type = str(s.st_mode)
        result.append((fd, fd_type))
    return result

def fd_table_status_logify(fd_table_result):
    return ('Open file handles: ' +
            ', '.join(['{0}: {1}'.format(*i) for i in fd_table_result]))

def fd_table_status_str():
    return fd_table_status_logify(fd_table_status())

if __name__=='__main__':
    print fd_table_status_str()

When simply run, it will show all open file descriptors and their respective type:

$> python fd_table_status.py
Open file handles: 0: CHR, 1: CHR, 2: CHR
$>

Calling fd_table_status_str() from your own code returns the same string.
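On Linux, the fixed 0 to 99 range can be avoided by listing /proc/self/fd, which contains one entry per open descriptor. A hedged, Linux-only Python 3 sketch (open_fds_linux is a hypothetical name); its result could stand in for range(100) in fd_table_status():

```python
import os


def open_fds_linux():
    """Return the sorted list of open descriptors (Linux only)."""
    fds = []
    for name in os.listdir('/proc/self/fd'):
        fd = int(name)
        try:
            os.fstat(fd)
        except OSError:
            # e.g. the descriptor listdir() itself used, closed by now
            continue
        fds.append(fd)
    return sorted(fds)


print(open_fds_linux())
```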

Inherited File Descriptors - Linux vs Windows

To see the behavior, we will run the following script in Windows and in Linux. The outputs differ in steps 3, 4 and 7 (in step 3, only the descriptor numbers differ).

test_fd_handling.py :

import fd_table_status
import subprocess
import platform

fds = fd_table_status.fd_table_status_str

if platform.system()=='Windows':
    python_exe = r'C:\Python27\python.exe'
else:
    python_exe = 'python'

print '1) Initial file descriptors:\n' + fds()
f = open('fd_table_status.py', 'r')
print '2) After file open, before Popen:\n' + fds()
p = subprocess.Popen([python_exe, 'fd_table_status.py'],
                     stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE)
print '3) After Popen, before reading piped output:\n' + fds()
result = p.communicate()
print '4) After Popen.communicate():\n' + fds()
del p
print '5) After deleting reference to Popen instance:\n' + fds()
del f
print '6) After deleting reference to file instance:\n' + fds()
print '7) child process had the following file descriptors:'
print result[0][:-1]

Linux output

1) Initial file descriptors:
Open file handles: 0: CHR, 1: CHR, 2: CHR
2) After file open, before Popen:
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG
3) After Popen, before reading piped output:
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG, 5: FIFO, 6: FIFO, 8: FIFO
4) After Popen.communicate():
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG
5) After deleting reference to Popen instance:
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG
6) After deleting reference to file instance:
Open file handles: 0: CHR, 1: CHR, 2: CHR
7) child process had the following file descriptors:
Open file handles: 0: FIFO, 1: FIFO, 2: FIFO, 3: REG

Windows output

1) Initial file descriptors:
Open file handles: 0: CHR, 1: CHR, 2: CHR
2) After file open, before Popen:
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG
3) After Popen, before reading piped output:
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG, 4: FIFO, 5: FIFO, 6: FIFO
4) After Popen.communicate():
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG, 5: FIFO, 6: FIFO
5) After deleting reference to Popen instance:
Open file handles: 0: CHR, 1: CHR, 2: CHR, 3: REG
6) After deleting reference to file instance:
Open file handles: 0: CHR, 1: CHR, 2: CHR
7) child process had the following file descriptors:
Open file handles: 0: FIFO, 1: FIFO, 2: FIFO

Step-by-step Output Review

I will trace the output of both runs step by step and explain (or try to explain) it.
  1. The 3 initial file descriptors 0, 1, 2 (stdin, stdout, stderr) are connected to the TTY (the controlling terminal) in both cases. Type CHR indicates a special character device, which is our attached terminal (or pseudo-terminal).
  2. After a file is opened (for reading, but it does not matter), a new file descriptor (3) is added in both cases. Type REG indicates a regular file.
  3. After the Popen call, three additional file descriptors are added, one for the parent's end of each anonymous pipe created to spawn the new program. The descriptor numbers themselves do not matter: it is up to the OS to assign them. All goes well up to here. Type FIFO indicates a pipe (named or anonymous).
  4. Popen.communicate() does the following: a) sends any input to the child process (we do not specify any), b) closes the input pipe, and c) reads all available output (both stdout and stderr) until the child terminates. In this step, Windows and Linux behave differently. In Linux, all three file descriptors in the parent process are closed, while in Windows the two output file descriptors (used by the child process for stdout and stderr) are retained. Even though the child process has terminated, the two pipes remain open in Windows. This has a significant implication: if the Popen instance is not destroyed (e.g. because a reference to it is kept somewhere, so the garbage collector cannot destroy the object), the pipes remain open. If this repeats over further calls, the "zombie" file descriptors from the pipes pile up and eventually cause the Python exception "IOError: [Errno 24] Too many open files". Remember, this happens only in Windows!
  5. When the reference to the Popen instance is removed, the garbage collector destroys the object, and the remaining pipes are also closed in Windows. Thus, the file descriptors are now the same as in the Linux run. Expected behavior.
  6. The same happens when deleting the reference to the file object. Expected behavior.
  7. Now, we look at the file descriptors of the child-process (we retrieved the output of the child program when we did the communicate() call previously). In the Linux run, we see that the child-process has the previously opened file also available as a valid file descriptor, which is the expected behavior. But in the Windows run, we see that the child process does not have this file descriptor available! This is probably caused by the Windows-version of the Python interpreter. I have run other command-line tools and examined their behavior. It seems that the Python interpreter in Windows closes all file descriptors (besides 0, 1, 2) upon start-up. This requires further investigation to confirm the observed behavior.
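On POSIX with Python 3, the Linux half of step 7 can be reproduced in a few lines. Note that Python 3.4+ creates descriptors as non-inheritable (PEP 446) and Popen defaults to close_fds=True, so the file is only visible in the child when passed explicitly with pass_fds. child_sees_fd is a hypothetical helper, and this sketch does not reproduce the Windows behavior.

```python
import subprocess
import sys
import tempfile

# The child exits with status 0 only if the descriptor number it gets is open.
CHILD = 'import os, sys; os.fstat(int(sys.argv[1]))'


def child_sees_fd(fd, inherit):
    """Run a child Python and report whether fd is valid inside it."""
    kwargs = {'pass_fds': (fd,)} if inherit else {}
    status = subprocess.call([sys.executable, '-c', CHILD, str(fd)],
                             stderr=subprocess.DEVNULL, **kwargs)
    return status == 0


with tempfile.TemporaryFile() as f:
    print(child_sees_fd(f.fileno(), inherit=True))   # True
    print(child_sees_fd(f.fileno(), inherit=False))  # False
```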

Wrapper Python Script to Close All File Descriptors

The two simple scripts below close file descriptors 3 to 99 before running a command. Both take the command to run as the first command-line argument, followed by the command's own arguments.

Running the final program as a subprocess.

import sys
import os
import subprocess

if __name__=='__main__':
    os.closerange(3, 100)
    subprocess_args = sys.argv[1:]
    if not subprocess_args:
        print ("USAGE: python {0} <args> ...\n\n"
               "where <args> are arguments passed to subprocess.Popen"
               .format(sys.argv[0]))
        sys.exit(1)
    popen = subprocess.Popen(subprocess_args, stdin=0, stdout=1, stderr=2)
    exit_status = popen.wait()
    sys.exit(exit_status)

Running the final program using execv.

import sys
import os

if __name__=='__main__':
    os.closerange(3, 100)
    subprocess_args = sys.argv[1:]
    if not subprocess_args:
        print ("USAGE: python {0} <args> ...\n\n"
               "where <args> are arguments passed to os.execvp()"
               .format(sys.argv[0]))
        sys.exit(1)
    os.execvp(subprocess_args[0], subprocess_args)

Conclusions

Some important conclusions:
  • Make sure that Popen instances are destroyed.
  • If you:
    a) are in Windows, and
    b) are spawning an external tool to do some processing, and
    c) have opened files that you want to work on (e.g. move) concurrently,
    THEN: use one of the above wrapper scripts. It might save you a lot of trouble.

Further Reading and Information

Some more information can be found in Python PEP 446, which might explain the behavior observed above. Maybe a better solution can be found for the case of inherited file handles in Windows.

An alternative (and probably better) way for achieving the same result (but avoiding the use of the above wrapper scripts) is to make a Win32 system call and mark a file descriptor as non-inheritable. In the following example code, I define an open() replacement that opens the file and marks the resulting file descriptor as non-inheritable.

import msvcrt
import win32api
import win32con
def my_open(*args, **kwargs):
    f = open(*args, **kwargs)
    win32api.SetHandleInformation(
        msvcrt.get_osfhandle(f.fileno()),
        win32con.HANDLE_FLAG_INHERIT,
        0)
    return f

Although this behavior does not cause the same issues in Linux as it does in Windows, you can achieve the same effect there by setting close_fds=True in the subprocess.Popen() constructor. In Linux, it works fine.
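Python 3.4 and later implement PEP 446: descriptors and handles created by Python are non-inheritable by default, and os.get_inheritable() / os.set_inheritable() work on both Windows and POSIX. A sketch of the my_open() idea with those functions (my_open_py3 is a hypothetical name; the explicit call mostly documents intent, since open() already behaves this way):

```python
import os
import tempfile


def my_open_py3(*args, **kwargs):
    """Open a file and make sure its descriptor is non-inheritable."""
    f = open(*args, **kwargs)
    # Already the default for open() since Python 3.4 (PEP 446).
    os.set_inheritable(f.fileno(), False)
    return f


fd, path = tempfile.mkstemp()
os.close(fd)
with my_open_py3(path, 'w') as f:
    print(os.get_inheritable(f.fileno()))  # False
os.remove(path)
```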