174 lines
5.5 KiB
Python
174 lines
5.5 KiB
Python
|
|
# Quality test of media calls.
|
|
# - UA1 calls UA2
|
|
# - UA1 plays a file until finished to be streamed to UA2
|
|
# - UA2 records from stream
|
|
# - Apply PESQ to played file (reference) and recorded file (degraded)
|
|
#
|
|
# File should be:
|
|
# - naming: xxxxxx.CLOCK_RATE.wav, e.g: test1.8.wav
|
|
# - clock-rate of those files can only be 8khz or 16khz
|
|
|
|
import time
|
|
import os
|
|
import sys
|
|
import re
|
|
import subprocess
|
|
import wave
|
|
import shutil
|
|
import inc_const as const
|
|
import inc_util as util
|
|
|
|
from inc_cfg import *
|
|
|
|
# Load configuration
|
|
cfg_file = util.load_module_from_file("cfg_file", ARGS[1])
|
|
|
|
# PESQ configs
|
|
PESQ = "tools/pesq" # PESQ executable path
|
|
PESQ_DEFAULT_THRESHOLD = 3.4 # Default minimum acceptable PESQ MOS value
|
|
|
|
# PESQ params
|
|
pesq_sample_rate_opt = "" # Sample rate option for PESQ
|
|
input_filename = "" # Input/Reference filename
|
|
output_filename = "" # Output/Degraded filename
|
|
|
|
|
|
# Test body function
|
|
def test_func(t):
|
|
global pesq_sample_rate_opt
|
|
global input_filename
|
|
global output_filename
|
|
|
|
ua1 = t.process[0]
|
|
ua2 = t.process[1]
|
|
|
|
# Get input file name
|
|
input_filename = re.compile(const.MEDIA_PLAY_FILE).search(ua1.inst_param.arg).group(1)
|
|
|
|
# Get output file name
|
|
output_filename = re.compile(const.MEDIA_REC_FILE).search(ua2.inst_param.arg).group(1)
|
|
|
|
# Get WAV input length, in seconds
|
|
fin = wave.open(input_filename, "r")
|
|
if fin == None:
|
|
raise TestError("Failed opening input WAV file")
|
|
inwavlen = fin.getnframes() * 1.0 / fin.getframerate()
|
|
inwavlen += 0.2
|
|
fin.close()
|
|
print("WAV input len = " + str(inwavlen) + "s")
|
|
|
|
# Get clock rate of the output
|
|
mo_clock_rate = re.compile("\.(\d+)\.wav").search(output_filename)
|
|
if (mo_clock_rate==None):
|
|
raise TestError("Cannot compare input & output, incorrect output filename format")
|
|
clock_rate = mo_clock_rate.group(1)
|
|
|
|
# Get channel count of the output
|
|
channel_count = 1
|
|
if re.search("--stereo", ua2.inst_param.arg) != None:
|
|
channel_count = 2
|
|
|
|
# Get matched input file from output file
|
|
# (PESQ evaluates only files whose same clock rate & channel count)
|
|
if channel_count == 2:
|
|
if re.search("\.\d+\.\d+\.wav", input_filename) != None:
|
|
input_filename = re.sub("\.\d+\.\d+\.wav", "." + str(channel_count) + "."+clock_rate+".wav", input_filename)
|
|
else:
|
|
input_filename = re.sub("\.\d+\.wav", "." + str(channel_count) + "."+clock_rate+".wav", input_filename)
|
|
|
|
if (clock_rate != "8") & (clock_rate != "16"):
|
|
raise TestError("PESQ only works on clock rate 8kHz or 16kHz, clock rate used = "+clock_rate+ "kHz")
|
|
|
|
# Get conference clock rate of UA2 for PESQ sample rate option
|
|
pesq_sample_rate_opt = "+" + clock_rate + "000"
|
|
|
|
# UA1 making call
|
|
if ua1.use_telnet:
|
|
ua1.send("call new " + t.inst_params[1].uri)
|
|
else:
|
|
ua1.send("m")
|
|
ua1.send(t.inst_params[1].uri)
|
|
ua1.expect(const.STATE_CALLING)
|
|
|
|
# UA2 wait until call established
|
|
ua2.expect(const.STATE_CONFIRMED)
|
|
|
|
ua1.sync_stdout()
|
|
ua2.sync_stdout()
|
|
time.sleep(2)
|
|
|
|
# Disconnect mic -> rec file, to avoid echo recorded when using sound device
|
|
# Disconnect stream -> spk, make it silent
|
|
# Connect stream -> rec file, start recording
|
|
ua2.send("cd 0 1")
|
|
ua2.send("cd 4 0")
|
|
ua2.send("cc 4 1")
|
|
|
|
# Disconnect mic -> stream, make stream purely sending from file
|
|
# Disconnect stream -> spk, make it silent
|
|
# Connect file -> stream, start sending
|
|
ua1.send("cd 0 4")
|
|
ua1.send("cd 4 0")
|
|
ua1.send("cc 1 4")
|
|
|
|
time.sleep(inwavlen)
|
|
|
|
# Disconnect files from bridge
|
|
ua2.send("cd 4 1")
|
|
ua2.expect(const.MEDIA_DISCONN_PORT_SUCCESS)
|
|
ua1.send("cd 1 4")
|
|
ua1.expect(const.MEDIA_DISCONN_PORT_SUCCESS)
|
|
|
|
|
|
# Post body function
|
|
def post_func(t):
|
|
global pesq_sample_rate_opt
|
|
global input_filename
|
|
global output_filename
|
|
|
|
endpt = t.process[0]
|
|
|
|
# Execute PESQ
|
|
fullcmd = os.path.normpath(PESQ) + " " + pesq_sample_rate_opt + " " + input_filename + " " + output_filename
|
|
endpt.trace("Popen " + fullcmd)
|
|
pesq_proc = subprocess.Popen(fullcmd, shell=True, stdout=subprocess.PIPE, universal_newlines=True)
|
|
pesq_out = pesq_proc.communicate()
|
|
|
|
# Parse ouput
|
|
mo_pesq_out = re.compile("Prediction[^=]+=\s+([\-\d\.]+)\s*").search(pesq_out[0])
|
|
if (mo_pesq_out == None):
|
|
raise TestError("Failed to fetch PESQ result")
|
|
|
|
# Get threshold
|
|
if (cfg_file.pesq_threshold != None) | (cfg_file.pesq_threshold > -0.5 ):
|
|
threshold = cfg_file.pesq_threshold
|
|
else:
|
|
threshold = PESQ_DEFAULT_THRESHOLD
|
|
|
|
# Evaluate the PESQ MOS value
|
|
pesq_res = mo_pesq_out.group(1)
|
|
if (float(pesq_res) >= threshold):
|
|
endpt.trace("Success, PESQ result = " + pesq_res + " (target=" + str(threshold) + ").")
|
|
else:
|
|
endpt.trace("Failed, PESQ result = " + pesq_res + " (target=" + str(threshold) + ").")
|
|
# Save the wav file
|
|
wavoutname = ARGS[1]
|
|
wavoutname = re.sub("[\\\/]", "_", wavoutname)
|
|
wavoutname = re.sub("\.py$", ".wav", wavoutname)
|
|
wavoutname = "logs/" + wavoutname
|
|
try:
|
|
shutil.copyfile(output_filename, wavoutname)
|
|
print("Output WAV is copied to " + wavoutname)
|
|
except:
|
|
print("Couldn't copy output WAV, please check if 'logs' directory exists.")
|
|
|
|
raise TestError("WAV seems to be degraded badly, PESQ = "+ pesq_res + " (target=" + str(threshold) + ").")
|
|
|
|
|
|
# Here where it all comes together
|
|
test = cfg_file.test_param
|
|
test.test_func = test_func
|
|
test.post_func = post_func
|
|
|