Source code for nxpy.command.interpreter

# nxpy.command package -------------------------------------------------------

# Copyright Nicola Musatti 2008 - 2012
# Use, modification, and distribution are subject to the Boost Software
# License, Version 1.0. (See accompanying file LICENSE.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)

# See http://sourceforge.net/nxpy for library home page. ---------------------

r"""
Interactive program driver.

"""

import re
import StringIO
import subprocess
import sys
import time

import nxpy.core.async_subprocess
import nxpy.command.error


# Output prefixes
#
# Tags that _format() writes in front of every line it copies. _format()
# rejects any 'type' argument that is not one of these four values.

OUTPUT  = "OUT> "    # default prefix used by _format()
ERROR   = "ERR> "    # used in this module for text read from the error stream
EXCEPT  = "EXC> "    # accepted by _format(); not used elsewhere in this module
COMMAND = "CMD> "    # used in this module for commands sent to the sub-process


# Type of output
#
# Selectors for the 'output' argument of _format(). With either value
# _format() writes into a buffer it creates itself; anything else passed as
# 'output' is treated as the object to write to.

FILE = 0      # _format() returns the buffer object
STRING = 1    # _format() returns the buffer's contents as a string


def _format(input, output=STRING, type=OUTPUT):
    r"""
    Copies 'input' to 'output' one line at a time, writing the 'type' prefix in front of each
    line.

    'input' is either a string, which gets a trailing newline if it is not empty and lacks
    one, or any iterable of lines. 'output' is FILE, STRING or an object with a 'write()'
    method. For FILE and STRING a new StringIO buffer is filled; STRING returns the buffer's
    contents, FILE returns the buffer itself. Any other 'output' is written to and returned
    as is.

    Raises 'nxpy.command.error.BadLogFormat' when 'type' is not one of the known prefixes.
    """

    if type not in (OUTPUT, ERROR, EXCEPT, COMMAND):
        raise nxpy.command.error.BadLogFormat(str(type) + ": Unknown format")

    lines = input
    if isinstance(lines, basestring):
        # Make sure the last line is terminated before splitting the text into lines.
        if lines and not lines.endswith('\n'):
            lines += '\n'
        lines = StringIO.StringIO(lines)

    # FILE and STRING both ask for a private buffer; they differ only in what is returned.
    sink = StringIO.StringIO() if output in (FILE, STRING) else output

    for line in lines:
        sink.write(type)
        sink.write(line)

    if output == STRING:
        return sink.getvalue()
    return sink


class BadCommand(Exception):
    r"""
    Raised on a command execution failure.

    The 'command' attribute holds the command that failed and 'stderr' holds the associated
    error text. The exception message is the command, prefixed line by line with COMMAND,
    followed by the error text, if any, prefixed with ERROR.
    """

    def __init__(self, cmd, err):
        self.command = cmd
        self.stderr = err
        # With FILE as output _format() returns the buffer itself, so that the error text
        # can be appended to the same buffer below.
        msg = _format(cmd, FILE, COMMAND)
        if err:
            msg = _format(err, msg, ERROR)
        # Iterating the buffer would start from its current position, which is at the end
        # after the writes above, and produce an empty message; getvalue() returns all the
        # text regardless of position.
        super(BadCommand, self).__init__(msg.getvalue())
# Selectors for the stream a waiter inspects.
EXP_OUT = 0    # the chunk read from the output stream
EXP_ERR = 1    # the chunk read from the error stream


def waitOutput(out, err):
    r"""Condition that is true as soon as 'out' is not empty. Returns 'out' unchanged."""
    return out


def waitError(out, err):
    r"""Condition that is true as soon as 'err' is not empty. Returns 'err' unchanged."""
    return err


class LineWaiter(object):
    r"""
    Condition that becomes true once at least 'lines' newline characters have been seen in
    the 'out' chunks passed to successive calls.
    """

    def __init__(self, lines):
        self.lines = lines
        self.count = 0

    def __call__(self, out, err):
        # NOTE(review): 'out' is presumably None when the stream is closed - verify against
        # the popen implementation in use. An empty or missing chunk adds no lines.
        if out:
            self.count += out.count('\n')
        # A single chunk may carry more newlines than are still missing, so the count can
        # jump past the target; testing for equality would then never succeed.
        return self.count >= self.lines


