aboutsummaryrefslogtreecommitdiff
path: root/bumble/smp.py
diff options
context:
space:
mode:
Diffstat (limited to 'bumble/smp.py')
-rw-r--r--bumble/smp.py292
1 files changed, 219 insertions, 73 deletions
diff --git a/bumble/smp.py b/bumble/smp.py
index f8bba40..73fd439 100644
--- a/bumble/smp.py
+++ b/bumble/smp.py
@@ -27,6 +27,7 @@ import logging
import asyncio
import enum
import secrets
+from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
@@ -53,6 +54,7 @@ from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
BT_LE_TRANSPORT,
+ AdvertisingData,
ProtocolError,
name_or_number,
)
@@ -185,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt
-SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
-SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
+SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
+SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
# fmt: on
# pylint: enable=line-too-long
@@ -564,6 +566,54 @@ class PairingMethod(enum.IntEnum):
# -----------------------------------------------------------------------------
+class OobContext:
+ """Cryptographic context for LE SC OOB pairing."""
+
+ ecc_key: crypto.EccKey
+ r: bytes
+
+ def __init__(
+ self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None
+ ) -> None:
+ self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key
+ self.r = crypto.r() if r is None else r
+
+ def share(self) -> OobSharedData:
+ pkx = self.ecc_key.x[::-1]
+ return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
+
+
+# -----------------------------------------------------------------------------
+class OobLegacyContext:
+ """Cryptographic context for LE Legacy OOB pairing."""
+
+ tk: bytes
+
+ def __init__(self, tk: Optional[bytes] = None) -> None:
+ self.tk = crypto.r() if tk is None else tk
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class OobSharedData:
+ """Shareable data for LE SC OOB pairing."""
+
+ c: bytes
+ r: bytes
+
+ def to_ad(self) -> AdvertisingData:
+ return AdvertisingData(
+ [
+ (AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c),
+ (AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r),
+ ]
+ )
+
+ def __str__(self) -> str:
+ return f'OOB(C={self.c.hex()}, R={self.r.hex()})'
+
+
+# -----------------------------------------------------------------------------
class Session:
# I/O Capability to pairing method decision matrix
#
@@ -627,6 +677,13 @@ class Session:
},
}
+ ea: bytes
+ eb: bytes
+ ltk: bytes
+ preq: bytes
+ pres: bytes
+ tk: bytes
+
def __init__(
self,
manager: Manager,
@@ -636,17 +693,10 @@ class Session:
) -> None:
self.manager = manager
self.connection = connection
- self.preq: Optional[bytes] = None
- self.pres: Optional[bytes] = None
- self.ea = None
- self.eb = None
- self.tk = bytes(16)
- self.r = bytes(16)
self.stk = None
- self.ltk = None
self.ltk_ediv = 0
self.ltk_rand = bytes(8)
- self.link_key = None
+ self.link_key: Optional[bytes] = None
self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None
@@ -659,7 +709,7 @@ class Session:
self.peer_bd_addr: Optional[Address] = None
self.peer_signature_key = None
self.peer_expected_distributions: List[Type[SMP_Command]] = []
- self.dh_key = None
+ self.dh_key = b''
self.confirm_value = None
self.passkey: Optional[int] = None
self.passkey_ready = asyncio.Event()
@@ -712,8 +762,8 @@ class Session:
self.io_capability = pairing_config.delegate.io_capability
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
- # OOB (not supported yet)
- self.oob = False
+ # OOB
+ self.oob_data_flag = 0 if pairing_config.oob is None else 1
# Set up addresses
self_address = connection.self_address
@@ -729,9 +779,35 @@ class Session:
self.ia = bytes(peer_address)
self.iat = 1 if peer_address.is_random else 0
+ # Select the ECC key, TK and r initial value
+ if pairing_config.oob:
+ self.peer_oob_data = pairing_config.oob.peer_data
+ if pairing_config.sc:
+ if pairing_config.oob.our_context is None:
+ raise ValueError(
+ "oob pairing config requires a context when sc is True"
+ )
+ self.r = pairing_config.oob.our_context.r
+ self.ecc_key = pairing_config.oob.our_context.ecc_key
+ if pairing_config.oob.legacy_context is not None:
+ self.tk = pairing_config.oob.legacy_context.tk
+ else:
+ if pairing_config.oob.legacy_context is None:
+ raise ValueError(
+ "oob pairing config requires a legacy context when sc is False"
+ )
+ self.r = bytes(16)
+ self.ecc_key = manager.ecc_key
+ self.tk = pairing_config.oob.legacy_context.tk
+ else:
+ self.peer_oob_data = None
+ self.r = bytes(16)
+ self.ecc_key = manager.ecc_key
+ self.tk = bytes(16)
+
@property
def pkx(self) -> Tuple[bytes, bytes]:
- return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x)
+ return (self.ecc_key.x[::-1], self.peer_public_key_x)
@property
def pka(self) -> bytes:
@@ -768,7 +844,10 @@ class Session:
return None
def decide_pairing_method(
- self, auth_req: int, initiator_io_capability: int, responder_io_capability: int
+ self,
+ auth_req: int,
+ initiator_io_capability: int,
+ responder_io_capability: int,
) -> None:
if self.connection.transport == BT_BR_EDR_TRANSPORT:
self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
@@ -909,7 +988,7 @@ class Session:
command = SMP_Pairing_Request_Command(
io_capability=self.io_capability,
- oob_data_flag=0,
+ oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution,
@@ -921,7 +1000,7 @@ class Session:
def send_pairing_response_command(self) -> None:
response = SMP_Pairing_Response_Command(
io_capability=self.io_capability,
- oob_data_flag=0,
+ oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req,
maximum_encryption_key_size=16,
initiator_key_distribution=self.initiator_key_distribution,
@@ -982,8 +1061,8 @@ class Session:
def send_public_key_command(self) -> None:
self.send_command(
SMP_Pairing_Public_Key_Command(
- public_key_x=bytes(reversed(self.manager.ecc_key.x)),
- public_key_y=bytes(reversed(self.manager.ecc_key.y)),
+ public_key_x=self.ecc_key.x[::-1],
+ public_key_y=self.ecc_key.y[::-1],
)
)
@@ -1011,7 +1090,7 @@ class Session:
# We can now encrypt the connection with the short term key, so that we can
# distribute the long term and/or other keys over an encrypted connection
self.manager.device.host.send_command_sync(
- HCI_LE_Enable_Encryption_Command( # type: ignore[call-arg]
+ HCI_LE_Enable_Encryption_Command(
connection_handle=self.connection.handle,
random_number=bytes(8),
encrypted_diversifier=0,
@@ -1019,18 +1098,56 @@ class Session:
)
)
- async def derive_ltk(self) -> None:
- link_key = await self.manager.device.get_link_key(self.connection.peer_address)
- assert link_key is not None
+ @classmethod
+ def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
+ '''Derives Long Term Key from Link Key.
+
+ Args:
+ link_key: BR/EDR Link Key bytes in little-endian.
+ ct2: whether ct2 is supported on both devices.
+ Returns:
+ LE Long Tern Key bytes in little-endian.
+ '''
ilk = (
crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
- if self.ct2
+ if ct2
else crypto.h6(link_key, b'tmp2')
)
- self.ltk = crypto.h6(ilk, b'brle')
+ return crypto.h6(ilk, b'brle')
+
+ @classmethod
+ def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
+ '''Derives Link Key from Long Term Key.
+
+ Args:
+ ltk: LE Long Term Key bytes in little-endian.
+ ct2: whether ct2 is supported on both devices.
+ Returns:
+ BR/EDR Link Key bytes in little-endian.
+ '''
+ ilk = (
+ crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
+ if ct2
+ else crypto.h6(ltk, b'tmp1')
+ )
+ return crypto.h6(ilk, b'lebr')
- def distribute_keys(self) -> None:
+ async def get_link_key_and_derive_ltk(self) -> None:
+ '''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
+ self.link_key = await self.manager.device.get_link_key(
+ self.connection.peer_address
+ )
+ if self.link_key is None:
+ logging.warning(
+ 'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
+ )
+ self.send_pairing_failed(
+ SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
+ )
+ else:
+ self.ltk = self.derive_ltk(self.link_key, self.ct2)
+ def distribute_keys(self) -> None:
# Distribute the keys as required
if self.is_initiator:
# CTKD: Derive LTK from LinkKey
@@ -1039,7 +1156,7 @@ class Session:
and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
- 'disconnection', self.derive_ltk()
+ 'disconnection', self.get_link_key_and_derive_ltk()
)
elif not self.sc:
# Distribute the LTK, EDIV and RAND
@@ -1069,12 +1186,7 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
- ilk = (
- crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
- if self.ct2
- else crypto.h6(self.ltk, b'tmp1')
- )
- self.link_key = crypto.h6(ilk, b'lebr')
+ self.link_key = self.derive_link_key(self.ltk, self.ct2)
else:
# CTKD: Derive LTK from LinkKey
@@ -1083,7 +1195,7 @@ class Session:
and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
):
self.ctkd_task = self.connection.abort_on(
- 'disconnection', self.derive_ltk()
+ 'disconnection', self.get_link_key_and_derive_ltk()
)
# Distribute the LTK, EDIV and RAND
elif not self.sc:
@@ -1113,12 +1225,7 @@ class Session:
# CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
- ilk = (
- crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
- if self.ct2
- else crypto.h6(self.ltk, b'tmp1')
- )
- self.link_key = crypto.h6(ilk, b'lebr')
+ self.link_key = self.derive_link_key(self.ltk, self.ct2)
def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
# Set our expectations for what to wait for in the key distribution phase
@@ -1296,7 +1403,7 @@ class Session:
try:
handler(command)
except Exception as error:
- logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
+ logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
response = SMP_Pairing_Failed_Command(
reason=SMP_UNSPECIFIED_REASON_ERROR
)
@@ -1333,15 +1440,28 @@ class Session:
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
- # Check for OOB
- if command.oob_data_flag != 0:
- self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
- return
+ # Infer the pairing method
+ if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
+ not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
+ ):
+ # Use OOB
+ self.pairing_method = PairingMethod.OOB
+ if not self.sc and self.tk is None:
+ # For legacy OOB, TK is required.
+ logger.warning("legacy OOB without TK")
+ self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
+ return
+ if command.oob_data_flag == 0:
+ # The peer doesn't have OOB data, use r=0
+ self.r = bytes(16)
+ else:
+ # Decide which pairing method to use from the IO capability
+ self.decide_pairing_method(
+ command.auth_req,
+ command.io_capability,
+ self.io_capability,
+ )
- # Decide which pairing method to use
- self.decide_pairing_method(
- command.auth_req, command.io_capability, self.io_capability
- )
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
@@ -1390,15 +1510,26 @@ class Session:
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
- # Check for OOB
- if self.sc and command.oob_data_flag:
- self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
- return
+ # Infer the pairing method
+ if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
+ not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
+ ):
+ # Use OOB
+ self.pairing_method = PairingMethod.OOB
+ if not self.sc and self.tk is None:
+ # For legacy OOB, TK is required.
+ logger.warning("legacy OOB without TK")
+ self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
+ return
+ if command.oob_data_flag == 0:
+ # The peer doesn't have OOB data, use r=0
+ self.r = bytes(16)
+ else:
+ # Decide which pairing method to use from the IO capability
+ self.decide_pairing_method(
+ command.auth_req, self.io_capability, command.io_capability
+ )
- # Decide which pairing method to use
- self.decide_pairing_method(
- command.auth_req, self.io_capability, command.io_capability
- )
logger.debug(f'pairing method: {self.pairing_method.name}')
# Key distribution
@@ -1549,12 +1680,13 @@ class Session:
if self.passkey_step < 20:
self.send_pairing_confirm_command()
return
- else:
+ elif self.pairing_method != PairingMethod.OOB:
return
else:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
+ PairingMethod.OOB,
):
self.send_pairing_random_command()
elif self.pairing_method == PairingMethod.PASSKEY:
@@ -1591,6 +1723,7 @@ class Session:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
+ PairingMethod.OOB,
):
ra = bytes(16)
rb = ra
@@ -1599,7 +1732,6 @@ class Session:
ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra
else:
- # OOB not implemented yet
return
assert self.preq and self.pres
@@ -1651,18 +1783,33 @@ class Session:
self.peer_public_key_y = command.public_key_y
# Compute the DH key
- self.dh_key = bytes(
- reversed(
- self.manager.ecc_key.dh(
- bytes(reversed(command.public_key_x)),
- bytes(reversed(command.public_key_y)),
- )
- )
- )
+ self.dh_key = self.ecc_key.dh(
+ command.public_key_x[::-1],
+ command.public_key_y[::-1],
+ )[::-1]
logger.debug(f'DH key: {self.dh_key.hex()}')
+ if self.pairing_method == PairingMethod.OOB:
+ # Check against shared OOB data
+ if self.peer_oob_data:
+ confirm_verifier = crypto.f4(
+ self.peer_public_key_x,
+ self.peer_public_key_x,
+ self.peer_oob_data.r,
+ bytes(1),
+ )
+ if not self.check_expected_value(
+ self.peer_oob_data.c,
+ confirm_verifier,
+ SMP_CONFIRM_VALUE_FAILED_ERROR,
+ ):
+ return
+
if self.is_initiator:
- self.send_pairing_confirm_command()
+ if self.pairing_method == PairingMethod.OOB:
+ self.send_pairing_random_command()
+ else:
+ self.send_pairing_confirm_command()
else:
if self.pairing_method == PairingMethod.PASSKEY:
self.display_or_input_passkey()
@@ -1673,6 +1820,7 @@ class Session:
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
+ PairingMethod.OOB,
):
# We can now send the confirmation value
self.send_pairing_confirm_command()
@@ -1701,7 +1849,6 @@ class Session:
else:
self.send_pairing_dhkey_check_command()
else:
- assert self.ltk
self.start_encryption(self.ltk)
def on_smp_pairing_failed_command(
@@ -1751,6 +1898,7 @@ class Manager(EventEmitter):
sessions: Dict[int, Session]
pairing_config_factory: Callable[[Connection], PairingConfig]
session_proxy: Type[Session]
+ _ecc_key: Optional[crypto.EccKey]
def __init__(
self,
@@ -1845,10 +1993,8 @@ class Manager(EventEmitter):
) -> None:
# Store the keys in the key store
if self.device.keystore and identity_address is not None:
- self.device.abort_on(
- 'flush', self.device.update_keys(str(identity_address), keys)
- )
-
+ # Make sure on_pairing emits after key update.
+ await self.device.update_keys(str(identity_address), keys)
# Notify the device
self.device.on_pairing(session.connection, identity_address, keys, session.sc)