pjsip-pjproject/tests/pjsua/run.py

441 lines
13 KiB
Python

import sys
import re
import os
import subprocess
import random
import telnetlib
import time
import threading
import traceback
import getopt
import inc_cfg as inc
import inc_const as const
import inc_util as util
# Module-wide settings filled in by the command line / platform probing
G_EXE = ""        # pjsua executable path
G_INUNIX = False  # flags that test is running in Unix

# Help text printed for -h/--help and on any command-line error
usage = """
run.py - Automated test driver
Usage:
run.py [options] MODULE CONFIG
Options:
--exe, -e pjsua executable path
--null-audio, -n use null audio
Sample:
run.py -n mod_run.py scripts-run/100_simple.py
"""

# Parse the command line; a malformed option aborts with exit status 2
try:
    opts, args = getopt.getopt(sys.argv[1:], "hne:", ["help", "null-audio", "exe="])
except getopt.GetoptError as err:
    print(str(err))
    print(usage)
    sys.exit(2)

for opt, val in opts:
    if opt == "-h" or opt == "--help":
        print(usage)
        sys.exit()
    elif opt == "-n" or opt == "--null-audio":
        # Run without a sound device
        inc.HAS_SND_DEV = 0
    elif opt == "-e" or opt == "--exe":
        G_EXE = val
    else:
        print("Unknown options")
        sys.exit(2)

# Exactly two positional arguments are required: MODULE and CONFIG
if not len(args) == 2:
    print("Invalid arguments")
    print(usage)
    sys.exit(2)

# Set global ARGS to be used by modules
inc.ARGS = args
# Get the pjsua executable name.
#
# When no --exe was given, the executable is looked up relative to this
# directory: on Windows the most recently modified of the known build outputs
# is used; elsewhere the name is derived from the TARGET_NAME line of
# ../../build.mak. G_INUNIX is set as a side effect and is later used as the
# 'shell' argument when spawning pjsua.
if G_EXE == "":
    if sys.platform.find("win32") != -1:
        EXE_DIR = "../../pjsip-apps/bin/"
        EXECUTABLES = [
            "pjsua_vc6d.exe",
            "pjsua_vc6.exe",
            "pjsua-i386-Win32-vc8-Debug.exe",
            "pjsua-i386-Win32-vc8-Debug-Dynamic.exe",
            "pjsua-i386-Win32-vc8-Debug-Static.exe",
            "pjsua-i386-Win32-vc8-Release.exe",
            "pjsua-i386-Win32-vc8-Release-Dynamic.exe",
            "pjsua-i386-Win32-vc8-Release-Static.exe",
            "pjsua-i386-Win32-vc14-Debug.exe",
            "pjsua-i386-Win32-vc14-Debug-Dynamic.exe",
            "pjsua-i386-Win32-vc14-Debug-Static.exe",
            "pjsua-i386-Win32-vc14-Release.exe",
            "pjsua-i386-Win32-vc14-Release-Dynamic.exe",
            "pjsua-i386-Win32-vc14-Release-Static.exe",
        ]
        # Pick the newest existing build; on equal timestamps the earlier
        # entry in EXECUTABLES wins.
        e_ts = 0
        for e in EXECUTABLES:
            e = EXE_DIR + e
            if os.access(e, os.F_OK):
                st = os.stat(e)
                if e_ts == 0 or e_ts < st.st_mtime:
                    G_EXE = e
                    e_ts = st.st_mtime
        if G_EXE == "":
            print("Unable to find valid pjsua. Please build pjsip first")
            sys.exit(1)
        G_INUNIX = False
    else:
        try:
            # 'with' guarantees the file is closed on every path
            with open("../../build.mak", "r") as f:
                for line in f:
                    if line.find("TARGET_NAME") == -1:
                        continue
                    print(line)
                    parts = line.split(":= ")
                    if len(parts) < 2:
                        # Mentions TARGET_NAME but is not an assignment
                        continue
                    G_EXE = "../../pjsip-apps/bin/pjsua-" + parts[1]
                    break
        except (IOError, OSError):
            # Missing/unreadable build.mak is reported below
            pass
        if G_EXE == "":
            print("Unable to read TARGET_NAME from ../../build.mak. Please build pjsip first")
            sys.exit(1)
        G_INUNIX = True
else:
    # Executable given explicitly: only the platform flag has to be derived
    platform_name = sys.platform.lower()
    if platform_name.find("win32") != -1 or platform_name.find("microsoft") != -1:
        G_INUNIX = False
    else:
        G_INUNIX = True