class StringWaiter(object):
    r"""
    Condition that is true when 'str' occurs in the current chunk of the stream selected by
    'where': the output chunk for EXP_OUT, the error chunk for any other value.
    """

    def __init__(self, str, where):
        self.str = str
        self.where = where

    def __call__(self, out, err):
        if self.where == EXP_OUT:
            o = out
        else:
            o = err
        if o is None:
            # Treat a missing chunk as empty text rather than failing on None.
            o = ''
        return o.find(self.str) != -1


class RegexpWaiter(object):
    r"""
    Condition that is true when 'regexp' matches somewhere in the current chunk of the stream
    selected by 'where': the output chunk for EXP_OUT, the error chunk for any other value.

    'regexp' may be a pattern string, which is compiled with re.MULTILINE, or an object
    providing a 'search()' method, which is used as is. Calling the waiter returns the result
    of 'search()'.
    """

    def __init__(self, regexp, where):
        if isinstance(regexp, basestring):
            self.regexp = re.compile(regexp, re.MULTILINE)
        else:
            self.regexp = regexp
        self.where = where

    def __call__(self, out, err):
        if self.where == EXP_OUT:
            o = out
        else:
            o = err
        if o is None:
            # Treat a missing chunk as empty text rather than failing on None.
            o = ''
        return self.regexp.search(o)
class Timer(object):
    r"""
    A collaborative timer class.

    The timer does not run by itself: the client calls 'expired()' once per attempt and
    sleeps for the time returned by 'getInterval()' between attempts. The timer expires when
    'timeout' seconds have passed, if 'timeout' is greater than 0, or when 'expired()' has
    been called 'retries' times since creation or the last 'reset()'. A 'retries' value of 0
    places no limit on the number of attempts.

    'interval' is the sleep time used when it cannot be derived from 'timeout' and 'retries';
    'quantum' is the minimum sleep time returned when it can.

    Raises 'nxpy.command.error.TimerError' if 'retries' or 'quantum' are negative.
    """

    def __init__(self, timeout, retries, interval=0.1, quantum=0.01):
        if retries < 0:
            raise nxpy.command.error.TimerError("retries must be equal or greater than 0")
        if quantum < 0:
            raise nxpy.command.error.TimerError("quantum must be equal or greater than 0")
        # 0 means unlimited: 'expired()' increments the count before comparing it, so the
        # count can never equal 0.
        self.retries = retries
        self.count = 0
        self.interval = interval
        self.quantum = quantum
        if timeout > 0:
            self.timeout = timeout
            self.end = time.time() + timeout
            if retries > 1:
                # Spread the attempts over the timeout. Force a float division, otherwise
                # integer arguments would truncate the interval, e.g. 5 / 10 == 0.
                self.interval = float(timeout) / retries
        else:
            self.timeout = 0
            self.end = 0

    def getInterval(self):
        r"""
        Returns the time to sleep before the next attempt. When both a timeout and more than
        one retry are set, the interval is recomputed as the time left divided by 'retries',
        but never less than 'quantum'.
        """
        if self.end and self.retries > 1:
            self.interval = max((self.end - time.time()) / self.retries, self.quantum)
        return self.interval

    def expired(self):
        r"""
        Counts one more attempt and returns True if the timeout, when set, has passed or if
        the number of attempts has reached 'retries'.
        """
        self.count += 1
        if self.timeout and self.end < time.time():
            return True
        return self.retries - self.count == 0

    def reset(self):
        r"""Restarts the attempt count and, when a timeout is set, the time limit."""
        self.count = 0
        if self.timeout:
            self.end = time.time() + self.timeout
