diff options
Diffstat (limited to 'bumble/smp.py')
-rw-r--r-- | bumble/smp.py | 292 |
1 files changed, 219 insertions, 73 deletions
diff --git a/bumble/smp.py b/bumble/smp.py index f8bba40..73fd439 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -27,6 +27,7 @@ import logging import asyncio import enum import secrets +from dataclasses import dataclass from typing import ( TYPE_CHECKING, Any, @@ -53,6 +54,7 @@ from .core import ( BT_BR_EDR_TRANSPORT, BT_CENTRAL_ROLE, BT_LE_TRANSPORT, + AdvertisingData, ProtocolError, name_or_number, ) @@ -185,8 +187,8 @@ SMP_KEYPRESS_AUTHREQ = 0b00010000 SMP_CT2_AUTHREQ = 0b00100000 # Crypto salt -SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') -SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032') +SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031') +SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032') # fmt: on # pylint: enable=line-too-long @@ -564,6 +566,54 @@ class PairingMethod(enum.IntEnum): # ----------------------------------------------------------------------------- +class OobContext: + """Cryptographic context for LE SC OOB pairing.""" + + ecc_key: crypto.EccKey + r: bytes + + def __init__( + self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None + ) -> None: + self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key + self.r = crypto.r() if r is None else r + + def share(self) -> OobSharedData: + pkx = self.ecc_key.x[::-1] + return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r) + + +# ----------------------------------------------------------------------------- +class OobLegacyContext: + """Cryptographic context for LE Legacy OOB pairing.""" + + tk: bytes + + def __init__(self, tk: Optional[bytes] = None) -> None: + self.tk = crypto.r() if tk is None else tk + + +# ----------------------------------------------------------------------------- +@dataclass +class OobSharedData: + """Shareable data for LE SC OOB pairing.""" + + c: bytes + r: bytes + + def to_ad(self) -> AdvertisingData: + return AdvertisingData( + [ + (AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c), + (AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r), + ] + ) + + def __str__(self) -> str: + return f'OOB(C={self.c.hex()}, R={self.r.hex()})' + + +# ----------------------------------------------------------------------------- class Session: # I/O Capability to pairing method decision matrix # @@ -627,6 +677,13 @@ class Session: }, } + ea: bytes + eb: bytes + ltk: bytes + preq: bytes + pres: bytes + tk: bytes + def __init__( self, manager: Manager, @@ -636,17 +693,10 @@ class Session: ) -> None: self.manager = manager self.connection = connection - self.preq: Optional[bytes] = None - self.pres: Optional[bytes] = None - self.ea = None - self.eb = None - self.tk = bytes(16) - self.r = bytes(16) self.stk = None - self.ltk = None self.ltk_ediv = 0 self.ltk_rand = bytes(8) - self.link_key = None + self.link_key: Optional[bytes] = None self.initiator_key_distribution: int = 0 self.responder_key_distribution: int = 0 self.peer_random_value: Optional[bytes] = None @@ -659,7 +709,7 @@ class Session: self.peer_bd_addr: Optional[Address] = None self.peer_signature_key = None self.peer_expected_distributions: List[Type[SMP_Command]] = [] - self.dh_key = None + self.dh_key = b'' self.confirm_value = None self.passkey: Optional[int] = None self.passkey_ready = asyncio.Event() @@ -712,8 +762,8 @@ class Session: self.io_capability = pairing_config.delegate.io_capability self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY - # OOB (not supported yet) - self.oob = False + # OOB + self.oob_data_flag = 0 if pairing_config.oob is None else 1 # Set up addresses self_address = connection.self_address @@ -729,9 +779,35 @@ class Session: self.ia = bytes(peer_address) self.iat = 1 if peer_address.is_random else 0 + # Select the ECC key, TK and r initial value + if pairing_config.oob: + self.peer_oob_data = pairing_config.oob.peer_data + if pairing_config.sc: + if pairing_config.oob.our_context is None: + raise ValueError( + "oob pairing config requires a context when sc is True" + ) + self.r = pairing_config.oob.our_context.r + self.ecc_key = pairing_config.oob.our_context.ecc_key + if pairing_config.oob.legacy_context is not None: + self.tk = pairing_config.oob.legacy_context.tk + else: + if pairing_config.oob.legacy_context is None: + raise ValueError( + "oob pairing config requires a legacy context when sc is False" + ) + self.r = bytes(16) + self.ecc_key = manager.ecc_key + self.tk = pairing_config.oob.legacy_context.tk + else: + self.peer_oob_data = None + self.r = bytes(16) + self.ecc_key = manager.ecc_key + self.tk = bytes(16) + @property def pkx(self) -> Tuple[bytes, bytes]: - return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x) + return (self.ecc_key.x[::-1], self.peer_public_key_x) @property def pka(self) -> bytes: @@ -768,7 +844,10 @@ class Session: return None def decide_pairing_method( - self, auth_req: int, initiator_io_capability: int, responder_io_capability: int + self, + auth_req: int, + initiator_io_capability: int, + responder_io_capability: int, ) -> None: if self.connection.transport == BT_BR_EDR_TRANSPORT: self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC @@ -909,7 +988,7 @@ class Session: command = SMP_Pairing_Request_Command( io_capability=self.io_capability, - oob_data_flag=0, + oob_data_flag=self.oob_data_flag, auth_req=self.auth_req, maximum_encryption_key_size=16, initiator_key_distribution=self.initiator_key_distribution, @@ -921,7 +1000,7 @@ class Session: def send_pairing_response_command(self) -> None: response = SMP_Pairing_Response_Command( io_capability=self.io_capability, - oob_data_flag=0, + oob_data_flag=self.oob_data_flag, auth_req=self.auth_req, maximum_encryption_key_size=16, initiator_key_distribution=self.initiator_key_distribution, @@ -982,8 +1061,8 @@ class Session: def send_public_key_command(self) -> None: self.send_command( SMP_Pairing_Public_Key_Command( - public_key_x=bytes(reversed(self.manager.ecc_key.x)), - public_key_y=bytes(reversed(self.manager.ecc_key.y)), + public_key_x=self.ecc_key.x[::-1], + public_key_y=self.ecc_key.y[::-1], ) ) @@ -1011,7 +1090,7 @@ class Session: # We can now encrypt the connection with the short term key, so that we can # distribute the long term and/or other keys over an encrypted connection self.manager.device.host.send_command_sync( - HCI_LE_Enable_Encryption_Command( # type: ignore[call-arg] + HCI_LE_Enable_Encryption_Command( connection_handle=self.connection.handle, random_number=bytes(8), encrypted_diversifier=0, @@ -1019,18 +1098,56 @@ class Session: ) ) - async def derive_ltk(self) -> None: - link_key = await self.manager.device.get_link_key(self.connection.peer_address) - assert link_key is not None + @classmethod + def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes: + '''Derives Long Term Key from Link Key. + + Args: + link_key: BR/EDR Link Key bytes in little-endian. + ct2: whether ct2 is supported on both devices. + Returns: + LE Long Tern Key bytes in little-endian. + ''' ilk = ( crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key) - if self.ct2 + if ct2 else crypto.h6(link_key, b'tmp2') ) - self.ltk = crypto.h6(ilk, b'brle') + return crypto.h6(ilk, b'brle') + + @classmethod + def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes: + '''Derives Link Key from Long Term Key. + + Args: + ltk: LE Long Term Key bytes in little-endian. + ct2: whether ct2 is supported on both devices. + Returns: + BR/EDR Link Key bytes in little-endian. + ''' + ilk = ( + crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk) + if ct2 + else crypto.h6(ltk, b'tmp1') + ) + return crypto.h6(ilk, b'lebr') - def distribute_keys(self) -> None: + async def get_link_key_and_derive_ltk(self) -> None: + '''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.''' + self.link_key = await self.manager.device.get_link_key( + self.connection.peer_address + ) + if self.link_key is None: + logging.warning( + 'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!' + ) + self.send_pairing_failed( + SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR + ) + else: + self.ltk = self.derive_ltk(self.link_key, self.ct2) + def distribute_keys(self) -> None: # Distribute the keys as required if self.is_initiator: # CTKD: Derive LTK from LinkKey @@ -1039,7 +1156,7 @@ class Session: and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG ): self.ctkd_task = self.connection.abort_on( - 'disconnection', self.derive_ltk() + 'disconnection', self.get_link_key_and_derive_ltk() ) elif not self.sc: # Distribute the LTK, EDIV and RAND @@ -1069,12 +1186,7 @@ class Session: # CTKD, calculate BR/EDR link key if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: - ilk = ( - crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk) - if self.ct2 - else crypto.h6(self.ltk, b'tmp1') - ) - self.link_key = crypto.h6(ilk, b'lebr') + self.link_key = self.derive_link_key(self.ltk, self.ct2) else: # CTKD: Derive LTK from LinkKey @@ -1083,7 +1195,7 @@ class Session: and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG ): self.ctkd_task = self.connection.abort_on( - 'disconnection', self.derive_ltk() + 'disconnection', self.get_link_key_and_derive_ltk() ) # Distribute the LTK, EDIV and RAND elif not self.sc: @@ -1113,12 +1225,7 @@ class Session: # CTKD, calculate BR/EDR link key if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: - ilk = ( - crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk) - if self.ct2 - else crypto.h6(self.ltk, b'tmp1') - ) - self.link_key = crypto.h6(ilk, b'lebr') + self.link_key = self.derive_link_key(self.ltk, self.ct2) def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: # Set our expectations for what to wait for in the key distribution phase @@ -1296,7 +1403,7 @@ class Session: try: handler(command) except Exception as error: - logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') + logger.exception(f'{color("!!! Exception in handler:", "red")} {error}') response = SMP_Pairing_Failed_Command( reason=SMP_UNSPECIFIED_REASON_ERROR ) @@ -1333,15 +1440,28 @@ class Session: self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) - # Check for OOB - if command.oob_data_flag != 0: - self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) - return + # Infer the pairing method + if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( + not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) + ): + # Use OOB + self.pairing_method = PairingMethod.OOB + if not self.sc and self.tk is None: + # For legacy OOB, TK is required. + logger.warning("legacy OOB without TK") + self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) + return + if command.oob_data_flag == 0: + # The peer doesn't have OOB data, use r=0 + self.r = bytes(16) + else: + # Decide which pairing method to use from the IO capability + self.decide_pairing_method( + command.auth_req, + command.io_capability, + self.io_capability, + ) - # Decide which pairing method to use - self.decide_pairing_method( - command.auth_req, command.io_capability, self.io_capability - ) logger.debug(f'pairing method: {self.pairing_method.name}') # Key distribution @@ -1390,15 +1510,26 @@ class Session: self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) - # Check for OOB - if self.sc and command.oob_data_flag: - self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) - return + # Infer the pairing method + if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( + not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) + ): + # Use OOB + self.pairing_method = PairingMethod.OOB + if not self.sc and self.tk is None: + # For legacy OOB, TK is required. + logger.warning("legacy OOB without TK") + self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) + return + if command.oob_data_flag == 0: + # The peer doesn't have OOB data, use r=0 + self.r = bytes(16) + else: + # Decide which pairing method to use from the IO capability + self.decide_pairing_method( + command.auth_req, self.io_capability, command.io_capability + ) - # Decide which pairing method to use - self.decide_pairing_method( - command.auth_req, self.io_capability, command.io_capability - ) logger.debug(f'pairing method: {self.pairing_method.name}') # Key distribution @@ -1549,12 +1680,13 @@ class Session: if self.passkey_step < 20: self.send_pairing_confirm_command() return - else: + elif self.pairing_method != PairingMethod.OOB: return else: if self.pairing_method in ( PairingMethod.JUST_WORKS, PairingMethod.NUMERIC_COMPARISON, + PairingMethod.OOB, ): self.send_pairing_random_command() elif self.pairing_method == PairingMethod.PASSKEY: @@ -1591,6 +1723,7 @@ class Session: if self.pairing_method in ( PairingMethod.JUST_WORKS, PairingMethod.NUMERIC_COMPARISON, + PairingMethod.OOB, ): ra = bytes(16) rb = ra @@ -1599,7 +1732,6 @@ class Session: ra = self.passkey.to_bytes(16, byteorder='little') rb = ra else: - # OOB not implemented yet return assert self.preq and self.pres @@ -1651,18 +1783,33 @@ class Session: self.peer_public_key_y = command.public_key_y # Compute the DH key - self.dh_key = bytes( - reversed( - self.manager.ecc_key.dh( - bytes(reversed(command.public_key_x)), - bytes(reversed(command.public_key_y)), - ) - ) - ) + self.dh_key = self.ecc_key.dh( + command.public_key_x[::-1], + command.public_key_y[::-1], + )[::-1] logger.debug(f'DH key: {self.dh_key.hex()}') + if self.pairing_method == PairingMethod.OOB: + # Check against shared OOB data + if self.peer_oob_data: + confirm_verifier = crypto.f4( + self.peer_public_key_x, + self.peer_public_key_x, + self.peer_oob_data.r, + bytes(1), + ) + if not self.check_expected_value( + self.peer_oob_data.c, + confirm_verifier, + SMP_CONFIRM_VALUE_FAILED_ERROR, + ): + return + if self.is_initiator: - self.send_pairing_confirm_command() + if self.pairing_method == PairingMethod.OOB: + self.send_pairing_random_command() + else: + self.send_pairing_confirm_command() else: if self.pairing_method == PairingMethod.PASSKEY: self.display_or_input_passkey() @@ -1673,6 +1820,7 @@ class Session: if self.pairing_method in ( PairingMethod.JUST_WORKS, PairingMethod.NUMERIC_COMPARISON, + PairingMethod.OOB, ): # We can now send the confirmation value self.send_pairing_confirm_command() @@ -1701,7 +1849,6 @@ class Session: else: self.send_pairing_dhkey_check_command() else: - assert self.ltk self.start_encryption(self.ltk) def on_smp_pairing_failed_command( @@ -1751,6 +1898,7 @@ class Manager(EventEmitter): sessions: Dict[int, Session] pairing_config_factory: Callable[[Connection], PairingConfig] session_proxy: Type[Session] + _ecc_key: Optional[crypto.EccKey] def __init__( self, @@ -1845,10 +1993,8 @@ class Manager(EventEmitter): ) -> None: # Store the keys in the key store if self.device.keystore and identity_address is not None: - self.device.abort_on( - 'flush', self.device.update_keys(str(identity_address), keys) - ) - + # Make sure on_pairing emits after key update. + await self.device.update_keys(str(identity_address), keys) # Notify the device self.device.on_pairing(session.connection, identity_address, keys, session.sc) |