aboutsummaryrefslogtreecommitdiff
path: root/client/common_lib/cros/dbus_send.py
blob: dd4ec7e91f53ca2c353f1967bcc8470c39dff179 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import dbus
import logging
import pipes
import re
import shlex

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error


# Represents the result of a dbus-send call.  |sender| refers to the temporary
# bus name of dbus-send, |responder| to the remote process, and |response|
# contains the parsed response.
DBusSendResult = collections.namedtuple('DBusSendResult', ['sender',
                                                           'responder',
                                                           'response'])
# Used internally.
DictEntry = collections.namedtuple('DictEntry', ['key', 'value'])


def _build_token_stream(headerless_dbus_send_output):
    """A tokenizer for dbus-send output.

    The output is basically just like splitting on whitespace, except that
    strings are kept together by " characters.

    @param headerless_dbus_send_output: list of lines of dbus-send output
            without the meta-information prefix.
    @return list of tokens in dbus-send output.
    """
    return shlex.split(' '.join(headerless_dbus_send_output))


def _parse_value(token_stream):
    """Turn a stream of tokens from dbus-send output into native python types.

    @param token_stream: output from _build_token_stream() above.

    """
    if len(token_stream) == 0:
      # Return None for dbus-send output with no return values.
      return None
    # Assumes properly tokenized output (strings with spaces handled).
    # Assumes tokens are pre-stripped
    token_type = token_stream.pop(0)
    if token_type == 'variant':
        token_type = token_stream.pop(0)
    if token_type == 'object':
        token_type = token_stream.pop(0)  # Should be 'path'
    token_value = token_stream.pop(0)
    INT_TYPES = ('int16', 'uint16', 'int32', 'uint32',
                 'int64', 'uint64', 'byte')
    if token_type in INT_TYPES:
        return int(token_value)
    if token_type == 'string' or token_type == 'path':
        return token_value  # shlex removed surrounding " chars.
    if token_type == 'boolean':
        return token_value == 'true'
    if token_type == 'double':
        return float(token_value)
    if token_type == 'array':
        values = []
        while token_stream[0] != ']':
            values.append(_parse_value(token_stream))
        token_stream.pop(0)
        if values and all([isinstance(x, DictEntry) for x in values]):
            values = dict(values)
        return values
    if token_type == 'dict':
        assert token_value == 'entry('
        key = _parse_value(token_stream)
        value = _parse_value(token_stream)
        assert token_stream.pop(0) == ')'
        return DictEntry(key=key, value=value)
    raise error.TestError('Unhandled DBus type found: %s' % token_type)


def _parse_dbus_send_output(dbus_send_stdout):
    """Turn dbus-send output into usable Python types.

    This looks like:

    localhost ~ # dbus-send --system --dest=org.chromium.flimflam \
            --print-reply --reply-timeout=2000 / \
            org.chromium.flimflam.Manager.GetProperties
    method return time=1490931987.170070 sender=org.chromium.flimflam -> \
        destination=:1.37 serial=6 reply_serial=2
       array [
          dict entry(
             string "ActiveProfile"
             variant             string "/profile/default"
          )
          dict entry(
             string "ArpGateway"
             variant             boolean true
          )
          ...
       ]

    @param dbus_send_output: string stdout from dbus-send
    @return a DBusSendResult.

    """
    lines = dbus_send_stdout.strip().splitlines()
    # The first line contains meta-information about the response
    header = lines[0]
    lines = lines[1:]
    dbus_address_pattern = r'[:\d\\.]+|[a-zA-Z.]+'
    # The header may or may not have a time= field.
    match = re.match(r'method return (time=[\d\\.]+ )?sender=(%s) -> '
                     r'destination=(%s) serial=\d+ reply_serial=\d+' %
                     (dbus_address_pattern, dbus_address_pattern), header)

    if match is None:
        raise error.TestError('Could not parse dbus-send header: %s' % header)

    sender = match.group(2)
    responder = match.group(3)
    token_stream = _build_token_stream(lines)
    ret_val = _parse_value(token_stream)
    # Note that DBus permits multiple response values, and this is not handled.
    logging.debug('Got DBus response: %r', ret_val)
    return DBusSendResult(sender=sender, responder=responder, response=ret_val)


def _dbus2string(raw_arg):
    """Turn a dbus.* type object into a string that dbus-send expects.

    @param raw_dbus dbus.* type object to stringify.
    @return string suitable for dbus-send.

    """
    int_map = {
            dbus.Int16: 'int16:',
            dbus.Int32: 'int32:',
            dbus.Int64: 'int64:',
            dbus.UInt16: 'uint16:',
            dbus.UInt32: 'uint32:',
            dbus.UInt64: 'uint64:',
            dbus.Double: 'double:',
            dbus.Byte: 'byte:',
    }

    if isinstance(raw_arg, dbus.String):
        return pipes.quote('string:%s' % raw_arg.replace('"', r'\"'))

    if isinstance(raw_arg, dbus.Boolean):
        if raw_arg:
            return 'boolean:true'
        else:
            return 'boolean:false'

    for prim_type, prefix in int_map.iteritems():
        if isinstance(raw_arg, prim_type):
            return prefix + str(raw_arg)

    raise error.TestError('No support for serializing %r' % raw_arg)


def _build_arg_string(raw_args):
    """Construct a string of arguments to a DBus method as dbus-send expects.

    @param raw_args list of dbus.* type objects to seriallize.
    @return string suitable for dbus-send.

    """
    return ' '.join([_dbus2string(arg) for arg in raw_args])


def dbus_send(bus_name, interface, object_path, method_name, args=None,
              host=None, timeout_seconds=2, tolerate_failures=False, user=None):
    """Call dbus-send without arguments.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface of object to call method on.
    @param object_path: string DBus path of remote object to call method on.
    @param method_name: string name of method to call.
    @param args: optional list of arguments.  Arguments must be of types
            from the python dbus module.
    @param host: An optional host object if running against a remote host.
    @param timeout_seconds: number of seconds to wait for a response.
    @param tolerate_failures: boolean True to ignore problems receiving a
            response.
    @param user: An option argument to run dbus-send as a given user.

    """
    run = utils.run if host is None else host.run
    cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s '
           '%s %s.%s' % (int(timeout_seconds * 1000), bus_name,
                         object_path, interface, method_name))

    if user is not None:
        cmd = ('sudo -u %s %s' % (user, cmd))
    if args is not None:
        cmd = cmd + ' ' + _build_arg_string(args)
    result = run(cmd, ignore_status=tolerate_failures)
    if result.exit_status != 0:
        logging.debug('%r', result.stdout)
        return None
    return _parse_dbus_send_output(result.stdout)


def get_property(bus_name, interface, object_path, property_name, host=None):
    """A helpful wrapper that extracts the value of a DBus property.

    @param bus_name: string identifier of DBus connection to send a message to.
    @param interface: string DBus interface exposing the property.
    @param object_path: string DBus path of remote object to call method on.
    @param property_name: string name of property to get.
    @param host: An optional host object if running against a remote host.

    """
    return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get',
                     args=[dbus.String(interface), dbus.String(property_name)],
                     host=host)