aboutsummaryrefslogtreecommitdiff
path: root/client/cros/networking/mm1_proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/cros/networking/mm1_proxy.py')
-rw-r--r--client/cros/networking/mm1_proxy.py201
1 files changed, 156 insertions, 45 deletions
diff --git a/client/cros/networking/mm1_proxy.py b/client/cros/networking/mm1_proxy.py
index 3f55c93ddb..496934d0f0 100644
--- a/client/cros/networking/mm1_proxy.py
+++ b/client/cros/networking/mm1_proxy.py
@@ -1,4 +1,5 @@
-# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
+# Lint as: python2, python3
+# Copyright (c) 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -9,6 +10,8 @@ This module provides bindings for ModemManager1.
import dbus
import dbus.mainloop.glib
+import logging
+import time
from autotest_lib.client.bin import utils
from autotest_lib.client.cros.cellular import mm1_constants
@@ -62,13 +65,13 @@ class ModemManager1Proxy(object):
if _is_unknown_dbus_binding_exception(e):
return None
raise ModemManager1ProxyError(
- 'Error connecting to ModemManager1. DBus error: |%s|',
- repr(e))
+ 'Error connecting to ModemManager1. DBus error: |%s|',
+ repr(e))
utils.poll_for_condition(
lambda: _connect_to_mm1(bus) is not None,
exception=ModemManager1ProxyError(
- 'Timed out connecting to ModemManager1'),
+ 'Timed out connecting to ModemManager1'),
timeout=timeout_seconds,
sleep_interval=ModemManager1Proxy.CONNECT_WAIT_INTERVAL_SECONDS)
connection = _connect_to_mm1(bus)
@@ -79,23 +82,78 @@ class ModemManager1Proxy(object):
return connection
-
def __init__(self, bus=None):
if bus is None:
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
self._bus = bus
self._manager = dbus.Interface(
- self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
- mm1_constants.MM1),
- mm1_constants.I_MODEM_MANAGER)
-
+ self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
+ mm1_constants.MM1),
+ mm1_constants.I_MODEM_MANAGER)
+ self._device = None
@property
def manager(self):
"""@return the DBus ModemManager1 Manager object."""
return self._manager
+ def inhibit_device(self, inhibit):
+ """
+
+ Uses Modem Manager InhibitDevice DBus API to inhibit/uninhibit
+ @param inhibit: true to inhibit the modem and false to uninhibit it.
+
+ InhibitDevice API:
+ @uid: the unique ID of the physical device, given in the
+ #org.freedesktop.ModemManager1.Modem:Device property.
+ @inhibit: %TRUE to inhibit the modem and %FALSE to uninhibit it.
+
+ Inhibit or uninhibit the device.
+
+ When the modem is inhibited ModemManager will close all its ports and
+ unexport it from the bus, so that users of the interface are no longer
+ able to operate with it.
+
+ This operation binds the inhibition request to the existence of the
+ caller in the DBus bus. If the caller disappears from the bus, the
+ inhibition will automatically removed.
+ """
+ try:
+ if not self._manager:
+ raise ModemManager1ProxyError(
+ 'Failed to obtain dbus manager object.- No manager')
+ if not inhibit and not self._device:
+ raise ModemManager1ProxyError(
+ 'Uninhibit called before inhibit %s' % self._device)
+
+ if inhibit:
+ modem = self.get_modem()
+ if not modem:
+ raise ModemManager1ProxyError(
+ 'Failed to to obtain dbus manager object. - No modem')
+
+ self._device = modem.properties(
+ mm1_constants.I_MODEM).get('Device')
+
+ logging.debug('device to be inhibited/uninhibited %s', self._device)
+ self._manager.InhibitDevice(dbus.String(self._device), inhibit)
+
+ logging.debug('inhibit=%r done with %s', inhibit, self._device)
+
+ if inhibit:
+ time.sleep(mm1_constants.MM_INHIBIT_PROCESSING_TIME)
+ else:
+ result = self.wait_for_modem(
+ mm1_constants.MM_UNINHIBIT_PROCESSING_TIME)
+
+ time.sleep(mm1_constants.MM_REPROBE_PROCESSING_TIME)
+ if result is None:
+ raise ModemManager1ProxyError('No modem after uninhibit')
+ except dbus.exceptions.DBusException as e:
+ raise ModemManager1ProxyError(
+ 'Failed to to obtain dbus object for the modem.'
+ 'DBus error: %s' % repr(e))
def get_modem(self):
"""
@@ -123,16 +181,16 @@ class ModemManager1Proxy(object):
modems = object_manager.GetManagedObjects()
except dbus.exceptions.DBusException as e:
raise ModemManager1ProxyError(
- 'Failed to list the available modems. DBus error: |%s|',
- repr(e))
+ 'Failed to list the available modems. DBus error: %s' %
+ repr(e))
if not modems:
return None
elif len(modems) > 1:
raise ModemManager1ProxyError(
- 'Expected one modem object, found %d', len(modems))
+ 'Expected one modem object, found %d' % len(modems))
- modem_proxy = ModemProxy(self._bus, modems.keys()[0])
+ modem_proxy = ModemProxy(self._bus, list(modems.keys())[0])
# Check that this object is valid
try:
modem_proxy.modem.GetAll(mm1_constants.I_MODEM,
@@ -142,9 +200,8 @@ class ModemManager1Proxy(object):
if _is_unknown_dbus_binding_exception(e):
return None
raise ModemManager1ProxyError(
- 'Failed to obtain dbus object for the modem. DBus error: '
- '|%s|', repr(e))
-
+ 'Failed to obtain dbus object for the modem. DBus error: %s' %
+ repr(e))
def wait_for_modem(self, timeout_seconds):
"""
@@ -159,58 +216,51 @@ class ModemManager1Proxy(object):
"""
return utils.poll_for_condition(
- self.get_modem,
- exception=ModemManager1ProxyError('No modem found'),
- timeout=timeout_seconds)
+ self.get_modem,
+ exception=ModemManager1ProxyError('No modem found'),
+ timeout=timeout_seconds)
class ModemProxy(object):
"""A wrapper around a DBus proxy for ModemManager1 modem object."""
# Amount of time to wait for a state transition.
- STATE_TRANSITION_WAIT_SECONDS = 10
+ STATE_TRANSITION_WAIT_SECONDS = 60
def __init__(self, bus, path):
self._bus = bus
self._modem = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
-
@property
def modem(self):
"""@return the DBus modem object."""
return self._modem
-
@property
def iface_modem(self):
"""@return org.freedesktop.ModemManager1.Modem DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM)
-
@property
def iface_simple_modem(self):
"""@return org.freedesktop.ModemManager1.Simple DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM_SIMPLE)
-
@property
def iface_gsm_modem(self):
"""@return org.freedesktop.ModemManager1.Modem3gpp DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM_3GPP)
-
@property
def iface_cdma_modem(self):
"""@return org.freedesktop.ModemManager1.ModemCdma DBus interface."""
return dbus.Interface(self._modem, mm1_constants.I_MODEM_CDMA)
-
@property
def iface_properties(self):
"""@return org.freedesktop.DBus.Properties DBus interface."""
return dbus.Interface(self._modem, dbus.PROPERTIES_IFACE)
-
def properties(self, iface):
"""Return the properties associated with the specified interface.
@@ -220,7 +270,6 @@ class ModemProxy(object):
"""
return self.iface_properties.GetAll(iface)
-
def get_sim(self):
"""
Return the SIM proxy object associated with this modem.
@@ -231,6 +280,15 @@ class ModemProxy(object):
sim_path = self.properties(mm1_constants.I_MODEM).get('Sim')
if not sim_path:
return None
+ return self.get_sim_at_path(sim_path)
+
+ def get_sim_at_path(self, sim_path):
+ """
+ Return the SIM proxy object associated with the sim_path.
+
+ @return SimProxy object or None if no SIM exists.
+
+ """
sim_proxy = SimProxy(self._bus, sim_path)
# Check that this object is valid
try:
@@ -240,9 +298,67 @@ class ModemProxy(object):
if _is_unknown_dbus_binding_exception(e):
return None
raise ModemManager1ProxyError(
- 'Failed to obtain dbus object for the SIM. DBus error: '
- '|%s|', repr(e))
+ 'Failed to obtain dbus object for the SIM. DBus error: %s' %
+ repr(e))
+
+ def get_sim_slots(self):
+ """
+ The list of SIM slots available in the system, including the SIM object
+ paths if the cards are present. If a given SIM slot at a given index
+ doesn't have a SIM card available, an empty object path will be given.
+
+ The length of this array of objects will be equal to the amount of
+ available SIM slots in the system, and the index in the array is the
+ slot index.
+
+ This list includes the SIM object considered as primary active SIM slot
+ (#org.freedesktop.ModemManager1.Modem.Sim) at index
+ #org.freedesktop.ModemManager1.Modem.ActiveSimSlot.
+
+ @return list of SimSlot paths
+
+ """
+ return self.properties(mm1_constants.I_MODEM).get('SimSlots')
+
+ def get_primary_sim_slot(self):
+ """
+ The index of the primary active SIM slot in the
+ #org.freedesktop.ModemManager1.Modem.SimSlots array, given in the [1,N]
+ range.
+
+ If multiple SIM slots aren't supported, this property will report None
+
+ In a Multi SIM Single Standby setup, this index identifies the only SIM
+ that is currently active. All the remaining slots will be inactive.
+ In a Multi SIM Multi Standby setup, this index identifies the active SIM
+ that is considered primary, i.e. the one that will be used when a data
+ connection is setup.
+
+ @return current primary slot index
+
+ """
+ return self.properties(mm1_constants.I_MODEM).get('PrimarySimSlot')
+
+ def set_primary_slot(self, sim_slot):
+ """
+ Selects which SIM slot to be considered as primary, on devices that
+ expose multiple slots in the #org.freedesktop.ModemManager1.Modem
+ :SimSlots property.
+
+ When the switch happens the modem may require a full device reprobe,
+ so the modem object in DBus will get removed, and recreated once the
+ selected SIM slot is in use.
+
+ There is no limitation on which SIM slot to select, so the user may
+ also set as primary a slot that doesn't currently have any valid SIM
+ card inserted.
+
+ @param: sim_slot: SIM slot number to set as primary.
+ @return: success or raise error
+
+ """
+ self.iface_modem.SetPrimarySimSlot(dbus.UInt32(sim_slot))
def wait_for_states(self, states,
timeout_seconds=STATE_TRANSITION_WAIT_SECONDS):
@@ -266,19 +382,18 @@ class ModemProxy(object):
mm1_constants.MM_MODEM_STATE_DISCONNECTING,
mm1_constants.MM_MODEM_STATE_CONNECTING]:
raise ModemManager1ProxyError(
- 'wait_for_states() does not support transitory states.')
+ 'wait_for_states() does not support transitory states.')
utils.poll_for_condition(
- lambda: self.properties(mm1_constants.I_MODEM)[
- mm1_constants.MM_MODEM_PROPERTY_NAME_STATE] in states,
- exception=ModemManager1ProxyError(
- 'Timed out waiting for modem to enter one of these '
- 'states: %s, current state=%s',
- states,
- self.properties(mm1_constants.I_MODEM)[
- mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]),
- timeout=timeout_seconds)
-
+ lambda: self.properties(mm1_constants.I_MODEM)[
+ mm1_constants.MM_MODEM_PROPERTY_NAME_STATE] in states,
+ exception=ModemManager1ProxyError(
+ 'Timed out waiting for modem to enter one of these '
+ 'states: %s, current state=%s' %
+ (states,
+ self.properties(mm1_constants.I_MODEM)[
+ mm1_constants.MM_MODEM_PROPERTY_NAME_STATE])),
+ timeout=timeout_seconds)
class SimProxy(object):
"""A wrapper around a DBus proxy for ModemManager1 SIM object."""
@@ -287,25 +402,21 @@ class SimProxy(object):
self._bus = bus
self._sim = self._bus.get_object(mm1_constants.I_MODEM_MANAGER, path)
-
@property
def sim(self):
"""@return the DBus SIM object."""
return self._sim
-
@property
def iface_properties(self):
"""@return org.freedesktop.DBus.Properties DBus interface."""
return dbus.Interface(self._sim, dbus.PROPERTIES_IFACE)
-
@property
def iface_sim(self):
"""@return org.freedesktop.ModemManager1.Sim DBus interface."""
return dbus.Interface(self._sim, mm1_constants.I_SIM)
-
def properties(self, iface=mm1_constants.I_SIM):
"""Return the properties associated with the specified interface.