# The value read from build.mak still carries its line terminator
G_EXE = G_EXE.rstrip("\n\r \t")
###################################
# Poor man's 'expect'-like class
class Expect(threading.Thread):
    """Run one pjsua instance in a background thread and match its output.

    The thread spawns pjsua (command line built from G_EXE and the instance
    parameters), collects every output line into ``self.output`` and lets the
    caller drive the instance with send()/expect(). Output is read either
    from the process' stdout or, when ``inst_param.telnet_enabled`` is set,
    from pjsua's telnet CLI.
    """

    proc = None             # subprocess.Popen object, set by run()
    telnet = None           # telnetlib.Telnet connection (telnet mode only)
    use_telnet = False
    echo = False            # print every received line
    trace_enabled = False   # print trace() messages
    inst_param = None
    rh = re.compile(const.DESTROYED)
    ra = re.compile(const.ASSERT, re.I)
    rr = re.compile(const.STDOUT_REFRESH)
    t0 = time.time()        # reference time for trace() timestamps
    output = ""             # received text not yet consumed by expect()
    lock = threading.Lock()
    running = False         # True while the reader loop is active

    def __init__(self, inst_param):
        """Prepare (but do not start) the reader thread for *inst_param*."""
        threading.Thread.__init__(self)
        self.inst_param = inst_param
        self.name = inst_param.name
        self.echo = inst_param.echo_enabled
        self.trace_enabled = inst_param.trace_enabled
        self.use_telnet = inst_param.telnet_enabled
        self.telnet = None
        # Per-instance buffer and lock so instances do not contend with
        # each other.
        self.output = ""
        self.lock = threading.Lock()

    def run(self):
        """Thread body: spawn pjsua and collect its output until it ends."""
        if self.use_telnet:
            self._run_telnet()
        else:
            self._run_stdio()

    def _echo_line(self, text):
        """Print one received line, prefixed with the instance name."""
        msg = self.name + ": " + text.rstrip()
        try:
            print(msg)
        except UnicodeEncodeError:
            print(msg.encode('utf-8'))

    def _append_output(self, text):
        """Append *text* to the buffer scanned by expect()."""
        with self.lock:
            self.output += text

    def _run_telnet(self):
        """Spawn pjsua with the telnet CLI enabled and read lines from it."""
        fullcmd = G_EXE + " " + self.inst_param.arg + " --use-cli --no-cli-console --cli-telnet-port=%d" % (self.inst_param.telnet_port)
        self.trace("Popen " + fullcmd)
        self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX)

        # start telnet-ing to pjsua, raise exception if telnet fails after 5s
        t0 = time.time()
        while self.proc.poll() is None and self.telnet is None:
            try:
                time.sleep(0.01)
                self.telnet = telnetlib.Telnet('127.0.0.1', port=self.inst_param.telnet_port, timeout=60)
            except Exception as e:
                dur = int(time.time() - t0)
                if dur > 5:
                    raise inc.TestError(self.name + ": Timeout connecting to pjsua: " + repr(e))

        self.running = True
        try:
            while self.proc.poll() is None:
                try:
                    line = self.telnet.read_until(b'\n', 60)
                except EOFError:
                    # Connection closed by pjsua
                    break
                # 'replace' keeps the reader alive on malformed bytes
                linestr = line.decode('utf-8', 'replace')
                if linestr == "" or const.DESTROYED in linestr:
                    break
                if self.echo:
                    self._echo_line(linestr)
                self._append_output(linestr)
        finally:
            # Always clear the flag so expect() does not wait for a dead reader
            self.running = False

    def _run_stdio(self):
        """Spawn pjsua attached to pipes and read lines from its stdout."""
        fullcmd = G_EXE + " " + self.inst_param.arg + " --stdout-refresh=5 --stdout-refresh-text=" + const.STDOUT_REFRESH
        if not self.inst_param.enable_buffer:
            fullcmd = fullcmd + " --stdout-no-buf"
        self.trace("Popen " + fullcmd)
        self.proc = subprocess.Popen(fullcmd, shell=G_INUNIX, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False)

        self.running = True
        try:
            while self.proc.poll() is None:
                # The pipe is binary (universal_newlines=False): readline()
                # yields bytes and an empty result means EOF.
                raw = self.proc.stdout.readline()
                if not raw:
                    break
                line = raw.decode('utf-8', 'replace')
                if self.echo:
                    self._echo_line(line)
                self._append_output(line)
        finally:
            self.running = False

    def send(self, cmd):
        """Send command line *cmd* to pjsua (telnet CLI or stdin).

        Raises whatever the underlying write raises, e.g. AttributeError
        while the process has not been spawned yet.
        """
        self.trace("send " + cmd)
        if self.use_telnet:
            self.telnet.write((cmd + '\r\n').encode('utf-8'))
        else:
            # stdin is a binary pipe, so the command must be encoded
            self.proc.stdin.write((cmd + "\n").encode('utf-8'))
            self.proc.stdin.flush()

    def expect(self, pattern, raise_on_error=True, title="", timeout=15):
        """Wait until a received line matches regex *pattern* (case-insensitive).

        All buffered output up to and including the matching line is
        consumed; buffered output that does not match is discarded.

        Returns the matching line. Returns None when the pattern was not
        found and *raise_on_error* is false, or when the prompt is expected
        in telnet mode (the telnet CLI has no prompt).

        Raises inc.TestError when *raise_on_error* is true and an assertion
        line is seen, the reader has stopped, or more than *timeout* whole
        seconds have elapsed.
        """
        # no prompt for telnet
        if self.use_telnet and pattern == const.PROMPT:
            return None

        self.trace("expect " + pattern)
        r = re.compile(pattern, re.I)
        found_at = -1
        t0 = time.time()
        while found_at < 0:
            # Sample the reader state before scanning: if it had already
            # stopped, everything it produced is in the buffer, so a miss
            # really is EOF and not a race with the final lines.
            was_running = self.running
            with self.lock:
                lines = self.output.splitlines()
                for i, line in enumerate(lines):
                    # Search for expected text
                    if r.search(line) is not None:
                        found_at = i
                        break
                    # Trap assertion error
                    if raise_on_error and self.ra.search(line) is not None:
                        raise inc.TestError(self.name + ": " + line)
                if found_at >= 0:
                    self.output = '\n'.join(lines[found_at+1:]) + "\n"
                else:
                    self.output = ""

            if found_at >= 0:
                return line

            if not was_running:
                if raise_on_error:
                    raise inc.TestError(self.name + ": Premature EOF")
                break

            dur = int(time.time() - t0)
            if dur > timeout:
                self.trace("Timed-out!")
                if raise_on_error:
                    raise inc.TestError(self.name + " " + title + ": Timeout expecting pattern: \"" + pattern + "\"")
                break
            time.sleep(0.01)
        return None

    def get_config(self, key_config):
        """Send "dd" and return the output line matching *key_config*."""
        self.send("dd")
        line = self.expect(key_config)
        return line

    def sync_stdout(self):
        """Echo a random marker and wait for it (no-op in telnet mode)."""
        if not self.use_telnet:
            self.trace("sync_stdout")
            cmd = "echo 1" + str(random.randint(1000,9999))
            self.send(cmd)
            self.expect(cmd)

    def wait(self):
        """Wait for the reader thread and the process; close telnet if open."""
        self.trace("wait")
        self.join()
        self.proc.communicate()
        if self.telnet:
            self.telnet.close()

    def trace(self, s):
        """Print trace message *s* with a timestamp if tracing is enabled."""
        if self.trace_enabled:
            now = time.time()
            fmt = self.name + ": " + "================== " + s + " ==================" + " [at t=%(time)03d]"
            try:
                print(fmt % {'time':int(now - self.t0)})
            except UnicodeEncodeError:
                print((fmt % {'time':int(now - self.t0)}).encode('utf-8'))