class BaseInterpreter(object):
    r"""
    Controls the execution of an interactive program in a sub-process. Provides means to send
    input to the controlled process and to check different conditions on its output and error
    streams.
    """

    def __init__(self, popen):
        r"""
        Takes 'popen', the object used to talk to the sub-process; its 'send()', 'recv()' and
        'recv_err()' methods are used. Logging is initially off.
        """
        self.log = False
        self.popen = popen

    def setLog(self, log):
        r"""Sets the logging flag used whenever a method's 'log' argument is None."""
        self.log = log

    def _log(self, log):
        r"""Returns 'log', or the instance's own logging flag when 'log' is None."""
        if log is not None:
            return log
        return self.log

    def send_cmd(self, cmd, log=None):
        r"""
        Sends 'cmd', followed by a carriage return and a line feed, to the sub-process. When
        logging is on the command is first echoed to standard error. Raises 'BadCommand' if
        anything goes wrong.
        """
        try:
            if self._log(log):
                _format(cmd, sys.stderr, COMMAND)
            self.popen.send(cmd + "\r\n")
        except Exception as e:
            raise BadCommand(cmd, str(e.args))

    def expect_any(self, **kwargs):
        r"""Waits until something is read from the output stream. See 'expect()'."""
        return self.expect(waitOutput, **kwargs)

    def expect_lines(self, lines=1, **kwargs):
        r"""Waits until 'lines' lines are read from the output stream. See 'expect()'."""
        return self.expect(LineWaiter(lines), **kwargs)

    def expect_string(self, str, where=EXP_OUT, **kwargs):
        r"""
        Waits until 'str' is found in the stream selected by 'where', either EXP_OUT or
        EXP_ERR. See 'expect()'.
        """
        return self.expect(StringWaiter(str, where), **kwargs)

    def expect_regexp(self, regexp, where=EXP_OUT, **kwargs):
        r"""
        Waits until 'regexp', a pattern string or a compiled regular expression, matches in
        the stream selected by 'where', either EXP_OUT or EXP_ERR. See 'expect()'.
        """
        # The waiter must receive the 'regexp' argument, not the 'str' builtin.
        return self.expect(RegexpWaiter(regexp, where), **kwargs)

    def expect(self, cond=None, timeout=0, retries=0, interval=0.01, quantum=0.01,
            raise_on_error=True, log=None):
        r"""
        Reads the sub-process's output and error streams until 'cond' is satisfied or the
        timer expires.

        'cond' is called as 'cond(out, err)' with the chunks read in the current iteration
        and stops the wait by returning a true value. 'timeout', 'retries', 'interval' and
        'quantum' are handed to 'Timer'. When logging is on, progress and everything that was
        read are written to standard error.

        Returns a '(out, err)' tuple with all the text read from each stream. Raises
        'nxpy.command.error.TimeoutError' if the timer expires while a 'cond' is set, and
        'nxpy.command.error.ExpectError' if 'raise_on_error' is true and something was read
        from the error stream.
        """
        out_list = []
        err_list = []
        # Defined up front so that they exist even if the timer expires before any read.
        out = err = ''
        try:
            timer = Timer(timeout, retries, interval, quantum)
            while not timer.expired():
                out = self.popen.recv()
                if out:
                    out_list.append(out)
                err = self.popen.recv_err()
                if err:
                    err_list.append(err)
                if cond and cond(out, err):
                    break
                if out or err:
                    # Data is flowing: restart the timer and poll again after the shortest
                    # delay.
                    timer.reset()
                    t = timer.quantum
                else:
                    t = timer.getInterval()
                if self._log(log):
                    # NOTE(review): the chunks are presumably None once a stream is closed -
                    # verify against the popen implementation; guarded so len() cannot fail.
                    sys.stderr.write("END: %s SLEEP: %f SIZE: %d\n" %
                            (time.ctime(timer.end), t, len(out or '') + len(err or '')))
                if t > 0:
                    time.sleep(t)
            else:
                # The loop ended because the timer expired, not because 'cond' was met.
                if cond:
                    raise nxpy.command.error.TimeoutError(err)
        finally:
            # Collect and log whatever was read, even when an exception is on its way out.
            out = ''.join(out_list)
            err = ''.join(err_list)
            if self._log(log):
                _format(out, sys.stderr)
                _format(err, sys.stderr, ERROR)
        if raise_on_error and err:
            raise nxpy.command.error.ExpectError(err)
        return out, err

    def run(self, cmd, log=None, **kwargs):
        r"""
        Sends 'cmd' to the sub-process and returns the result of 'expect()' called with the
        remaining keyword arguments. An 'ExpectError' from 'expect()' is turned into a
        'BadCommand' that carries 'cmd' and the error text.
        """
        self.send_cmd(cmd, log=log)
        kwargs['log'] = log
        try:
            return self.expect(**kwargs)
        except nxpy.command.error.ExpectError as e:
            raise BadCommand(cmd, e.args[0])
class Interpreter(BaseInterpreter):
    r"""
    The actual Interpreter class. Runs 'cmd' through 'nxpy.core.async_subprocess.AsyncPopen'
    with the standard input, output and error streams all connected to pipes.
    """

    def __init__(self, cmd):
        pipe = subprocess.PIPE
        child = nxpy.core.async_subprocess.AsyncPopen(cmd, stdin=pipe, stdout=pipe,
                stderr=pipe)
        super(Interpreter, self).__init__(child)

This Project