aboutsummaryrefslogtreecommitdiff
path: root/server/site_tests/kernel_ExternalUsbPeripheralsDetectionTest/kernel_ExternalUsbPeripheralsDetectionTest.py
blob: a1ca7bc8960118760c943e4a9d900be8ce7ff6b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging, os, re, time

from autotest_lib.server import test
from autotest_lib.client.common_lib import error

_WAIT_DELAY = 25
_USB_DIR = '/sys/bus/usb/devices'

class kernel_ExternalUsbPeripheralsDetectionTest(test.test):
    """Uses servo to repeatedly connect/remove USB devices during boot."""
    version = 1


    def set_hub_power(self, on=True):
        """Setting USB hub power status

        @param: on To power on the servo-usb hub or not

        """
        reset = 'off'
        if not on:
            reset = 'on'
        self.host.servo.set('dut_hub1_rst1', reset)
        self.pluged_status = on
        time.sleep(_WAIT_DELAY)


    def check_usb_peripherals_details(self):
        """Checks the effect from plugged in USB peripherals.

        @returns True if command line output is matched successfuly; Else False
        """
        failed = list()
        for cmd in self.usb_checks.keys():
            out_match_list = self.usb_checks.get(cmd)
            logging.info('Running %s',  cmd)

            # Run the usb check command
            cmd_out_lines = (self.host.run(cmd, ignore_status=True).
                             stdout.strip().split('\n'))
            for out_match in out_match_list:
                match_result = False
                for cmd_out_line in cmd_out_lines:
                    match_result = (match_result or
                        re.search(out_match, cmd_out_line) != None)
                if not match_result:
                    failed.append((cmd,out_match))
        return failed


    def get_usb_device_dirs(self):
        """Gets the usb device dirs from _USB_DIR path.

        @returns list with number of device dirs else None
        """
        usb_dir_list = []
        cmd = 'ls -1 %s' % _USB_DIR
        tmp = self.host.run(cmd).stdout.strip().split('\n')
        for d in tmp:
            usb_dir_list.append(os.path.join(_USB_DIR, d))
        return usb_dir_list


    def get_vendor_id_dict_from_dut(self, dir_list):
        """Finds the vendor id from provided dir list.

        @param dir_list: full path of directories
        @returns dict of all vendor ids vs file path
        """
        vendor_id_dict = dict()
        for d in dir_list:
            file_name = os.path.join(d, 'idVendor')
            if self._exists_on(file_name):
                vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
                if vendor_id:
                    vendor_id_dict[vendor_id] = d
        logging.info('%s', vendor_id_dict)
        return vendor_id_dict


    def _exists_on(self, path):
        """Checks if file exists on host or not.

        @returns True or False
        """
        return self.host.run('ls %s' % path,
                             ignore_status=True).exit_status == 0



    def run_once(self, host, usb_checks=None,
                 vendor_id_dict_control_file=None):
        """Main function to run the autotest.

        @param host: name of the host
        @param usb_checks: dictionary defined in control file
        @param vendor_id_list: dictionary defined in control file
        """
        self.host = host
        self.usb_checks = usb_checks

        self.host.servo.switch_usbkey('dut')
        self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
        time.sleep(_WAIT_DELAY)

        self.set_hub_power(False)
        # Collect the USB devices directories before switching on hub
        usb_list_dir_off = self.get_usb_device_dirs()

        self.set_hub_power(True)
        # Collect the USB devices directories after switching on hub
        usb_list_dir_on = self.get_usb_device_dirs()

        diff_list = list(set(usb_list_dir_on).difference(set(usb_list_dir_off)))
        if len(diff_list) == 0:
            # Fail if no devices detected after
            raise error.TestError('No connected devices were detected. Make '
                                  'sure the devices are connected to USB_KEY '
                                  'and DUT_HUB1_USB on the servo board.')
        logging.debug('Connected devices list: %s', diff_list)

        # Test 1: check USB peripherals info in detail
        failed = self.check_usb_peripherals_details()
        if len(failed)> 0:
            raise error.TestError('USB device not detected %s', str(failed))

        # Test 2: check USB device dir under /sys/bus/usb/devices
        vendor_ids = {}
        # Gets a dict idVendor: dir_path
        vendor_ids = self.get_vendor_id_dict_from_dut(diff_list)
        for vid in vendor_id_dict_control_file.keys():
            peripheral = vendor_id_dict_control_file[vid]
            if vid not in vendor_ids.keys():
                raise error.TestFail('%s is not detected at %s dir'
                                     % (peripheral, _USB_DIR))
            else:
            # Test 3: check driver symlink and dir for each USB device
                tmp_list = [device_dir for device_dir in
                            self.host.run('ls -1 %s' % vendor_ids[vid],
                            ignore_status=True).stdout.split('\n')
                            if re.match(r'\d-\d.*:\d\.\d', device_dir)]
                if not tmp_list:
                    raise error.TestFail('No driver created/loaded for %s'
                                         % peripheral)
                logging.info('---- Drivers for %s ----', peripheral)
                flag = False
                for device_dir in tmp_list:
                    driver_path = os.path.join(vendor_ids[vid],
                                               '%s/driver' % device_dir)
                    if self._exists_on(driver_path):
                        flag = True
                        link = (self.host.run('ls -l %s | grep ^l'
                                              '| grep driver'
                                              % driver_path, ignore_status=True)
                                              .stdout.strip())
                        logging.info('%s', link)
                if not flag:
                    raise error.TestFail('Driver for %s is not loaded - %s'
                                         % (peripheral, driver_path))