aboutsummaryrefslogtreecommitdiff
path: root/server/site_tests/power_USBHotplugInSuspend/power_USBHotplugInSuspend.py
blob: 33c0f3b93340029a3e437dfb0c6d01eba675e3c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Lint as: python2, python3
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# WARNING(crbug.com/743265): This test is currently broken because the ability
# to run client tests in the background from a server-side test has been
# deleted.

import logging, time

from autotest_lib.server import autotest, test
from autotest_lib.client.common_lib import error

# How long the client is asked to stay suspended, in seconds. Passed as
# suspend_time to host.suspend_bg() and also used as the time.sleep() duration
# before the test starts waiting for the client to come back up.
_SUSPEND_TIME = 60
# Timeout handed to host.ping_wait_down() while waiting for the client to stop
# responding after the suspend request (presumably seconds -- TODO confirm).
_SUSPEND_TIMEOUT = 30

class power_USBHotplugInSuspend(test.test):
    """
    Server-side test that switches the power of a servo-attached USB key
    while the client is suspended and checks, by comparing lsusb output
    before and after, that the client notices the change once it resumes.
    """

    version = 1

    # Constant to wait in seconds after turning on or off the usb port power.
    USB_POWEROFF_DELAY_S = 2

    # Class-level defaults so that cleanup() is safe to call even when
    # run_once() never ran or failed before the servo state was recorded.
    _host = None
    _init_usbkey_power = None
    _init_usbkey_direction = None
    _usbkey_state_saved = False

    def _switch_usbkey_power(self, on):
        """
        Turn on/off the power to the USB key.

        @param on True to turn on, false otherwise.
        """
        self._host.servo.set('image_usbkey_pwr', 'on' if on else 'off')
        # Wait after changing the port power before the caller continues.
        time.sleep(self.USB_POWEROFF_DELAY_S)

    def _get_usb_devices(self):
        """
        Get the USB devices attached to the client.

        Parses output from lsusb and returns the set of device IDs (the
        sixth whitespace-separated field of each line).  Lines with fewer
        than six fields, including empty output, are ignored.

        @return set of device ID strings; empty if lsusb printed nothing
                usable.
        @raises error.TestError if running lsusb on the client fails.
        """
        try:
            output = self._host.run('lsusb').stdout
        except Exception as e:
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt/SystemExit are not turned into a TestError.
            logging.error('Running lsusb failed: %s', e)
            raise error.TestError('Failed to get list of USB devices.')

        devices = set()
        for line in output.splitlines():
            fields = line.split()
            # Skip blank or truncated lines instead of raising IndexError.
            if len(fields) > 5:
                devices.add(fields[5])
        logging.info('USB Devices: %s', ",".join(sorted(devices)))
        return devices

    def _suspend_client(self):
        """
        Suspend the client for _SUSPEND_TIME seconds via host.suspend_bg and
        do not wait for it to finish.
        """
        self._host.suspend_bg(suspend_time=_SUSPEND_TIME)

    def _suspend_and_hotplug(self, insert):
        """
        Suspend the client and add/remove the USB key.  This assumes that a
        USB key is plugged into the servo and is facing the DUT.

        @param insert True to test insertion during suspend, False to test
                      removal.
        @raises error.TestError if the client fails to suspend or resume.
        @raises error.TestFail if the set of USB devices does not change
                across the suspend, or across the follow-up hotplug.
        """
        # Initialize the USB key and get the set of USB devices before
        # suspending.
        self._switch_usbkey_power(not insert)
        before_suspend = self._get_usb_devices()

        # Suspend the client and wait for it to go down before powering on/off
        # the usb key.
        self._suspend_client()
        if not self._host.ping_wait_down(_SUSPEND_TIMEOUT):
            raise error.TestError('Client failed to suspend.')
        self._switch_usbkey_power(insert)

        # Wait for the client to come back up (suspend time + some slack time).
        # TODO(beeps): Combine the two timeouts in wait_up after
        # crbug.com/221785 is resolved.
        time.sleep(_SUSPEND_TIME)
        if not self._host.wait_up(self._host.RESUME_TIMEOUT):
            raise error.TestError('Client failed to resume.')

        # Get the set of devices plugged in and make sure the change was
        # detected.  The symmetric difference counts any device that appeared
        # or disappeared as a change.
        after_suspend = self._get_usb_devices()
        if not (after_suspend ^ before_suspend):
            raise error.TestFail('No USB changes detected after resuming.')

        # Finally, make sure hotplug still works after resuming by switching
        # the USB key's power once more.
        self._switch_usbkey_power(not insert)
        after_hotplug = self._get_usb_devices()
        if not (after_hotplug ^ after_suspend):
            raise error.TestFail('No USB changes detected after hotplugging.')

    def cleanup(self):
        """
        Reset the USB key to its initial state.

        The servo is only touched when run_once() managed to record the
        initial state; otherwise there is nothing to restore and the
        attributes needed to do so are unset.  The parent cleanup always
        runs, even if restoring the servo state raises.
        """
        try:
            if self._usbkey_state_saved:
                self._host.servo.switch_usbkey(self._init_usbkey_direction)
                self._switch_usbkey_power(self._init_usbkey_power == 'on')
        finally:
            super(power_USBHotplugInSuspend, self).cleanup()

    def run_once(self, host):
        """
        Tests adding and removing a USB device while the client is suspended.

        @param host the host object for the client under test; its servo
                    attribute is used to control the USB key.
        @raises error.TestError if powering on the USB key does not make any
                new device show up on the client.
        """
        self._host = host
        # Record the servo state first so cleanup() can restore it; the flag
        # is only set once both values have been read successfully.
        self._init_usbkey_power = self._host.servo.get('image_usbkey_pwr')
        self._init_usbkey_direction = self._host.servo.get_usbkey_state()
        self._usbkey_state_saved = True

        # Make sure the USB key is facing the DUT and is actually present.
        self._host.servo.switch_usbkey('dut')
        self._switch_usbkey_power(False)
        before_insert = self._get_usb_devices()
        self._switch_usbkey_power(True)
        after_insert = self._get_usb_devices()
        diff = after_insert - before_insert
        logging.info('Inserted USB device(s): %s', ",".join(sorted(diff)))
        if not diff:
            raise error.TestError('No new USB devices detected. Is a USB key '
                                  'plugged into the servo?')

        logging.info('Testing insertion during suspend.')
        self._suspend_and_hotplug(True)
        logging.info('Testing removal during suspend.')
        self._suspend_and_hotplug(False)