Skip to content
Snippets Groups Projects
Commit 15a5c0a9 authored by Andreas Antener's avatar Andreas Antener
Browse files

cleaned up direct tests, fixed various lint warnings and removed unused code

parent c4a81f66
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,6 @@
#
PKG = 'px4'
import sys
import unittest
import rospy
......@@ -50,14 +49,18 @@ from manual_input import ManualInput
#
class ManualInputTest(unittest.TestCase):
def setUp(self):
self.actuator_status = actuator_armed()
self.control_mode = vehicle_control_mode()
#
# General callback functions used in tests
#
def actuator_armed_callback(self, data):
self.actuatorStatus = data
self.actuator_status = data
def vehicle_control_mode_callback(self, data):
self.controlMode = data
self.control_mode = data
#
# Test arming
......@@ -67,19 +70,19 @@ class ManualInputTest(unittest.TestCase):
rospy.Subscriber('iris/actuator_armed', actuator_armed, self.actuator_armed_callback)
rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback)
man = ManualInput()
man_in = ManualInput()
# Test arming
man.arm()
self.assertEquals(self.actuatorStatus.armed, True, "did not arm")
man_in.arm()
self.assertEquals(self.actuator_status.armed, True, "did not arm")
# Test posctl
man.posctl()
self.assertTrue(self.controlMode.flag_control_position_enabled, "flag_control_position_enabled is not set")
man_in.posctl()
self.assertTrue(self.control_mode.flag_control_position_enabled, "flag_control_position_enabled is not set")
# Test offboard
man.offboard()
self.assertTrue(self.controlMode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set")
man_in.offboard()
self.assertTrue(self.control_mode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set")
if __name__ == '__main__':
......
......@@ -37,7 +37,6 @@
#
PKG = 'px4'
import sys
import unittest
import rospy
......@@ -46,11 +45,8 @@ import numpy as np
from px4.msg import vehicle_local_position
from px4.msg import vehicle_control_mode
from px4.msg import actuator_armed
from px4.msg import position_setpoint_triplet
from px4.msg import position_setpoint
from sensor_msgs.msg import Joy
from std_msgs.msg import Header
from manual_input import ManualInput
from flight_path_assertion import FlightPathAssertion
......@@ -68,31 +64,34 @@ class DirectOffboardPosctlTest(unittest.TestCase):
rospy.init_node('test_node', anonymous=True)
rospy.Subscriber('iris/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback)
rospy.Subscriber("iris/vehicle_local_position", vehicle_local_position, self.position_callback)
self.pubSpt = rospy.Publisher('iris/position_setpoint_triplet', position_setpoint_triplet, queue_size=10)
self.pub_spt = rospy.Publisher('iris/position_setpoint_triplet', position_setpoint_triplet, queue_size=10)
self.rate = rospy.Rate(10) # 10hz
self.has_pos = False
self.local_position = vehicle_local_position()
self.control_mode = vehicle_control_mode()
def tearDown(self):
if (self.fpa):
if self.fpa:
self.fpa.stop()
#
# General callback functions used in tests
#
def position_callback(self, data):
self.hasPos = True
self.localPosition = data
self.has_pos = True
self.local_position = data
def vehicle_control_mode_callback(self, data):
self.controlMode = data
self.control_mode = data
#
# Helper methods
#
def is_at_position(self, x, y, z, offset):
rospy.logdebug("current position %f, %f, %f" % (self.localPosition.x, self.localPosition.y, self.localPosition.z))
rospy.logdebug("current position %f, %f, %f" % (self.local_position.x, self.local_position.y, self.local_position.z))
desired = np.array((x, y, z))
pos = np.array((self.localPosition.x, self.localPosition.y, self.localPosition.z))
pos = np.array((self.local_position.x, self.local_position.y, self.local_position.z))
return linalg.norm(desired - pos) < offset
def reach_position(self, x, y, z, timeout):
......@@ -105,12 +104,12 @@ class DirectOffboardPosctlTest(unittest.TestCase):
pos.position_valid = True
stp = position_setpoint_triplet()
stp.current = pos
self.pubSpt.publish(stp)
self.pub_spt.publish(stp)
# does it reach the position in X seconds?
count = 0
while(count < timeout):
if(self.is_at_position(pos.x, pos.y, pos.z, 0.5)):
while count < timeout:
if self.is_at_position(pos.x, pos.y, pos.z, 0.5):
break
count = count + 1
self.rate.sleep()
......@@ -121,22 +120,22 @@ class DirectOffboardPosctlTest(unittest.TestCase):
# Test offboard position control
#
def test_posctl(self):
manIn = ManualInput()
man_in = ManualInput()
# arm and go into offboard
manIn.arm()
manIn.offboard()
self.assertTrue(self.controlMode.flag_armed, "flag_armed is not set")
self.assertTrue(self.controlMode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set")
self.assertTrue(self.controlMode.flag_control_position_enabled, "flag_control_position_enabled is not set")
man_in.arm()
man_in.offboard()
self.assertTrue(self.control_mode.flag_armed, "flag_armed is not set")
self.assertTrue(self.control_mode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set")
self.assertTrue(self.control_mode.flag_control_position_enabled, "flag_control_position_enabled is not set")
# prepare flight path
positions = (
(0,0,0),
(2,2,-2),
(2,-2,-2),
(-2,-2,-2),
(2,2,-2))
(0, 0, 0),
(2, 2, -2),
(2, -2, -2),
(-2, -2, -2),
(2, 2, -2))
# flight path assertion
self.fpa = FlightPathAssertion(positions, 1, 0)
......@@ -147,12 +146,10 @@ class DirectOffboardPosctlTest(unittest.TestCase):
self.assertFalse(self.fpa.failed, "breached flight path tunnel (%d)" % i)
# does it hold the position for Y seconds?
positionHeld = True
count = 0
timeout = 50
while(count < timeout):
if(not self.is_at_position(2, 2, -2, 0.5)):
positionHeld = False
while count < timeout:
if not self.is_at_position(2, 2, -2, 0.5):
break
count = count + 1
self.rate.sleep()
......
......@@ -35,7 +35,6 @@
#
# @author Andreas Antener <andreas@uaventure.com>
#
import sys
import rospy
import threading
......@@ -48,7 +47,6 @@ from geometry_msgs.msg import Twist
from numpy import linalg
import numpy as np
import math
#
# Helper to test if vehicle stays on expected flight path.
......@@ -62,30 +60,52 @@ class FlightPathAssertion(threading.Thread):
# TODO: yaw validation
# TODO: fail main test thread
#
def __init__(self, positions, tunnelRadius = 1, yawOffset = 0.2):
def __init__(self, positions, tunnelRadius=1, yaw_offset=0.2):
threading.Thread.__init__(self)
rospy.Subscriber("px4_multicopter/vehicle_local_position", vehicle_local_position, self.position_callback)
self.spawnModel = rospy.ServiceProxy('gazebo/spawn_sdf_model', SpawnModel)
self.setModelState = rospy.ServiceProxy('gazebo/set_model_state', SetModelState)
self.deleteModel = rospy.ServiceProxy('gazebo/delete_model', DeleteModel)
self.spawn_model = rospy.ServiceProxy('gazebo/spawn_sdf_model', SpawnModel)
self.set_model_state = rospy.ServiceProxy('gazebo/set_model_state', SetModelState)
self.delete_model = rospy.ServiceProxy('gazebo/delete_model', DeleteModel)
self.positions = positions
self.tunnelRadius = tunnelRadius
self.yawOffset = yawOffset
self.hasPos = False
self.shouldStop = False
self.tunnel_radius = tunnelRadius
self.yaw_offset = yaw_offset
self.has_pos = False
self.should_stop = False
self.center = positions[0]
self.endOfSegment = False
self.end_of_segment = False
self.failed = False
self.local_position = vehicle_local_position
def position_callback(self, data):
self.hasPos = True
self.localPosition = data
self.has_pos = True
self.local_position = data
def spawn_indicator(self):
self.deleteModel("indicator")
xml = "<?xml version='1.0'?><sdf version='1.4'><model name='indicator'><static>true</static><link name='link'><visual name='visual'><transparency>0.7</transparency><geometry><sphere><radius>%f</radius></sphere></geometry><material><ambient>1 0 0 0.5</ambient><diffuse>1 0 0 0.5</diffuse></material></visual></link></model></sdf>" % self.tunnelRadius
self.spawnModel("indicator", xml, "", Pose(), "")
self.delete_model("indicator")
xml = (
"<?xml version='1.0'?>" +
"<sdf version='1.4'>" +
"<model name='indicator'>" +
"<static>true</static>" +
"<link name='link'>" +
"<visual name='visual'>" +
"<transparency>0.7</transparency>" +
"<geometry>" +
"<sphere>" +
"<radius>%f</radius>" +
"</sphere>" +
"</geometry>" +
"<material>" +
"<ambient>1 0 0 0.5</ambient>" +
"<diffuse>1 0 0 0.5</diffuse>" +
"</material>" +
"</visual>" +
"</link>" +
"</model>" +
"</sdf>" % self.tunnel_radius)
self.spawn_model("indicator", xml, "", Pose(), "")
def position_indicator(self):
state = SetModelState()
......@@ -97,7 +117,7 @@ class FlightPathAssertion(threading.Thread):
state.pose = pose
state.twist = Twist()
state.reference_frame = ""
self.setModelState(state)
self.set_model_state(state)
def distance_to_line(self, a, b, pos):
v = b - a
......@@ -111,7 +131,7 @@ class FlightPathAssertion(threading.Thread):
c2 = np.dot(v, v)
if c2 <= c1: # after b
self.center = b
self.endOfSegment = True
self.end_of_segment = True
return linalg.norm(pos - b)
x = c1 / c2
......@@ -120,7 +140,7 @@ class FlightPathAssertion(threading.Thread):
return linalg.norm(pos - l)
def stop(self):
self.shouldStop = True
self.should_stop = True
def run(self):
rate = rospy.Rate(10) # 10hz
......@@ -128,43 +148,43 @@ class FlightPathAssertion(threading.Thread):
current = 0
while not self.shouldStop:
if (self.hasPos):
while not self.should_stop:
if self.has_pos:
# calculate distance to line segment between first two points
# if distances > tunnelRadius
# if distances > tunnel_radius
# exit with error
# advance current pos if not on the line anymore or distance to next point < tunnelRadius
# advance current pos if not on the line anymore or distance to next point < tunnel_radius
# exit if current pos is now the last position
self.position_indicator()
pos = np.array((self.localPosition.x,
self.localPosition.y,
self.localPosition.z))
aPos = np.array((self.positions[current][0],
self.positions[current][1],
self.positions[current][2]))
bPos = np.array((self.positions[current + 1][0],
self.positions[current + 1][1],
self.positions[current + 1][2]))
pos = np.array((self.local_position.x,
self.local_position.y,
self.local_position.z))
a_pos = np.array((self.positions[current][0],
self.positions[current][1],
self.positions[current][2]))
b_pos = np.array((self.positions[current + 1][0],
self.positions[current + 1][1],
self.positions[current + 1][2]))
dist = self.distance_to_line(aPos, bPos, pos)
bDist = linalg.norm(pos - bPos)
dist = self.distance_to_line(a_pos, b_pos, pos)
b_dist = linalg.norm(pos - b_pos)
rospy.logdebug("distance to line: %f, distance to end: %f" % (dist, bDist))
rospy.logdebug("distance to line: %f, distance to end: %f" % (dist, b_dist))
if (dist > self.tunnelRadius):
msg = "left tunnel at position (%f, %f, %f)" % (self.localPosition.x, self.localPosition.y, self.localPosition.z)
if dist > self.tunnel_radius:
msg = "left tunnel at position (%f, %f, %f)" % (self.local_position.x, self.local_position.y, self.local_position.z)
rospy.logerr(msg)
self.failed = True
break
if (self.endOfSegment or bDist < self.tunnelRadius):
if self.end_of_segment or b_dist < self.tunnel_radius:
rospy.loginfo("next segment")
self.endOfSegment = False
self.end_of_segment = False
current = current + 1
if (current == len(self.positions) - 1):
if current == len(self.positions) - 1:
rospy.loginfo("no more positions")
break
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment