diff options
Diffstat (limited to 'bumble/hci.py')
-rw-r--r-- | bumble/hci.py | 961 |
1 files changed, 660 insertions, 301 deletions
diff --git a/bumble/hci.py b/bumble/hci.py index 9b5793d..41deed2 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -16,11 +16,11 @@ # Imports # ----------------------------------------------------------------------------- from __future__ import annotations -import struct import collections -import logging import functools -from typing import Dict, Type, Union +import logging +import struct +from typing import Any, Dict, Callable, Optional, Type, Union from .colors import color from .core import ( @@ -47,6 +47,10 @@ def hci_command_op_code(ogf, ocf): return ogf << 10 | ocf +def hci_vendor_command_op_code(ocf): + return hci_command_op_code(HCI_VENDOR_OGF, ocf) + + def key_with_value(dictionary, target_value): for key, value in dictionary.items(): if value == target_value: @@ -62,7 +66,7 @@ def map_null_terminated_utf8_string(utf8_bytes): try: terminator = utf8_bytes.find(0) if terminator < 0: - return utf8_bytes + terminator = len(utf8_bytes) return utf8_bytes[0:terminator].decode('utf8') except UnicodeDecodeError: return utf8_bytes @@ -101,6 +105,8 @@ def phy_list_to_bits(phys): # fmt: off # pylint: disable=line-too-long +HCI_VENDOR_OGF = 0x3F + # HCI Version HCI_VERSION_BLUETOOTH_CORE_1_0B = 0 HCI_VERSION_BLUETOOTH_CORE_1_1 = 1 @@ -185,7 +191,7 @@ HCI_IO_CAPABILITY_REQUEST_EVENT = 0x31 HCI_IO_CAPABILITY_RESPONSE_EVENT = 0x32 HCI_USER_CONFIRMATION_REQUEST_EVENT = 0x33 HCI_USER_PASSKEY_REQUEST_EVENT = 0x34 -HCI_REMOTE_OOB_DATA_REQUEST = 0x35 +HCI_REMOTE_OOB_DATA_REQUEST_EVENT = 0x35 HCI_SIMPLE_PAIRING_COMPLETE_EVENT = 0x36 HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT = 0x38 HCI_ENHANCED_FLUSH_COMPLETE_EVENT = 0x39 @@ -206,10 +212,8 @@ HCI_INQUIRY_RESPONSE_NOTIFICATION_EVENT = 0X56 HCI_AUTHENTICATED_PAYLOAD_TIMEOUT_EXPIRED_EVENT = 0X57 HCI_SAM_STATUS_CHANGE_EVENT = 0X58 -HCI_EVENT_NAMES = { - event_code: event_name for (event_name, event_code) in globals().items() - if event_name.startswith('HCI_') and event_name.endswith('_EVENT') -} +HCI_VENDOR_EVENT = 0xFF + # HCI Subevent Codes HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01 @@ -248,10 +252,6 @@ HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21 HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22 HCI_LE_SUBRATE_CHANGE_EVENT = 0X23 -HCI_SUBEVENT_NAMES = { - event_code: event_name for (event_name, event_code) in globals().items() - if event_name.startswith('HCI_LE_') and event_name.endswith('_EVENT') and event_code != HCI_LE_META_EVENT -} # HCI Command HCI_INQUIRY_COMMAND = hci_command_op_code(0x01, 0x0001) @@ -557,10 +557,6 @@ HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_c HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D) HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E) -HCI_COMMAND_NAMES = { - command_code: command_name for (command_name, command_code) in globals().items() - if command_name.startswith('HCI_') and command_name.endswith('_COMMAND') -} # HCI Error Codes # See Bluetooth spec Vol 2, Part D - 1.3 LIST OF ERROR CODES @@ -1445,8 +1441,14 @@ class HCI_Object: @staticmethod def init_from_fields(hci_object, fields, values): if isinstance(values, dict): - for field_name, _ in fields: - setattr(hci_object, field_name, values[field_name]) + for field in fields: + if isinstance(field, list): + # The field is an array, up-level the array field names + for sub_field_name, _ in field: + setattr(hci_object, sub_field_name, values[sub_field_name]) + else: + field_name = field[0] + setattr(hci_object, field_name, values[field_name]) else: for field_name, field_value in zip(fields, values): setattr(hci_object, field_name, field_value) @@ -1457,132 +1459,160 @@ class HCI_Object: HCI_Object.init_from_fields(hci_object, parsed.keys(), parsed.values()) @staticmethod + def parse_field(data, offset, field_type): + # The field_type may be a dictionary with a mapper, parser, and/or size + if isinstance(field_type, dict): + if 'size' in field_type: + field_type = field_type['size'] + elif 'parser' in field_type: + field_type = field_type['parser'] + + # Parse the field + if field_type == '*': + # The rest of the bytes + field_value = data[offset:] + return (field_value, len(field_value)) + if field_type == 1: + # 8-bit unsigned + return (data[offset], 1) + if field_type == -1: + # 8-bit signed + return (struct.unpack_from('b', data, offset)[0], 1) + if field_type == 2: + # 16-bit unsigned + return (struct.unpack_from('<H', data, offset)[0], 2) + if field_type == '>2': + # 16-bit unsigned big-endian + return (struct.unpack_from('>H', data, offset)[0], 2) + if field_type == -2: + # 16-bit signed + return (struct.unpack_from('<h', data, offset)[0], 2) + if field_type == 3: + # 24-bit unsigned + padded = data[offset : offset + 3] + bytes([0]) + return (struct.unpack('<I', padded)[0], 3) + if field_type == 4: + # 32-bit unsigned + return (struct.unpack_from('<I', data, offset)[0], 4) + if field_type == '>4': + # 32-bit unsigned big-endian + return (struct.unpack_from('>I', data, offset)[0], 4) + if isinstance(field_type, int) and 4 < field_type <= 256: + # Byte array (from 5 up to 256 bytes) + return (data[offset : offset + field_type], field_type) + if callable(field_type): + new_offset, field_value = field_type(data, offset) + return (field_value, new_offset - offset) + + raise ValueError(f'unknown field type {field_type}') + + @staticmethod def dict_from_bytes(data, offset, fields): result = collections.OrderedDict() - for (field_name, field_type) in fields: - # The field_type may be a dictionary with a mapper, parser, and/or size - if isinstance(field_type, dict): - if 'size' in field_type: - field_type = field_type['size'] - elif 'parser' in field_type: - field_type = field_type['parser'] - - # Parse the field - if field_type == '*': - # The rest of the bytes - field_value = data[offset:] - offset += len(field_value) - elif field_type == 1: - # 8-bit unsigned - field_value = data[offset] + for field in fields: + if isinstance(field, list): + # This is an array field, starting with a 1-byte item count. + item_count = data[offset] offset += 1 - elif field_type == -1: - # 8-bit signed - field_value = struct.unpack_from('b', data, offset)[0] - offset += 1 - elif field_type == 2: - # 16-bit unsigned - field_value = struct.unpack_from('<H', data, offset)[0] - offset += 2 - elif field_type == '>2': - # 16-bit unsigned big-endian - field_value = struct.unpack_from('>H', data, offset)[0] - offset += 2 - elif field_type == -2: - # 16-bit signed - field_value = struct.unpack_from('<h', data, offset)[0] - offset += 2 - elif field_type == 3: - # 24-bit unsigned - padded = data[offset : offset + 3] + bytes([0]) - field_value = struct.unpack('<I', padded)[0] - offset += 3 - elif field_type == 4: - # 32-bit unsigned - field_value = struct.unpack_from('<I', data, offset)[0] - offset += 4 - elif field_type == '>4': - # 32-bit unsigned big-endian - field_value = struct.unpack_from('>I', data, offset)[0] - offset += 4 - elif isinstance(field_type, int) and 4 < field_type <= 256: - # Byte array (from 5 up to 256 bytes) - field_value = data[offset : offset + field_type] - offset += field_type - elif callable(field_type): - offset, field_value = field_type(data, offset) - else: - raise ValueError(f'unknown field type {field_type}') - + for _ in range(item_count): + for sub_field_name, sub_field_type in field: + value, size = HCI_Object.parse_field( + data, offset, sub_field_type + ) + result.setdefault(sub_field_name, []).append(value) + offset += size + continue + + field_name, field_type = field + field_value, field_size = HCI_Object.parse_field(data, offset, field_type) result[field_name] = field_value + offset += field_size return result @staticmethod - def dict_to_bytes(hci_object, fields): - result = bytearray() - for (field_name, field_type) in fields: - # The field_type may be a dictionary with a mapper, parser, serializer, - # and/or size - serializer = None - if isinstance(field_type, dict): - if 'serializer' in field_type: - serializer = field_type['serializer'] - if 'size' in field_type: - field_type = field_type['size'] - - # Serialize the field - field_value = hci_object[field_name] - if serializer: - field_bytes = serializer(field_value) - elif field_type == 1: - # 8-bit unsigned - field_bytes = bytes([field_value]) - elif field_type == -1: - # 8-bit signed - field_bytes = struct.pack('b', field_value) - elif field_type == 2: - # 16-bit unsigned - field_bytes = struct.pack('<H', field_value) - elif field_type == '>2': - # 16-bit unsigned big-endian - field_bytes = struct.pack('>H', field_value) - elif field_type == -2: - # 16-bit signed - field_bytes = struct.pack('<h', field_value) - elif field_type == 3: - # 24-bit unsigned - field_bytes = struct.pack('<I', field_value)[0:3] - elif field_type == 4: - # 32-bit unsigned - field_bytes = struct.pack('<I', field_value) - elif field_type == '>4': - # 32-bit unsigned big-endian - field_bytes = struct.pack('>I', field_value) - elif field_type == '*': - if isinstance(field_value, int): - if 0 <= field_value <= 255: - field_bytes = bytes([field_value]) - else: - raise ValueError('value too large for *-typed field') + def serialize_field(field_value, field_type): + # The field_type may be a dictionary with a mapper, parser, serializer, + # and/or size + serializer = None + if isinstance(field_type, dict): + if 'serializer' in field_type: + serializer = field_type['serializer'] + if 'size' in field_type: + field_type = field_type['size'] + + # Serialize the field + if serializer: + field_bytes = serializer(field_value) + elif field_type == 1: + # 8-bit unsigned + field_bytes = bytes([field_value]) + elif field_type == -1: + # 8-bit signed + field_bytes = struct.pack('b', field_value) + elif field_type == 2: + # 16-bit unsigned + field_bytes = struct.pack('<H', field_value) + elif field_type == '>2': + # 16-bit unsigned big-endian + field_bytes = struct.pack('>H', field_value) + elif field_type == -2: + # 16-bit signed + field_bytes = struct.pack('<h', field_value) + elif field_type == 3: + # 24-bit unsigned + field_bytes = struct.pack('<I', field_value)[0:3] + elif field_type == 4: + # 32-bit unsigned + field_bytes = struct.pack('<I', field_value) + elif field_type == '>4': + # 32-bit unsigned big-endian + field_bytes = struct.pack('>I', field_value) + elif field_type == '*': + if isinstance(field_value, int): + if 0 <= field_value <= 255: + field_bytes = bytes([field_value]) else: - field_bytes = bytes(field_value) - elif isinstance(field_value, (bytes, bytearray)) or hasattr( - field_value, 'to_bytes' - ): - field_bytes = bytes(field_value) - if isinstance(field_type, int) and 4 < field_type <= 256: - # Truncate or Pad with zeros if the field is too long or too short - if len(field_bytes) < field_type: - field_bytes += bytes(field_type - len(field_bytes)) - elif len(field_bytes) > field_type: - field_bytes = field_bytes[:field_type] + raise ValueError('value too large for *-typed field') else: - raise ValueError( - f"don't know how to serialize type {type(field_value)}" + field_bytes = bytes(field_value) + elif isinstance(field_value, (bytes, bytearray)) or hasattr( + field_value, 'to_bytes' + ): + field_bytes = bytes(field_value) + if isinstance(field_type, int) and 4 < field_type <= 256: + # Truncate or pad with zeros if the field is too long or too short + if len(field_bytes) < field_type: + field_bytes += bytes(field_type - len(field_bytes)) + elif len(field_bytes) > field_type: + field_bytes = field_bytes[:field_type] + else: + raise ValueError(f"don't know how to serialize type {type(field_value)}") + + return field_bytes + + @staticmethod + def dict_to_bytes(hci_object, fields): + result = bytearray() + for field in fields: + if isinstance(field, list): + # The field is an array. The serialized form starts with a 1-byte + # item count. We use the length of the first array field as the + # array count, since all array fields have the same number of items. + item_count = len(hci_object[field[0][0]]) + result += bytes([item_count]) + b''.join( + b''.join( + HCI_Object.serialize_field( + hci_object[sub_field_name][i], sub_field_type + ) + for sub_field_name, sub_field_type in field + ) + for i in range(item_count) ) + continue - result += field_bytes + (field_name, field_type) = field + result += HCI_Object.serialize_field(hci_object[field_name], field_type) return bytes(result) @@ -1617,46 +1647,73 @@ class HCI_Object: return str(value) @staticmethod - def format_fields(hci_object, keys, indentation='', value_mappers=None): - if not keys: - return '' - - # Measure the widest field name - max_field_name_length = max( - (len(key[0] if isinstance(key, tuple) else key) for key in keys) + def stringify_field( + field_name, field_type, field_value, indentation, value_mappers + ): + value_mapper = None + if isinstance(field_type, dict): + # Get the value mapper from the specifier + value_mapper = field_type.get('mapper') + + # Check if there's a matching mapper passed + if value_mappers: + value_mapper = value_mappers.get(field_name, value_mapper) + + # Map the value if we have a mapper + if value_mapper is not None: + field_value = value_mapper(field_value) + + # Get the string representation of the value + return HCI_Object.format_field_value( + field_value, indentation=indentation + ' ' ) + @staticmethod + def format_fields(hci_object, fields, indentation='', value_mappers=None): + if not fields: + return '' + # Build array of formatted key:value pairs - fields = [] - for key in keys: - value_mapper = None - if isinstance(key, tuple): - # The key has an associated specifier - key, specifier = key - - # Get the value mapper from the specifier - if isinstance(specifier, dict): - value_mapper = specifier.get('mapper') - - # Get the value for the field - value = hci_object[key] - - # Map the value if needed - if value_mappers: - value_mapper = value_mappers.get(key, value_mapper) - if value_mapper is not None: - value = value_mapper(value) - - # Get the string representation of the value - value_str = HCI_Object.format_field_value( - value, indentation=indentation + ' ' + field_strings = [] + for field in fields: + if isinstance(field, list): + for sub_field in field: + sub_field_name, sub_field_type = sub_field + item_count = len(hci_object[sub_field_name]) + for i in range(item_count): + field_strings.append( + ( + f'{sub_field_name}[{i}]', + HCI_Object.stringify_field( + sub_field_name, + sub_field_type, + hci_object[sub_field_name][i], + indentation, + value_mappers, + ), + ), + ) + continue + + field_name, field_type = field + field_value = hci_object[field_name] + field_strings.append( + ( + field_name, + HCI_Object.stringify_field( + field_name, field_type, field_value, indentation, value_mappers + ), + ), ) - # Add the field to the formatted result - key_str = color(f'{key + ":":{1 + max_field_name_length}}', 'cyan') - fields.append(f'{indentation}{key_str} {value_str}') - - return '\n'.join(fields) + # Measure the widest field name + max_field_name_length = max(len(s[0]) for s in field_strings) + sep = ':' + return '\n'.join( + f'{indentation}' + f'{color(f"{field_name + sep:{1 + max_field_name_length}}", "cyan")} {field_value}' + for field_name, field_value in field_strings + ) def __bytes__(self): return self.to_bytes() @@ -1795,6 +1852,16 @@ class Address: def to_bytes(self): return self.address_bytes + def to_string(self, with_type_qualifier=True): + ''' + String representation of the address, MSB first, with an optional type + qualifier. + ''' + result = ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)]) + if not with_type_qualifier or not self.is_public: + return result + return result + '/P' + def __bytes__(self): return self.to_bytes() @@ -1808,13 +1875,7 @@ class Address: ) def __str__(self): - ''' - String representation of the address, MSB first - ''' - result = ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)]) - if not self.is_public: - return result - return result + '/P' + return self.to_string() # Predefined address values @@ -1853,7 +1914,7 @@ class HCI_Packet: hci_packet_type: int @staticmethod - def from_bytes(packet): + def from_bytes(packet: bytes) -> HCI_Packet: packet_type = packet[0] if packet_type == HCI_COMMAND_PACKET: @@ -1895,6 +1956,7 @@ class HCI_Command(HCI_Packet): ''' hci_packet_type = HCI_COMMAND_PACKET + command_names: Dict[int, str] = {} command_classes: Dict[int, Type[HCI_Command]] = {} @staticmethod @@ -1905,9 +1967,9 @@ class HCI_Command(HCI_Packet): def inner(cls): cls.name = cls.__name__.upper() - cls.op_code = key_with_value(HCI_COMMAND_NAMES, cls.name) + cls.op_code = key_with_value(cls.command_names, cls.name) if cls.op_code is None: - raise KeyError(f'command {cls.name} not found in HCI_COMMAND_NAMES') + raise KeyError(f'command {cls.name} not found in command_names') cls.fields = fields cls.return_parameters_fields = return_parameters_fields @@ -1927,7 +1989,19 @@ class HCI_Command(HCI_Packet): return inner @staticmethod - def from_bytes(packet): + def command_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + command_code: command_name + for (command_name, command_code) in symbols.items() + if command_name.startswith('HCI_') and command_name.endswith('_COMMAND') + } + + @classmethod + def register_commands(cls, symbols: Dict[str, Any]) -> None: + cls.command_names.update(cls.command_map(symbols)) + + @staticmethod + def from_bytes(packet: bytes) -> HCI_Command: op_code, length = struct.unpack_from('<HB', packet, 1) parameters = packet[4:] if len(parameters) != length: @@ -1946,11 +2020,11 @@ class HCI_Command(HCI_Packet): HCI_Object.init_from_bytes(self, parameters, 0, fields) return self - return cls.from_parameters(parameters) + return cls.from_parameters(parameters) # type: ignore @staticmethod def command_name(op_code): - name = HCI_COMMAND_NAMES.get(op_code) + name = HCI_Command.command_names.get(op_code) if name is not None: return name return f'[OGF=0x{op_code >> 10:02x}, OCF=0x{op_code & 0x3FF:04x}]' @@ -1959,6 +2033,16 @@ class HCI_Command(HCI_Packet): def create_return_parameters(cls, **kwargs): return HCI_Object(cls.return_parameters_fields, **kwargs) + @classmethod + def parse_return_parameters(cls, parameters): + if not cls.return_parameters_fields: + return None + return_parameters = HCI_Object.from_bytes( + parameters, 0, cls.return_parameters_fields + ) + return_parameters.fields = cls.return_parameters_fields + return return_parameters + def __init__(self, op_code, parameters=None, **kwargs): super().__init__(HCI_Command.command_name(op_code)) if (fields := getattr(self, 'fields', None)) and kwargs: @@ -1988,6 +2072,9 @@ class HCI_Command(HCI_Packet): return result +HCI_Command.register_commands(globals()) + + # ----------------------------------------------------------------------------- @HCI_Command.command( [ @@ -2284,6 +2371,55 @@ class HCI_User_Passkey_Request_Negative_Reply_Command(HCI_Command): # ----------------------------------------------------------------------------- @HCI_Command.command( + fields=[ + ('bd_addr', Address.parse_address), + ('c', 16), + ('r', 16), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('bd_addr', Address.parse_address), + ], +) +class HCI_Remote_OOB_Data_Request_Reply_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.34 Remote OOB Data Request Reply Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[('bd_addr', Address.parse_address)], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('bd_addr', Address.parse_address), + ], +) +class HCI_Remote_OOB_Data_Request_Negative_Reply_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.35 Remote OOB Data Request Negative Reply Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('bd_addr', Address.parse_address), + ('reason', 1), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('bd_addr', Address.parse_address), + ], +) +class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.36 IO Capability Request Negative Reply Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( [ ('connection_handle', 2), ('transmit_bandwidth', 4), @@ -2320,6 +2456,161 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command): # ----------------------------------------------------------------------------- @HCI_Command.command( [ + ('bd_addr', Address.parse_address), + ('transmit_bandwidth', 4), + ('receive_bandwidth', 4), + ('transmit_coding_format', 5), + ('receive_coding_format', 5), + ('transmit_codec_frame_size', 2), + ('receive_codec_frame_size', 2), + ('input_bandwidth', 4), + ('output_bandwidth', 4), + ('input_coding_format', 5), + ('output_coding_format', 5), + ('input_coded_data_size', 2), + ('output_coded_data_size', 2), + ('input_pcm_data_format', 1), + ('output_pcm_data_format', 1), + ('input_pcm_sample_payload_msb_position', 1), + ('output_pcm_sample_payload_msb_position', 1), + ('input_data_path', 1), + ('output_data_path', 1), + ('input_transport_unit_size', 1), + ('output_transport_unit_size', 1), + ('max_latency', 2), + ('packet_type', 2), + ('retransmission_effort', 1), + ] +) +class HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.46 Enhanced Accept Synchronous Connection Request Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('bd_addr', Address.parse_address), + ('page_scan_repetition_mode', 1), + ('clock_offset', 2), + ] +) +class HCI_Truncated_Page_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.47 Truncated Page Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[('bd_addr', Address.parse_address)], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('bd_addr', Address.parse_address), + ], +) +class HCI_Truncated_Page_Cancel_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.48 Truncated Page Cancel Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('enable', 1), + ('lt_addr', 1), + ('lpo_allowed', 1), + ('packet_type', 2), + ('interval_min', 2), + ('interval_max', 2), + ('supervision_timeout', 2), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('lt_addr', 1), + ('interval', 2), + ], +) +class HCI_Set_Connectionless_Peripheral_Broadcast_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.49 Set Connectionless Peripheral Broadcast Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('enable', 1), + ('bd_addr', Address.parse_address), + ('lt_addr', 1), + ('interval', 2), + ('clock_offset', 4), + ('next_connectionless_peripheral_broadcast_clock', 4), + ('supervision_timeout', 2), + ('remote_timing_accuracy', 1), + ('skip', 1), + ('packet_type', 2), + ('afh_channel_map', 10), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('bd_addr', Address.parse_address), + ('lt_addr', 1), + ], +) +class HCI_Set_Connectionless_Peripheral_Broadcast_Receive_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.50 Set Connectionless Peripheral Broadcast Receive Command + ''' + + +# ----------------------------------------------------------------------------- +class HCI_Start_Synchronization_Train_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.51 Start Synchronization Train Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('bd_addr', Address.parse_address), + ('sync_scan_timeout', 2), + ('sync_scan_window', 2), + ('sync_scan_interval', 2), + ], +) +class HCI_Receive_Synchronization_Train_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.52 Receive Synchronization Train Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + fields=[ + ('bd_addr', Address.parse_address), + ('c_192', 16), + ('r_192', 16), + ('c_256', 16), + ('r_256', 16), + ], + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('bd_addr', Address.parse_address), + ], +) +class HCI_Remote_OOB_Extended_Data_Request_Reply_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.1.53 Remote OOB Extended Data Request Reply Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + [ ('connection_handle', 2), ('sniff_max_interval', 2), ('sniff_min_interval', 2), @@ -2685,6 +2976,20 @@ class HCI_Write_Simple_Pairing_Mode_Command(HCI_Command): # ----------------------------------------------------------------------------- @HCI_Command.command( + return_parameters_fields=[ + ('status', STATUS_SPEC), + ('c', 16), + ('r', 16), + ] +) +class HCI_Read_Local_OOB_Data_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.3.60 Read Local OOB Data Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( return_parameters_fields=[('status', STATUS_SPEC), ('tx_power', -1)] ) class HCI_Read_Inquiry_Response_Transmit_Power_Level_Command(HCI_Command): @@ -2747,6 +3052,22 @@ class HCI_Write_Authenticated_Payload_Timeout_Command(HCI_Command): @HCI_Command.command( return_parameters_fields=[ ('status', STATUS_SPEC), + ('c_192', 16), + ('r_192', 16), + ('c_256', 16), + ('r_256', 16), + ] +) +class HCI_Read_Local_OOB_Extended_Data_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.3.95 Read Local OOB Extended Data Command + ''' + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + return_parameters_fields=[ + ('status', STATUS_SPEC), ('hci_version', 1), ('hci_subversion', 2), ('lmp_version', 1), @@ -3529,9 +3850,7 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): 'advertising_data', { 'parser': HCI_Object.parse_length_prefixed_bytes, - 'serializer': functools.partial( - HCI_Object.serialize_length_prefixed_bytes - ), + 'serializer': HCI_Object.serialize_length_prefixed_bytes, }, ), ] @@ -3579,9 +3898,7 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command): 'scan_response_data', { 'parser': HCI_Object.parse_length_prefixed_bytes, - 'serializer': functools.partial( - HCI_Object.serialize_length_prefixed_bytes - ), + 'serializer': HCI_Object.serialize_length_prefixed_bytes, }, ), ] @@ -3609,73 +3926,21 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command): # ----------------------------------------------------------------------------- -@HCI_Command.command(fields=None) +@HCI_Command.command( + [ + ('enable', 1), + [ + ('advertising_handles', 1), + ('durations', 2), + ('max_extended_advertising_events', 1), + ], + ] +) class HCI_LE_Set_Extended_Advertising_Enable_Command(HCI_Command): ''' See Bluetooth spec @ 7.8.56 LE Set Extended Advertising Enable Command ''' - @classmethod - def from_parameters(cls, parameters): - enable = parameters[0] - num_sets = parameters[1] - advertising_handles = [] - durations = [] - max_extended_advertising_events = [] - offset = 2 - for _ in range(num_sets): - advertising_handles.append(parameters[offset]) - durations.append(struct.unpack_from('<H', parameters, offset + 1)[0]) - max_extended_advertising_events.append(parameters[offset + 3]) - offset += 4 - - return cls( - enable, advertising_handles, durations, max_extended_advertising_events - ) - - def __init__( - self, enable, advertising_handles, durations, max_extended_advertising_events - ): - super().__init__(HCI_LE_SET_EXTENDED_ADVERTISING_ENABLE_COMMAND) - self.enable = enable - self.advertising_handles = advertising_handles - self.durations = durations - self.max_extended_advertising_events = max_extended_advertising_events - - self.parameters = bytes([enable, len(advertising_handles)]) + b''.join( - [ - struct.pack( - '<BHB', - advertising_handles[i], - durations[i], - max_extended_advertising_events[i], - ) - for i in range(len(advertising_handles)) - ] - ) - - def __str__(self): - fields = [('enable:', self.enable)] - for i, advertising_handle in enumerate(self.advertising_handles): - fields.append( - (f'advertising_handle[{i}]: ', advertising_handle) - ) - fields.append((f'duration[{i}]: ', self.durations[i])) - fields.append( - ( - f'max_extended_advertising_events[{i}]:', - self.max_extended_advertising_events[i], - ) - ) - - return ( - color(self.name, 'green') - + ':\n' - + '\n'.join( - [color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields] - ) - ) - # ----------------------------------------------------------------------------- @HCI_Command.command( @@ -3826,7 +4091,10 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command): color(self.name, 'green') + ':\n' + '\n'.join( - [color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields] + [ + color(' ' + field[0], 'cyan') + ' ' + str(field[1]) + for field in fields + ] ) ) @@ -4002,7 +4270,10 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command): color(self.name, 'green') + ':\n' + '\n'.join( - [color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields] + [ + color(' ' + field[0], 'cyan') + ' ' + str(field[1]) + for field in fields + ] ) ) @@ -4059,8 +4330,8 @@ class HCI_Event(HCI_Packet): ''' hci_packet_type = HCI_EVENT_PACKET + event_names: Dict[int, str] = {} event_classes: Dict[int, Type[HCI_Event]] = {} - meta_event_classes: Dict[int, Type[HCI_LE_Meta_Event]] = {} @staticmethod def event(fields=()): @@ -4070,9 +4341,9 @@ class HCI_Event(HCI_Packet): def inner(cls): cls.name = cls.__name__.upper() - cls.event_code = key_with_value(HCI_EVENT_NAMES, cls.name) + cls.event_code = key_with_value(cls.event_names, cls.name) if cls.event_code is None: - raise KeyError('event not found in HCI_EVENT_NAMES') + raise KeyError(f'event {cls.name} not found in event_names') cls.fields = fields # Patch the __init__ method to fix the event_code @@ -4089,11 +4360,29 @@ class HCI_Event(HCI_Packet): return inner @staticmethod + def event_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + event_code: event_name + for (event_name, event_code) in symbols.items() + if event_name.startswith('HCI_') + and not event_name.startswith('HCI_LE_') + and event_name.endswith('_EVENT') + } + + @staticmethod + def event_name(event_code): + return name_or_number(HCI_Event.event_names, event_code) + + @staticmethod + def register_events(symbols: Dict[str, Any]) -> None: + HCI_Event.event_names.update(HCI_Event.event_map(symbols)) + + @staticmethod def registered(event_class): event_class.name = event_class.__name__.upper() - event_class.event_code = key_with_value(HCI_EVENT_NAMES, event_class.name) + event_class.event_code = key_with_value(HCI_Event.event_names, event_class.name) if event_class.event_code is None: - raise KeyError('event not found in HCI_EVENT_NAMES') + raise KeyError(f'event {event_class.name} not found in event_names') # Register a factory for this class HCI_Event.event_classes[event_class.event_code] = event_class @@ -4101,22 +4390,28 @@ class HCI_Event(HCI_Packet): return event_class @staticmethod - def from_bytes(packet): + def from_bytes(packet: bytes) -> HCI_Event: event_code = packet[1] length = packet[2] parameters = packet[3:] if len(parameters) != length: raise ValueError('invalid packet length') + cls: Any if event_code == HCI_LE_META_EVENT: # We do this dispatch here and not in the subclass in order to avoid call # loops subevent_code = parameters[0] - cls = HCI_Event.meta_event_classes.get(subevent_code) + cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) if cls is None: # No class registered, just use a generic class instance return HCI_LE_Meta_Event(subevent_code, parameters) - + elif event_code == HCI_VENDOR_EVENT: + subevent_code = parameters[0] + cls = HCI_Vendor_Event.subevent_classes.get(subevent_code) + if cls is None: + # No class registered, just use a generic class instance + return HCI_Vendor_Event(subevent_code, parameters) else: cls = HCI_Event.event_classes.get(event_code) if cls is None: @@ -4124,7 +4419,7 @@ class HCI_Event(HCI_Packet): return HCI_Event(event_code, parameters) # Invoke the factory to create a new instance - return cls.from_parameters(parameters) + return cls.from_parameters(parameters) # type: ignore @classmethod def from_parameters(cls, parameters): @@ -4134,10 +4429,6 @@ class HCI_Event(HCI_Packet): HCI_Object.init_from_bytes(self, parameters, 0, fields) return self - @staticmethod - def event_name(event_code): - return name_or_number(HCI_EVENT_NAMES, event_code) - def __init__(self, event_code, parameters=None, **kwargs): super().__init__(HCI_Event.event_name(event_code)) if (fields := getattr(self, 'fields', None)) and kwargs: @@ -4164,71 +4455,111 @@ class HCI_Event(HCI_Packet): return result +HCI_Event.register_events(globals()) + + # ----------------------------------------------------------------------------- -class HCI_LE_Meta_Event(HCI_Event): +class HCI_Extended_Event(HCI_Event): ''' - See Bluetooth spec @ 7.7.65 LE Meta Event + HCI_Event subclass for events that has a subevent code. ''' - @staticmethod - def event(fields=()): + subevent_names: Dict[int, str] = {} + subevent_classes: Dict[int, Type[HCI_Extended_Event]] + + @classmethod + def event(cls, fields=()): ''' Decorator used to declare and register subclasses ''' def inner(cls): cls.name = cls.__name__.upper() - cls.subevent_code = key_with_value(HCI_SUBEVENT_NAMES, cls.name) + cls.subevent_code = key_with_value(cls.subevent_names, cls.name) if cls.subevent_code is None: - raise KeyError('subevent not found in HCI_SUBEVENT_NAMES') + raise KeyError(f'subevent {cls.name} not found in subevent_names') cls.fields = fields # Patch the __init__ method to fix the subevent_code + original_init = cls.__init__ + def init(self, parameters=None, **kwargs): - return HCI_LE_Meta_Event.__init__( - self, cls.subevent_code, parameters, **kwargs - ) + return original_init(self, cls.subevent_code, parameters, **kwargs) cls.__init__ = init # Register a factory for this class - HCI_Event.meta_event_classes[cls.subevent_code] = cls + cls.subevent_classes[cls.subevent_code] = cls return cls return inner @classmethod + def subevent_name(cls, subevent_code): + subevent_name = cls.subevent_names.get(subevent_code) + if subevent_name is not None: + return subevent_name + + return f'{cls.__name__.upper()}[0x{subevent_code:02X}]' + + @staticmethod + def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + subevent_code: subevent_name + for (subevent_name, subevent_code) in symbols.items() + if subevent_name.startswith('HCI_') and subevent_name.endswith('_EVENT') + } + + @classmethod + def register_subevents(cls, symbols: Dict[str, Any]) -> None: + cls.subevent_names.update(cls.subevent_map(symbols)) + + @classmethod def from_parameters(cls, parameters): self = cls.__new__(cls) - HCI_LE_Meta_Event.__init__(self, self.subevent_code, parameters) + HCI_Extended_Event.__init__(self, self.subevent_code, parameters) if fields := getattr(self, 'fields', None): HCI_Object.init_from_bytes(self, parameters, 1, fields) return self - @staticmethod - def subevent_name(subevent_code): - return name_or_number(HCI_SUBEVENT_NAMES, subevent_code) - def __init__(self, subevent_code, parameters, **kwargs): self.subevent_code = subevent_code if parameters is None and (fields := getattr(self, 'fields', None)) and kwargs: parameters = bytes([subevent_code]) + HCI_Object.dict_to_bytes( kwargs, fields ) - super().__init__(HCI_LE_META_EVENT, parameters, **kwargs) + super().__init__(self.event_code, parameters, **kwargs) # Override the name in order to adopt the subevent name instead self.name = self.subevent_name(subevent_code) - def __str__(self): - result = color(self.subevent_name(self.subevent_code), 'magenta') - if fields := getattr(self, 'fields', None): - result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') - else: - if self.parameters: - result += f': {self.parameters.hex()}' - return result + +# ----------------------------------------------------------------------------- +class HCI_LE_Meta_Event(HCI_Extended_Event): + ''' + See Bluetooth spec @ 7.7.65 LE Meta Event + ''' + + event_code: int = HCI_LE_META_EVENT + subevent_classes = {} + + @staticmethod + def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]: + return { + subevent_code: subevent_name + for (subevent_name, subevent_code) in symbols.items() + if subevent_name.startswith('HCI_LE_') and subevent_name.endswith('_EVENT') + } + + +HCI_LE_Meta_Event.register_subevents(globals()) + + +# ----------------------------------------------------------------------------- +class HCI_Vendor_Event(HCI_Extended_Event): + event_code: int = HCI_VENDOR_EVENT + subevent_classes = {} # ----------------------------------------------------------------------------- @@ -4342,7 +4673,7 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event): return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' -HCI_Event.meta_event_classes[ +HCI_LE_Meta_Event.subevent_classes[ HCI_LE_ADVERTISING_REPORT_EVENT ] = HCI_LE_Advertising_Report_Event @@ -4596,7 +4927,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' -HCI_Event.meta_event_classes[ +HCI_LE_Meta_Event.subevent_classes[ HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT ] = HCI_LE_Extended_Advertising_Report_Event @@ -4837,6 +5168,7 @@ class HCI_Command_Complete_Event(HCI_Event): ''' return_parameters = b'' + command_opcode: int def map_return_parameters(self, return_parameters): '''Map simple 'status' return parameters to their named constant form''' @@ -4869,11 +5201,11 @@ class HCI_Command_Complete_Event(HCI_Event): self.return_parameters = self.return_parameters[0] else: cls = HCI_Command.command_classes.get(self.command_opcode) - if cls and cls.return_parameters_fields: - self.return_parameters = HCI_Object.from_bytes( - self.return_parameters, 0, cls.return_parameters_fields - ) - self.return_parameters.fields = cls.return_parameters_fields + if cls: + # Try to parse the return parameters bytes into an object. + return_parameters = cls.parse_return_parameters(self.return_parameters) + if return_parameters is not None: + self.return_parameters = return_parameters return self @@ -4965,7 +5297,7 @@ class HCI_Number_Of_Completed_Packets_Event(HCI_Event): def __str__(self): lines = [ color(self.name, 'magenta') + ':', - color(' number_of_handles: ', 'cyan') + color(' number_of_handles: ', 'cyan') + f'{len(self.connection_handles)}', ] for i, connection_handle in enumerate(self.connection_handles): @@ -5300,6 +5632,14 @@ class HCI_User_Passkey_Request_Event(HCI_Event): # ----------------------------------------------------------------------------- +@HCI_Event.event([('bd_addr', Address.parse_address)]) +class HCI_Remote_OOB_Data_Request_Event(HCI_Event): + ''' + See Bluetooth spec @ 7.7.44 Remote OOB Data Request Event + ''' + + +# ----------------------------------------------------------------------------- @HCI_Event.event([('status', STATUS_SPEC), ('bd_addr', Address.parse_address)]) class HCI_Simple_Pairing_Complete_Event(HCI_Event): ''' @@ -5316,6 +5656,14 @@ class HCI_Link_Supervision_Timeout_Changed_Event(HCI_Event): # ----------------------------------------------------------------------------- +@HCI_Event.event([('handle', 2)]) +class HCI_Enhanced_Flush_Complete_Event(HCI_Event): + ''' + See Bluetooth spec @ 7.7.47 Enhanced Flush Complete Event + ''' + + +# ----------------------------------------------------------------------------- @HCI_Event.event([('bd_addr', Address.parse_address), ('passkey', 4)]) class HCI_User_Passkey_Notification_Event(HCI_Event): ''' @@ -5324,6 +5672,14 @@ class HCI_User_Passkey_Notification_Event(HCI_Event): # ----------------------------------------------------------------------------- +@HCI_Event.event([('bd_addr', Address.parse_address), ('notification_type', 1)]) +class HCI_Keypress_Notification_Event(HCI_Event): + ''' + See Bluetooth spec @ 7.7.49 Keypress Notification Event + ''' + + +# ----------------------------------------------------------------------------- @HCI_Event.event([('bd_addr', Address.parse_address), ('host_supported_features', 8)]) class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event): ''' @@ -5332,7 +5688,7 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event): # ----------------------------------------------------------------------------- -class HCI_AclDataPacket: +class HCI_AclDataPacket(HCI_Packet): ''' See Bluetooth spec @ 5.4.2 HCI ACL Data Packets ''' @@ -5340,7 +5696,7 @@ class HCI_AclDataPacket: hci_packet_type = HCI_ACL_DATA_PACKET @staticmethod - def from_bytes(packet): + def from_bytes(packet: bytes) -> HCI_AclDataPacket: # Read the header h, data_total_length = struct.unpack_from('<HH', packet, 1) connection_handle = h & 0xFFF @@ -5373,7 +5729,7 @@ class HCI_AclDataPacket: def __str__(self): return ( f'{color("ACL", "blue")}: ' - f'handle=0x{self.connection_handle:04x}' + f'handle=0x{self.connection_handle:04x}, ' f'pb={self.pb_flag}, bc={self.bc_flag}, ' f'data_total_length={self.data_total_length}, ' f'data={self.data.hex()}' @@ -5382,12 +5738,14 @@ class HCI_AclDataPacket: # ----------------------------------------------------------------------------- class HCI_AclDataPacketAssembler: - def __init__(self, callback): + current_data: Optional[bytes] + + def __init__(self, callback: Callable[[bytes], Any]) -> None: self.callback = callback self.current_data = None self.l2cap_pdu_length = 0 - def feed_packet(self, packet): + def feed_packet(self, packet: HCI_AclDataPacket) -> None: if packet.pb_flag in ( HCI_ACL_PB_FIRST_NON_FLUSHABLE, HCI_ACL_PB_FIRST_FLUSHABLE, @@ -5401,6 +5759,7 @@ class HCI_AclDataPacketAssembler: return self.current_data += packet.data + assert self.current_data is not None if len(self.current_data) == self.l2cap_pdu_length + 4: # The packet is complete, invoke the callback logger.debug(f'<<< ACL PDU: {self.current_data.hex()}') |