#########################
# Error handling
def handle_error(errmsg, t, close_processes = True):
    """Report a test failure, optionally shut down pjsua, and exit.

    errmsg          -- text describing the failure.
    t               -- test object; its ``process`` list holds the running
                       pjsua instances.
    close_processes -- when true, ask every instance to quit and terminate
                       those that do not report their own shutdown.

    Never returns: always ends with ``sys.exit(1)``.
    """
    print("====== Caught error: " + errmsg + " ======")
    if close_processes:
        time.sleep(1)
        for p in t.process:
            # Protect against 'Broken pipe' exception. Best effort only, but
            # let KeyboardInterrupt/SystemExit through so cleanup can be aborted.
            try:
                if not p.use_telnet:
                    p.send("q")
                    p.send("q")
                else:
                    p.send("shutdown")
            except Exception:
                pass

            if p.proc is None:
                # The process was never spawned; nothing to stop or wait for
                continue

            is_err = False
            try:
                ret = p.expect(const.DESTROYED, False)
                if ret is None:
                    is_err = True
            except Exception:
                is_err = True

            if is_err and p.proc.poll() is None:
                # No clean shutdown was seen and pjsua is still alive
                p.proc.terminate()
            else:
                p.wait()
    print("Test completed with error: " + errmsg)
    sys.exit(1)
