aboutsummaryrefslogtreecommitdiff
path: root/client/common_lib/cros/cfm/usb/usb_device_collector.py
blob: 78f3c7ad82cb30572733c46bfa04ac13b5394cf9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import six

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib.cros import textfsm
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device


class UsbDeviceCollector(object):
    """Utility class for obtaining info about connected USB devices."""

    # Marker line inserted before each device section of the "usb-devices"
    # output. The template's '^USB-Device -> Continue.Record' rule keys on it.
    _DEVICE_MARKER = '\nUSB-Device\n'

    # TextFSM template used to parse the (marker-annotated) output of
    # "usb-devices". Regex fragments are written as raw strings so that
    # sequences such as \s and \d are not treated as (invalid) Python string
    # escapes; the lines are joined with real newlines.
    # NOTE(review): the intdriver and serialnumber character classes contain
    # no underscore (and intdriver no digits) -- confirm that values such as
    # a 'cdc_ether' driver are captured as intended.
    USB_DEVICES_TEMPLATE = '\n'.join((
        r'Value Required Vendor ([0-9a-fA-F]+)',
        r'Value Required ProdID ([0-9A-Fa-f]+)',
        r'Value Required prev ([0-9a-fA-Z.]+)',
        r'Value Required Bus ([0-9.]+)',
        r'Value Required Port ([0-9.]+)',
        r'Value Required Lev ([0-9.]+)',
        r'Value Required Dev ([0-9.]+)',
        r'Value Required Prnt ([0-9.]+)',
        r'Value Manufacturer (.+)',
        r'Value Product (.+)',
        r'Value serialnumber ([0-9a-fA-Z\:\-]+)',
        r'Value cinterfaces (\d)',
        r'Value List intindex ([0-9])',
        r'Value List intdriver ([A-Za-z-\(\)]+)',
        '',
        'Start',
        r'  ^USB-Device -> Continue.Record',
        # The two fragments below form a single template rule.
        r'  ^T:\s+Bus=${Bus}\s+Lev=${Lev}\s+Prnt=${Prnt}'
        r'\s+Port=${Port}.*Dev#=\s*${Dev}.*',
        r'  ^P:\s+Vendor=${Vendor}\s+ProdID=${ProdID}\sRev=${prev}',
        r'  ^S:\s+Manufacturer=${Manufacturer}',
        r'  ^S:\s+Product=${Product}',
        r'  ^S:\s+SerialNumber=${serialnumber}',
        r'  ^C:\s+\#Ifs=\s+${cinterfaces}',
        r'  ^I:\s+If\#=\s+${intindex}.*Driver=${intdriver}',
        # Trailing empty entry keeps the final newline of the template.
        '',
    ))

    def __init__(self, host=None):
        """
        Constructor
        @param host: An optional host object if running against a remote host.
        """
        self._host = host

    def _extract_usb_data(self, rawdata):
        """
        Populate usb data into a list of dictionaries.

        The text is parsed with USB_DEVICES_TEMPLATE; each parsed row is
        zipped with the parser's header to form one dictionary.

        @param rawdata The output of "usb-devices" on CfM, with a
                'USB-Device' marker line in front of every device section
                (see _collect_usb_device_data).
        @returns list of dictionary, illustrative example dictionary:
                {'Vendor': '266e',
                 'ProdID': '0110',
                 'prev': '03.00',
                 'Bus': '01',
                 'Port': '00',
                 'Lev': '03',
                 'Dev': '18',
                 'Prnt': '14',
                 'Manufacturer': 'USBest Technology',
                 'Product': 'SiS HID Touch Controller',
                 'serialnumber': '',
                 'cinterfaces': '1',
                 'intindex': ['0'],
                 'intdriver': ['usbhid']}
        """
        re_table = textfsm.TextFSM(six.StringIO(self.USB_DEVICES_TEMPLATE))
        # A newline is appended before parsing, as the raw text may not end
        # with one.
        fsm_results = re_table.ParseText(rawdata + '\n')
        return [dict(zip(re_table.header, row)) for row in fsm_results]

    def _collect_usb_device_data(self):
        """
        Runs "usb-devices" and parses its output.

        The command runs locally via utils.run, or through the host object
        when one was given to the constructor. A non-zero exit status is
        ignored.

        @returns list of dictionaries, see _extract_usb_data.
        """
        run = utils.run if self._host is None else self._host.run
        output = run('usb-devices', ignore_status=True).stdout
        # Device sections are separated by blank lines; prefix each section
        # with the marker line the template uses to start a new record.
        sections = output.strip().split('\n\n')
        return self._extract_usb_data(
            ''.join(self._DEVICE_MARKER + section for section in sections))

    def _create_usb_device(self, usbdata):
        """
        Builds a UsbDevice from one parsed "usb-devices" record.

        @param usbdata: A dictionary as returned by _extract_usb_data.
        @returns A UsbDevice instance. Its parent is not set here; see
                get_usb_devices.
        """
        return usb_device.UsbDevice(
            vid=usbdata['Vendor'],
            pid=usbdata['ProdID'],
            # NOTE(review): the default only applies when the 'Product' key
            # is absent from the dictionary -- confirm whether an empty
            # product string should also map to 'Not available'.
            product=usbdata.get('Product', 'Not available'),
            interfaces=usbdata['intdriver'],
            bus=int(usbdata['Bus']),
            level=int(usbdata['Lev']),
            # We increment here by 1 because usb-devices reports 0-indexed
            # port numbers whereas lsusb reports 1-indexed. We opted to
            # follow the lsusb standard.
            port=int(usbdata['Port']) + 1)

    def get_usb_devices(self):
        """
        Returns the list of UsbDevices connected to the DUT.
        @returns A list of UsbDevice instances.
        """
        data_and_devices = [(data, self._create_usb_device(data))
                            for data in self._collect_usb_device_data()]
        # Make a pass to populate parents of the UsbDevices.
        # We need parent ID and Device ID from the raw data since we do not
        # care about storing those in a UsbDevice. That's why we bother
        # iterating through the (data, UsbDevice) pairs.
        for data, device in data_and_devices:
            # Device IDs are not unique across busses. When finding a device's
            # parent we look for a device with the parent ID on the same bus.
            device.parent = self._find_device_on_same_bus(
                    data_and_devices, int(data['Prnt']), device.bus)
        return [device for _, device in data_and_devices]

    def _find_device_on_same_bus(self, data_and_devices, device_id, bus):
        """
        Finds the device with the given device ID on the given bus.

        @param data_and_devices: A list of (data, UsbDevice) pairs, where data
                is the parsed dictionary the UsbDevice was created from.
        @param device_id: The integer device ID ('Dev' field) to look for.
        @param bus: The bus number the device must be on.
        @returns The first matching UsbDevice, or None if there is no match.
        """
        for data, device in data_and_devices:
            if int(data['Dev']) == device_id and device.bus == bus:
                return device
        return None

    def get_devices_by_spec(self, *specs):
        """
        Returns all UsbDevices that match any of the given specs.

        A device matches a spec when their vid_pid attributes are equal.

        @param specs instances of UsbDeviceSpec.
        @returns a list of UsbDevice instances.
        """
        spec_vid_pids = [spec.vid_pid for spec in specs]
        return [d for d in self.get_usb_devices()
                if d.vid_pid in spec_vid_pids]