# nxpy.core package ----------------------------------------------------------
# Copyright Nicola Musatti 2008 - 2014
# Use, modification, and distribution are subject to the Boost Software
# License, Version 1.0. (See accompanying file LICENSE.txt or copy at
# http://www.boost.org/LICENSE_1_0.txt)
# See http://nxpy.sourceforge.net for library home page. ---------------------
r"""
Interactive program driver.
"""
from __future__ import absolute_import
import os
import re
import subprocess
import sys
import time
import six
import nxpy.core.async_subprocess
import nxpy.command.error
# Output prefixes
OUTPUT = "OUT> "
ERROR = "ERR> "
EXCEPT = "EXC> "
COMMAND = "CMD> "
# Type of output
FILE = 0
STRING = 1
def _format(input_, output=STRING, type_=OUTPUT):
r"""
Prepends a prefix to each *input_* line as specified by *type_* and copies it to *output*.
Returns *output*.
"""
if type_ not in ( OUTPUT, ERROR, EXCEPT, COMMAND ):
raise nxpy.command.error.BadLogFormat(str(type_) + ": Unknown format")
if isinstance(input_, six.string_types):
if input_ and input_[-1] != '\n':
input_ = input_ + '\n'
input_ = six.StringIO(input_)
if output in ( FILE, STRING ):
out = six.StringIO()
else:
out = output
for line in input_:
out.write(type_)
out.write(line)
if output == STRING:
return out.getvalue()
return out
[docs]class BadCommand(Exception):
r"""Raised on a command execution failure"""
[docs] def __init__(self, cmd, err):
r"""Takes the failed command and the contents of the error stream."""
self.command = cmd
self.stderr = err
msg = _format(cmd, FILE, COMMAND)
if err:
msg = _format(err, msg, ERROR)
super(BadCommand, self).__init__("".join(msg))
EXP_OUT = 0
EXP_ERR = 1
[docs]def waitOutput(out, err):
r"""Wait for any output."""
return out
[docs]def waitError(out, err):
r"""Wait for any error."""
return err
[docs]class LineWaiter(object):
r"""Wait for *count* lines of output."""
[docs] def __init__(self, count):
self.count = count
self.n = 0
[docs] def __call__(self, out, err):
self.n += out.count('\n')
return self.n >= self.count
[docs]class StringWaiter(object):
r"""Wait for a specific *string* in the *where* stream."""
[docs] def __init__(self, string, where):
self.string = string
self.where = where
[docs] def __call__(self, out, err):
if self.where == EXP_OUT:
o = out
else:
o = err
return o.find(self.string) != -1
[docs]class RegexpWaiter(object):
r"""Wait for a match to a given *regexp*, passed either compiled or as a string."""
[docs] def __init__(self, regexp, where):
if isinstance(regexp, six.string_types):
self.regexp = re.compile(regexp, re.MULTILINE)
else:
self.regexp = regexp
self.where = where
[docs] def __call__(self, out, err):
if self.where == EXP_OUT:
o = out
else:
o = err
return self.regexp.search(o)
[docs]class Timer(object):
r"""
A collaborative timer class.
Support a polling mechanism by keeping track of the amount of time to wait before the next
attempt, according to different policies.
"""
[docs] def __init__(self, timeout=0, retries=0, interval=0.1, quantum=0.01):
"""
Specify an overall *timeout*, a number of *retries* and/or an *interval* between them.
The next attempt will not take place before a *quantum* has passed. Timings are expressed
in seconds. If a timeout is specified it will take precedence over the other arguments; in
that case the number of retries will take precedence over the interval. If neither a timeout
nor a number of retries are specified the overall timer will never expire.
"""
if retries < 0:
raise nxpy.command.error.TimerError("retries must be equal or greater than 0")
elif retries == 0:
self.retries = -1
else:
self.retries = retries
self.count = 0
self.interval = interval
if timeout > 0:
self.timeout = timeout
self.end = time.time() + timeout
if retries > 1:
self.interval = timeout / retries
else:
self.timeout = 0
self.end = 0
if quantum < 0:
raise nxpy.command.error.TimerError("quantum must be equal or greater than 0")
self.quantum = quantum
[docs] def getInterval(self):
r"""
Return the next wait interval.
Call after each attempt in order to know how long to wait for.
"""
if self.end and self.retries > 1:
self.interval = max((self.end-time.time())/self.retries, self.quantum)
return self.interval
[docs] def expired(self):
r"""
Indicate whether the current timer expired.
Use as polling loop control condition.
"""
self.count += 1
return self.timeout and self.end < time.time() or ( self.retries - self.count == 0 )
[docs] def reset(self):
r"""Reset the timer."""
self.count = 0
if self.timeout:
self.end = time.time() + self.timeout
[docs]class BaseInterpreter(object):
r"""
Controls the execution of an interactive program in a sub-process.
Provides means to send input to the controlled process and to check different conditions on its
output and error streams.
"""
[docs] def __init__(self, popen):
r"""
Creates an interpreter instance. *popen* is a :py:class:`.Popen`-like object which must
support non-blocking I/O.
"""
self.log = False
self.popen = popen
[docs] def setLog(self, log):
r"""
If *log* is *True*, enable logging of command output and error, otherwise disable it.
"""
self.log = log
def _log(self, log):
r"""
Check whether logging should be enabled. Usually *log* is passed from the calling method.
"""
if log != None:
return log
else:
return self.log
[docs] def send_cmd(self, cmd, log=None):
r"""
Write *cmd* to the interpreter's input, optianally logging it. If *log* is not *None*,
override the global setting.
"""
try:
if self._log(log):
_format(cmd, sys.stderr, COMMAND)
self.popen.send(cmd + os.linesep)
except Exception:
e = sys.exc_info()[1]
raise BadCommand(cmd, str(e.args))
[docs] def expect_any(self, **kwargs):
r"""Expect any output."""
return self.expect(waitOutput, **kwargs)
[docs] def expect_lines(self, count=1, **kwargs):
r"""Expect *count* lines of output."""
return self.expect(LineWaiter(count), **kwargs)
[docs] def expect_string(self, string, where=EXP_OUT, **kwargs):
r"""Expect a *string* in the *where* stream."""
return self.expect(StringWaiter(string, where), **kwargs)
[docs] def expect_regexp(self, regexp, where=EXP_OUT, **kwargs):
r"""
Expect to find a match for the *regexp* regular expression within the *where* stream.
"""
return self.expect(RegexpWaiter(regexp, where), **kwargs)
[docs] def expect(self, cond=None, timeout=0, retries=0, interval=0.01,
quantum=0.01, raise_on_error=True, log=None):
r"""
Express expectations on the outcome of a command.
*cond* is a two argument callable which will be passed the command's standard output and
standard error, and which should return *True* if the expectation is satisfied. For the
other arguments see the documentation for the :py:class:`.Timer` class.
"""
try:
out_list = []
err_list = []
timer = Timer(timeout, retries, interval, quantum)
while not timer.expired():
out = self.popen.recv()
if out:
out_list.append(out)
err = self.popen.recv_err()
if err:
err_list.append(err)
if cond and cond(out, err):
break
if out or err:
timer.reset()
t = timer.quantum
else:
t = timer.getInterval()
if self._log(log):
sys.stderr.write("END: %s SLEEP: %f SIZE: %d\n" %
(time.ctime(timer.end), t, len(out) + len(err)))
if t > 0:
time.sleep(t)
else:
if cond:
raise nxpy.command.error.TimeoutError(err)
finally:
out = ''.join(out_list)
err = ''.join(err_list)
if self._log(log):
_format(out, sys.stderr)
_format(err, sys.stderr, ERROR)
if raise_on_error and err:
raise nxpy.command.error.ExpectError(err)
return out, err
[docs] def run(self, cmd, log=None, **kwargs):
r"""Executes the command and waits for the expected outcome or an error."""
self.send_cmd(cmd, log=log)
kwargs['log'] = log
try:
return self.expect(**kwargs)
except nxpy.command.error.ExpectError:
e = sys.exc_info()[1]
raise BadCommand(cmd, e.args[0])
[docs]class Interpreter(BaseInterpreter):
r"""
The actual Interpreter class.
This implementation uses a :py:class:`.core.async_subprocess.AsyncPopen` instance.
"""
[docs] def __init__(self, cmd):
super(Interpreter, self).__init__(nxpy.core.async_subprocess.AsyncPopen(cmd.split(),
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE))