Source code for nxpy.core.async_subprocess

# nxpy.core package ----------------------------------------------------------

# Copyright Nicola Musatti 2008 - 2012
# Use, modification, and distribution are subject to the Boost Software
# License, Version 1.0. (See accompanying file LICENSE.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)

# See http://sourceforge.net/nxpy for library home page. ---------------------

r"""
Allow asynchronous interaction with a subprocess.

This module was taken from the ASPN Python Cookbook website, with only minor
modifications. This is the original description:

Title:        Module to allow Asynchronous subprocess use on Windows and Posix platforms
Submitter:    Josiah Carlson (other recipes)
Last Updated: 2006/12/01
Version no:   1.9
Category:     System 

"""

import os
import subprocess
import errno
import time
import sys

## PIPE = subprocess.PIPE

if subprocess.mswindows:
    from win32file import ReadFile, WriteFile
    from win32pipe import PeekNamedPipe
    import msvcrt
else:
    import select
    import fcntl

class AsyncPopen(subprocess.Popen):
    r"""
    An asynchronous variant to 'subprocess.Popen', which doesn't block on
    incomplete I/O operations.

    Note that the terms input, output and error refer to the controlled
    program streams, so we receive from output or error and we send to input.

    Every send/receive method returns None once the corresponding pipe is
    found to be missing or disconnected; in that case the pipe attribute
    ('stdin', 'stdout' or 'stderr') is closed and reset to None.
    """

    def recv(self, maxsize=None):
        r"""Receive at most 'maxsize' bytes from the subprocess's standard output"""
        return self._recv('stdout', maxsize)

    def recv_err(self, maxsize=None):
        r"""Receive at most 'maxsize' bytes from the subprocess's standard error"""
        return self._recv('stderr', maxsize)

    def send_recv(self, input='', maxsize=None):
        r"""
        Send 'input' to the subprocess's standard input and then receive at
        most 'maxsize' bytes from both its standard output and standard error.

        Return a 3-tuple: (result of send, data from stdout, data from stderr).
        """
        return self.send(input), self.recv(maxsize), self.recv_err(maxsize)

    def get_conn_maxsize(self, which, maxsize):
        r"""
        Return 'which' output pipe (either stdout or stderr) and a normalised
        'maxsize' in a tuple.

        'maxsize' becomes 1024 when it is None and 1 when it is smaller than
        1; larger values are passed through unchanged (no upper bound is
        enforced).
        """
        if maxsize is None:
            maxsize = 1024
        elif maxsize < 1:
            maxsize = 1
        return getattr(self, which), maxsize

    def _close(self, which):
        r"""Close the pipe named 'which' and reset the attribute to None."""
        getattr(self, which).close()
        setattr(self, which, None)

    # Same test that Python 2's 'subprocess.mswindows' is defined as, and the
    # one already used by this module's '__main__' section.
    if sys.platform == 'win32':

        def send(self, input):
            r"""
            Send 'input' to the subprocess's standard input.

            Return the number of bytes written, or None if stdin is missing
            or the pipe turned out to be closed or broken.
            """
            if not self.stdin:
                return None

            try:
                x = msvcrt.get_osfhandle(self.stdin.fileno())
                (errCode, written) = WriteFile(x, input)
            except ValueError:
                # fileno() on an already closed file object.
                return self._close('stdin')
            except (subprocess.pywintypes.error, Exception) as why:
                # 109 is ERROR_BROKEN_PIPE. Guard against exceptions without
                # arguments so that the original error is re-raised rather
                # than an IndexError.
                if why.args and why.args[0] in (109, errno.ESHUTDOWN):
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            r"""
            Read what is currently available, up to 'maxsize' bytes, from the
            pipe named 'which', without blocking.

            Return the data read (possibly empty), or None if the pipe is
            missing, closed or broken.
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                x = msvcrt.get_osfhandle(conn.fileno())
                # Peek first so that ReadFile is only asked for bytes that
                # are already there.
                (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
                if maxsize < nAvail:
                    nAvail = maxsize
                if nAvail > 0:
                    (errCode, read) = ReadFile(x, nAvail, None)
            except ValueError:
                # fileno() on an already closed file object.
                return self._close(which)
            except (subprocess.pywintypes.error, Exception) as why:
                if why.args and why.args[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            if self.universal_newlines:
                read = self._translate_newlines(read)
            return read

    else:

        def send(self, input):
            r"""
            Send 'input' to the subprocess's standard input.

            Return the number of bytes written (0 if the pipe is not ready
            for writing right now), or None if stdin is missing or the pipe
            is broken.
            """
            if not self.stdin:
                return None

            # Zero timeout: only poll whether a write would block.
            if not select.select([], [self.stdin], [], 0)[1]:
                return 0

            try:
                written = os.write(self.stdin.fileno(), input)
            except OSError as why:
                if why.errno == errno.EPIPE:  # broken pipe
                    return self._close('stdin')
                raise

            return written

        def _recv(self, which, maxsize):
            r"""
            Read what is currently available, up to 'maxsize' bytes, from the
            pipe named 'which', without blocking.

            Return the data read, '' if nothing is available yet, or None if
            the pipe is missing, already closed or at end of file.
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None
            if conn.closed:
                # fcntl() and select() raise ValueError on a closed file;
                # report it as a disconnection, as the Windows variant does.
                return self._close(which)

            # Switch the pipe to non-blocking mode for the duration of the
            # read; the previous flags are restored in the 'finally' clause.
            flags = fcntl.fcntl(conn, fcntl.F_GETFL)
            fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)

            try:
                if not select.select([conn], [], [], 0)[0]:
                    return ''

                r = conn.read(maxsize)
                if not r:
                    # Readable but empty: the other end closed the pipe.
                    return self._close(which)

                if self.universal_newlines:
                    r = self._translate_newlines(r)
                return r
            finally:
                # _close() above may have closed the pipe in the meantime.
                if not conn.closed:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)
# Text of the exception raised by recv_some() and send_all() when a receive
# or send call on the subprocess returns None.
message = "Other end disconnected!"
def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    r"""
    Gather output from AsyncPopen object 'p' and return it as one string.

    Data is read from the subprocess's stdout, or from its stderr when
    'stderr' is true. Polling goes on until 't' seconds have elapsed and the
    last poll came back empty. After an empty poll the function sleeps for
    the time still left divided by 'tr' ('tr' values below 1 count as 1).

    When a poll returns None an Exception is raised if 'e' is true;
    otherwise polling stops and whatever was collected so far is returned.
    """
    tr = max(tr, 1)
    deadline = time.time() + t
    receive = p.recv_err if stderr else p.recv
    chunks = []
    data = ''
    # Keep going past the deadline for as long as data is still flowing.
    while data or time.time() < deadline:
        data = receive()
        if data is None:
            if e:
                raise Exception(message)
            break
        if data:
            chunks.append(data)
        else:
            # Nothing yet: wait a fraction of the time that is left.
            time.sleep(max((deadline - time.time()) / tr, 0))
    return ''.join(chunks)
def send_all(p, data):
    r"""
    Send all of 'data' to AsyncPopen object 'p''s stdin.

    'p.send' is called repeatedly with the part of 'data' that has not been
    written yet, until nothing is left. An Exception is raised as soon as
    'p.send' returns None.
    """
    while data:
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        # Plain slicing instead of the 'buffer' builtin, which is not
        # available in every Python version; 'p.send' also keeps receiving
        # the same type the caller passed in.
        data = data[sent:]
if __name__ == '__main__':
    # TODO Move tests to the _test subpackage.

    # Manual smoke test: start the platform's command shell, feed it a few
    # commands one at a time and print whatever comes back on its stdout.
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = AsyncPopen(shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # Print anything the shell emits before the first command is sent.
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    # e=0: recv_some() must not raise if a receive returns None here.
    print recv_some(a, e=0)
    a.wait()

This Project