#########################
# MAIN
# Import the test script named by the first positional argument
script = util.load_module_from_file("script", inc.ARGS[0])

# Init random seed
random.seed()

# Validate: the script must define a usable 'test' object. getattr() lets a
# script that forgot to define 'test' get the same message instead of an
# AttributeError.
if getattr(script, "test", None) is None:
    print("Error: no test defined")
    sys.exit(1)

if script.test.skip:
    print("Test " + script.test.title + " is skipped")
    sys.exit(0)

if len(script.test.inst_params) == 0:
    print("Error: test doesn't contain pjsua run descriptions")
    sys.exit(1)
# Instantiate pjsuas: one Expect thread per instance description, each
# started with up to MAX_START_ATTEMPTS attempts.
print("====== Running " + script.test.title + " ======")
print("Using " + G_EXE + " as pjsua executable")

MAX_START_ATTEMPTS = 3

for inst_param in script.test.inst_params:
    retry = 0
    process_running = False
    while (not process_running) and retry < MAX_START_ATTEMPTS:
        p = None
        retry += 1
        try:
            # Create pjsua's Expect instance from the param
            p = Expect(inst_param)
            p.start()
        except inc.TestError as e:
            handle_error(e.desc, script.test)

        # wait process ready
        if not p.use_telnet:
            # send() fails until the reader thread has spawned pjsua, so keep
            # trying. Give up on this attempt once the thread has ended,
            # since pjsua can then never become ready.
            while True:
                try:
                    p.send("echo 1")
                except Exception:
                    if not p.is_alive():
                        break
                    time.sleep(0.1)
                    continue
                process_running = True
                break
        else:
            # Telnet mode is ready once the reader thread has connected;
            # allow about 5 seconds.
            t0 = time.time()
            while p.telnet is None:
                time.sleep(0.1)
                dur = int(time.time() - t0)
                if dur > 5:
                    break
            process_running = p.telnet is not None

        # wait before retrying (only if another attempt will follow)
        if not process_running and retry < MAX_START_ATTEMPTS:
            time.sleep(2)

    if not process_running:
        handle_error("Failed running pjsua", script.test)

    # add running instance
    script.test.process.append(p)
# Bring every started instance to a known state before the test body runs;
# any TestError here aborts the whole run through handle_error().
for inst in script.test.process:
    try:
        # Wait until registration completes
        if inst.inst_param.have_reg:
            inst.send("rr")
            registered = inst.inst_param.uri + ".*registration success"
            inst.expect(registered)

        # Synchronize stdout (stdio-driven instances only)
        if inst.use_telnet:
            continue
        inst.send("")
        inst.expect(const.PROMPT)
        inst.send("echo 1")
        inst.expect("echo 1")
    except inc.TestError as e:
        handle_error(e.desc, script.test)
# Run the test function
if script.test.test_func is not None:
    try:
        script.test.test_func(script.test)
    except inc.TestError as e:
        handle_error(e.desc, script.test)
    except BaseException:
        # Deliberately catches everything (including Ctrl-C) so that
        # handle_error() still gets to shut the pjsua instances down.
        handle_error("Unknown error: " + str(traceback.format_exc()), script.test)

# Shutdown all instances
for p in script.test.process:
    try:
        # Unregister if we have_reg to make sure that next tests
        # won't fail
        if p.inst_param.have_reg:
            p.send("ru")
            p.expect(p.inst_param.uri+".*unregistration success")

        if p.use_telnet:
            p.send("shutdown")
        else:
            p.send("q")
    except inc.TestError as e:
        # Same handling as the other phases: without it the main thread
        # would die here while the reader threads and pjsua keep running.
        handle_error(e.desc, script.test)

time.sleep(0.5)
for p in script.test.process:
    if p.running:
        # raise_on_error=False: a missing shutdown message is not fatal here
        p.expect(const.DESTROYED, False)
    p.wait()

# Run the post test function; the processes are already gone, so
# handle_error() is told not to close them again.
if script.test.post_func is not None:
    try:
        script.test.post_func(script.test)
    except inc.TestError as e:
        handle_error(e.desc, script.test, False)

# Done
print("Test " + script.test.title + " completed successfully")
sys.exit(0)