# nxpy.core package ----------------------------------------------------------
# Copyright Nicola Musatti 2008 - 2012
# Use, modification, and distribution are subject to the Boost Software
# License, Version 1.0. (See accompanying file LICENSE.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)
# See http://sourceforge.net/nxpy for library home page. ---------------------
r"""
Interactive program driver.
"""
import re
import StringIO
import subprocess
import sys
import time
import nxpy.core.async_subprocess
import nxpy.command.error
# Output prefixes
OUTPUT = "OUT> "
ERROR = "ERR> "
EXCEPT = "EXC> "
COMMAND = "CMD> "
# Type of output
FILE = 0
STRING = 1
def _format(input, output=STRING, type=OUTPUT):
r"""
Prepends a prefix to each 'input' line as specified by 'type' and copies it to 'output'.
Returns 'output'.
"""
if type not in ( OUTPUT, ERROR, EXCEPT, COMMAND ):
raise nxpy.command.error.BadLogFormat(str(type) + ": Unknown format")
if isinstance(input, basestring):
if input and input[-1] != '\n':
input = input + '\n'
input = StringIO.StringIO(input)
if output in ( FILE, STRING ):
out = StringIO.StringIO()
else:
out = output
for line in input:
out.write(type)
out.write(line)
if output == STRING:
return out.getvalue()
return out
[docs]class BadCommand(Exception):
"""Raised on a command execution failure"""
def __init__(self, cmd, err):
self.command = cmd
self.stderr = err
msg = _format(cmd, FILE, COMMAND)
if err:
msg = _format(err, msg, ERROR)
super(BadCommand, self).__init__("".join(msg))
EXP_OUT = 0
EXP_ERR = 1
def waitOutput(out, err):
return out
def waitError(out, err):
return err
class LineWaiter(object):
def __init__(self, lines):
self.lines = lines
self.count = 0
def __call__(self, out, err):
self.count += out.count('\n')
return self.count == self.lines
class StringWaiter(object):
def __init__(self, str, where):
self.str = str
self.where = where
def __call__(self, out, err):
if self.where == EXP_OUT:
o = out
else:
o = err
return o.find(self.str) != -1
class RegexpWaiter(object):
def __init__(self, regexp, where):
if isinstance(regexp, basestring):
self.regexp = re.compile(regexp, re.MULTILINE)
else:
self.regexp = regexp
self.where = where
def __call__(self, out, err):
if self.where == EXP_OUT:
o = out
else:
o = err
return self.regexp.search(o)
[docs]class Timer(object):
r"""A collaborative timer class"""
def __init__(self, timeout, retries, interval=0.1, quantum=0.01):
if retries < 0:
raise nxpy.command.error.TimerError("retries must be equal or greater than 0")
elif retries == 0:
self.retries = -1
self.retries = retries
self.count = 0
self.interval = interval
if timeout > 0:
self.timeout = timeout
self.end = time.time() + timeout
if retries > 1:
self.interval = timeout / retries
else:
self.timeout = 0
self.end = 0
self.interval = interval
if quantum < 0:
raise nxpy.command.error.TimerError("quantum must be equal or greater than 0")
self.quantum = quantum
def getInterval(self):
if self.end and self.retries > 1:
self.interval = max((self.end-time.time())/self.retries, self.quantum)
return self.interval
def expired(self):
self.count += 1
return self.timeout and self.end < time.time() or ( self.retries - self.count == 0 )
def reset(self):
self.count = 0
if self.timeout:
self.end = time.time() + self.timeout
[docs]class BaseInterpreter(object):
r"""
Controls the execution of an interactive program in a sub-process.
Provides means to send input to the controlled process and to check different conditions on its
output and error streams.
"""
def __init__(self, popen):
self.log = False
self.popen = popen
def setLog(self, log):
self.log = log
def _log(self, log):
if log != None:
return log
else:
return self.log
def send_cmd(self, cmd, log=None):
try:
if self._log(log):
_format(cmd, sys.stderr, COMMAND)
self.popen.send(cmd + "\r\n")
except Exception, e:
raise BadCommand(cmd, str(e.args))
def expect_any(self, **kwargs):
return self.expect(waitOutput, **kwargs)
def expect_lines(self, lines=1, **kwargs):
return self.expect(LineWaiter(lines), **kwargs)
def expect_string(self, str, where=EXP_OUT, **kwargs):
return self.expect(StringWaiter(str, where), **kwargs)
def expect_regexp(self, regexp, where=EXP_OUT, **kwargs):
return self.expect(RegexpWaiter(str, where), **kwargs)
def expect(self, cond=None, timeout=0, retries=0, interval=0.01,
quantum=0.01, raise_on_error=True, log=None):
try:
out_list = []
err_list = []
timer = Timer(timeout, retries, interval, quantum)
while not timer.expired():
out = self.popen.recv()
if out:
out_list.append(out)
err = self.popen.recv_err()
if err:
err_list.append(err)
if cond and cond(out, err):
break
if out or err:
timer.reset()
t = timer.quantum
else:
t = timer.getInterval()
if self._log(log):
sys.stderr.write("END: %s SLEEP: %f SIZE: %d\n" %
(time.ctime(timer.end), t, len(out) + len(err)))
if t > 0:
time.sleep(t)
else:
if cond:
raise nxpy.command.error.TimeoutError(err)
finally:
out = ''.join(out_list)
err = ''.join(err_list)
if self._log(log):
_format(out, sys.stderr)
_format(err, sys.stderr, ERROR)
if raise_on_error and err:
raise nxpy.command.error.ExpectError(err)
return out, err
def run(self, cmd, log=None, **kwargs):
self.send_cmd(cmd, log=log)
kwargs['log'] = log
try:
return self.expect(**kwargs)
except nxpy.command.error.ExpectError, e:
raise BadCommand(cmd, e.args[0])
[docs]class Interpreter(BaseInterpreter):
r"""The actual Interpreter class.
Optionally accepts an alternative 'subprocess' implementation.
"""
def __init__(self, cmd):
super(Interpreter, self).__init__(nxpy.core.async_subprocess.AsyncPopen(cmd,
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE))