aboutsummaryrefslogtreecommitdiff
path: root/bumble/hci.py
diff options
context:
space:
mode:
Diffstat (limited to 'bumble/hci.py')
-rw-r--r--bumble/hci.py961
1 files changed, 660 insertions, 301 deletions
diff --git a/bumble/hci.py b/bumble/hci.py
index 9b5793d..41deed2 100644
--- a/bumble/hci.py
+++ b/bumble/hci.py
@@ -16,11 +16,11 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
-import struct
import collections
-import logging
import functools
-from typing import Dict, Type, Union
+import logging
+import struct
+from typing import Any, Dict, Callable, Optional, Type, Union
from .colors import color
from .core import (
@@ -47,6 +47,10 @@ def hci_command_op_code(ogf, ocf):
return ogf << 10 | ocf
+def hci_vendor_command_op_code(ocf):
+ return hci_command_op_code(HCI_VENDOR_OGF, ocf)
+
+
def key_with_value(dictionary, target_value):
for key, value in dictionary.items():
if value == target_value:
@@ -62,7 +66,7 @@ def map_null_terminated_utf8_string(utf8_bytes):
try:
terminator = utf8_bytes.find(0)
if terminator < 0:
- return utf8_bytes
+ terminator = len(utf8_bytes)
return utf8_bytes[0:terminator].decode('utf8')
except UnicodeDecodeError:
return utf8_bytes
@@ -101,6 +105,8 @@ def phy_list_to_bits(phys):
# fmt: off
# pylint: disable=line-too-long
+HCI_VENDOR_OGF = 0x3F
+
# HCI Version
HCI_VERSION_BLUETOOTH_CORE_1_0B = 0
HCI_VERSION_BLUETOOTH_CORE_1_1 = 1
@@ -185,7 +191,7 @@ HCI_IO_CAPABILITY_REQUEST_EVENT = 0x31
HCI_IO_CAPABILITY_RESPONSE_EVENT = 0x32
HCI_USER_CONFIRMATION_REQUEST_EVENT = 0x33
HCI_USER_PASSKEY_REQUEST_EVENT = 0x34
-HCI_REMOTE_OOB_DATA_REQUEST = 0x35
+HCI_REMOTE_OOB_DATA_REQUEST_EVENT = 0x35
HCI_SIMPLE_PAIRING_COMPLETE_EVENT = 0x36
HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT = 0x38
HCI_ENHANCED_FLUSH_COMPLETE_EVENT = 0x39
@@ -206,10 +212,8 @@ HCI_INQUIRY_RESPONSE_NOTIFICATION_EVENT = 0X56
HCI_AUTHENTICATED_PAYLOAD_TIMEOUT_EXPIRED_EVENT = 0X57
HCI_SAM_STATUS_CHANGE_EVENT = 0X58
-HCI_EVENT_NAMES = {
- event_code: event_name for (event_name, event_code) in globals().items()
- if event_name.startswith('HCI_') and event_name.endswith('_EVENT')
-}
+HCI_VENDOR_EVENT = 0xFF
+
# HCI Subevent Codes
HCI_LE_CONNECTION_COMPLETE_EVENT = 0x01
@@ -248,10 +252,6 @@ HCI_LE_TRANSMIT_POWER_REPORTING_EVENT = 0X21
HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT = 0X22
HCI_LE_SUBRATE_CHANGE_EVENT = 0X23
-HCI_SUBEVENT_NAMES = {
- event_code: event_name for (event_name, event_code) in globals().items()
- if event_name.startswith('HCI_LE_') and event_name.endswith('_EVENT') and event_code != HCI_LE_META_EVENT
-}
# HCI Command
HCI_INQUIRY_COMMAND = hci_command_op_code(0x01, 0x0001)
@@ -557,10 +557,6 @@ HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_c
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D)
HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E)
-HCI_COMMAND_NAMES = {
- command_code: command_name for (command_name, command_code) in globals().items()
- if command_name.startswith('HCI_') and command_name.endswith('_COMMAND')
-}
# HCI Error Codes
# See Bluetooth spec Vol 2, Part D - 1.3 LIST OF ERROR CODES
@@ -1445,8 +1441,14 @@ class HCI_Object:
@staticmethod
def init_from_fields(hci_object, fields, values):
if isinstance(values, dict):
- for field_name, _ in fields:
- setattr(hci_object, field_name, values[field_name])
+ for field in fields:
+ if isinstance(field, list):
+ # The field is an array, up-level the array field names
+ for sub_field_name, _ in field:
+ setattr(hci_object, sub_field_name, values[sub_field_name])
+ else:
+ field_name = field[0]
+ setattr(hci_object, field_name, values[field_name])
else:
for field_name, field_value in zip(fields, values):
setattr(hci_object, field_name, field_value)
@@ -1457,132 +1459,160 @@ class HCI_Object:
HCI_Object.init_from_fields(hci_object, parsed.keys(), parsed.values())
@staticmethod
+ def parse_field(data, offset, field_type):
+ # The field_type may be a dictionary with a mapper, parser, and/or size
+ if isinstance(field_type, dict):
+ if 'size' in field_type:
+ field_type = field_type['size']
+ elif 'parser' in field_type:
+ field_type = field_type['parser']
+
+ # Parse the field
+ if field_type == '*':
+ # The rest of the bytes
+ field_value = data[offset:]
+ return (field_value, len(field_value))
+ if field_type == 1:
+ # 8-bit unsigned
+ return (data[offset], 1)
+ if field_type == -1:
+ # 8-bit signed
+ return (struct.unpack_from('b', data, offset)[0], 1)
+ if field_type == 2:
+ # 16-bit unsigned
+ return (struct.unpack_from('<H', data, offset)[0], 2)
+ if field_type == '>2':
+ # 16-bit unsigned big-endian
+ return (struct.unpack_from('>H', data, offset)[0], 2)
+ if field_type == -2:
+ # 16-bit signed
+ return (struct.unpack_from('<h', data, offset)[0], 2)
+ if field_type == 3:
+ # 24-bit unsigned
+ padded = data[offset : offset + 3] + bytes([0])
+ return (struct.unpack('<I', padded)[0], 3)
+ if field_type == 4:
+ # 32-bit unsigned
+ return (struct.unpack_from('<I', data, offset)[0], 4)
+ if field_type == '>4':
+ # 32-bit unsigned big-endian
+ return (struct.unpack_from('>I', data, offset)[0], 4)
+ if isinstance(field_type, int) and 4 < field_type <= 256:
+ # Byte array (from 5 up to 256 bytes)
+ return (data[offset : offset + field_type], field_type)
+ if callable(field_type):
+ new_offset, field_value = field_type(data, offset)
+ return (field_value, new_offset - offset)
+
+ raise ValueError(f'unknown field type {field_type}')
+
+ @staticmethod
def dict_from_bytes(data, offset, fields):
result = collections.OrderedDict()
- for (field_name, field_type) in fields:
- # The field_type may be a dictionary with a mapper, parser, and/or size
- if isinstance(field_type, dict):
- if 'size' in field_type:
- field_type = field_type['size']
- elif 'parser' in field_type:
- field_type = field_type['parser']
-
- # Parse the field
- if field_type == '*':
- # The rest of the bytes
- field_value = data[offset:]
- offset += len(field_value)
- elif field_type == 1:
- # 8-bit unsigned
- field_value = data[offset]
+ for field in fields:
+ if isinstance(field, list):
+ # This is an array field, starting with a 1-byte item count.
+ item_count = data[offset]
offset += 1
- elif field_type == -1:
- # 8-bit signed
- field_value = struct.unpack_from('b', data, offset)[0]
- offset += 1
- elif field_type == 2:
- # 16-bit unsigned
- field_value = struct.unpack_from('<H', data, offset)[0]
- offset += 2
- elif field_type == '>2':
- # 16-bit unsigned big-endian
- field_value = struct.unpack_from('>H', data, offset)[0]
- offset += 2
- elif field_type == -2:
- # 16-bit signed
- field_value = struct.unpack_from('<h', data, offset)[0]
- offset += 2
- elif field_type == 3:
- # 24-bit unsigned
- padded = data[offset : offset + 3] + bytes([0])
- field_value = struct.unpack('<I', padded)[0]
- offset += 3
- elif field_type == 4:
- # 32-bit unsigned
- field_value = struct.unpack_from('<I', data, offset)[0]
- offset += 4
- elif field_type == '>4':
- # 32-bit unsigned big-endian
- field_value = struct.unpack_from('>I', data, offset)[0]
- offset += 4
- elif isinstance(field_type, int) and 4 < field_type <= 256:
- # Byte array (from 5 up to 256 bytes)
- field_value = data[offset : offset + field_type]
- offset += field_type
- elif callable(field_type):
- offset, field_value = field_type(data, offset)
- else:
- raise ValueError(f'unknown field type {field_type}')
-
+ for _ in range(item_count):
+ for sub_field_name, sub_field_type in field:
+ value, size = HCI_Object.parse_field(
+ data, offset, sub_field_type
+ )
+ result.setdefault(sub_field_name, []).append(value)
+ offset += size
+ continue
+
+ field_name, field_type = field
+ field_value, field_size = HCI_Object.parse_field(data, offset, field_type)
result[field_name] = field_value
+ offset += field_size
return result
@staticmethod
- def dict_to_bytes(hci_object, fields):
- result = bytearray()
- for (field_name, field_type) in fields:
- # The field_type may be a dictionary with a mapper, parser, serializer,
- # and/or size
- serializer = None
- if isinstance(field_type, dict):
- if 'serializer' in field_type:
- serializer = field_type['serializer']
- if 'size' in field_type:
- field_type = field_type['size']
-
- # Serialize the field
- field_value = hci_object[field_name]
- if serializer:
- field_bytes = serializer(field_value)
- elif field_type == 1:
- # 8-bit unsigned
- field_bytes = bytes([field_value])
- elif field_type == -1:
- # 8-bit signed
- field_bytes = struct.pack('b', field_value)
- elif field_type == 2:
- # 16-bit unsigned
- field_bytes = struct.pack('<H', field_value)
- elif field_type == '>2':
- # 16-bit unsigned big-endian
- field_bytes = struct.pack('>H', field_value)
- elif field_type == -2:
- # 16-bit signed
- field_bytes = struct.pack('<h', field_value)
- elif field_type == 3:
- # 24-bit unsigned
- field_bytes = struct.pack('<I', field_value)[0:3]
- elif field_type == 4:
- # 32-bit unsigned
- field_bytes = struct.pack('<I', field_value)
- elif field_type == '>4':
- # 32-bit unsigned big-endian
- field_bytes = struct.pack('>I', field_value)
- elif field_type == '*':
- if isinstance(field_value, int):
- if 0 <= field_value <= 255:
- field_bytes = bytes([field_value])
- else:
- raise ValueError('value too large for *-typed field')
+ def serialize_field(field_value, field_type):
+ # The field_type may be a dictionary with a mapper, parser, serializer,
+ # and/or size
+ serializer = None
+ if isinstance(field_type, dict):
+ if 'serializer' in field_type:
+ serializer = field_type['serializer']
+ if 'size' in field_type:
+ field_type = field_type['size']
+
+ # Serialize the field
+ if serializer:
+ field_bytes = serializer(field_value)
+ elif field_type == 1:
+ # 8-bit unsigned
+ field_bytes = bytes([field_value])
+ elif field_type == -1:
+ # 8-bit signed
+ field_bytes = struct.pack('b', field_value)
+ elif field_type == 2:
+ # 16-bit unsigned
+ field_bytes = struct.pack('<H', field_value)
+ elif field_type == '>2':
+ # 16-bit unsigned big-endian
+ field_bytes = struct.pack('>H', field_value)
+ elif field_type == -2:
+ # 16-bit signed
+ field_bytes = struct.pack('<h', field_value)
+ elif field_type == 3:
+ # 24-bit unsigned
+ field_bytes = struct.pack('<I', field_value)[0:3]
+ elif field_type == 4:
+ # 32-bit unsigned
+ field_bytes = struct.pack('<I', field_value)
+ elif field_type == '>4':
+ # 32-bit unsigned big-endian
+ field_bytes = struct.pack('>I', field_value)
+ elif field_type == '*':
+ if isinstance(field_value, int):
+ if 0 <= field_value <= 255:
+ field_bytes = bytes([field_value])
else:
- field_bytes = bytes(field_value)
- elif isinstance(field_value, (bytes, bytearray)) or hasattr(
- field_value, 'to_bytes'
- ):
- field_bytes = bytes(field_value)
- if isinstance(field_type, int) and 4 < field_type <= 256:
- # Truncate or Pad with zeros if the field is too long or too short
- if len(field_bytes) < field_type:
- field_bytes += bytes(field_type - len(field_bytes))
- elif len(field_bytes) > field_type:
- field_bytes = field_bytes[:field_type]
+ raise ValueError('value too large for *-typed field')
else:
- raise ValueError(
- f"don't know how to serialize type {type(field_value)}"
+ field_bytes = bytes(field_value)
+ elif isinstance(field_value, (bytes, bytearray)) or hasattr(
+ field_value, 'to_bytes'
+ ):
+ field_bytes = bytes(field_value)
+ if isinstance(field_type, int) and 4 < field_type <= 256:
+ # Truncate or pad with zeros if the field is too long or too short
+ if len(field_bytes) < field_type:
+ field_bytes += bytes(field_type - len(field_bytes))
+ elif len(field_bytes) > field_type:
+ field_bytes = field_bytes[:field_type]
+ else:
+ raise ValueError(f"don't know how to serialize type {type(field_value)}")
+
+ return field_bytes
+
+ @staticmethod
+ def dict_to_bytes(hci_object, fields):
+ result = bytearray()
+ for field in fields:
+ if isinstance(field, list):
+ # The field is an array. The serialized form starts with a 1-byte
+ # item count. We use the length of the first array field as the
+ # array count, since all array fields have the same number of items.
+ item_count = len(hci_object[field[0][0]])
+ result += bytes([item_count]) + b''.join(
+ b''.join(
+ HCI_Object.serialize_field(
+ hci_object[sub_field_name][i], sub_field_type
+ )
+ for sub_field_name, sub_field_type in field
+ )
+ for i in range(item_count)
)
+ continue
- result += field_bytes
+ (field_name, field_type) = field
+ result += HCI_Object.serialize_field(hci_object[field_name], field_type)
return bytes(result)
@@ -1617,46 +1647,73 @@ class HCI_Object:
return str(value)
@staticmethod
- def format_fields(hci_object, keys, indentation='', value_mappers=None):
- if not keys:
- return ''
-
- # Measure the widest field name
- max_field_name_length = max(
- (len(key[0] if isinstance(key, tuple) else key) for key in keys)
+ def stringify_field(
+ field_name, field_type, field_value, indentation, value_mappers
+ ):
+ value_mapper = None
+ if isinstance(field_type, dict):
+ # Get the value mapper from the specifier
+ value_mapper = field_type.get('mapper')
+
+ # Check if there's a matching mapper passed
+ if value_mappers:
+ value_mapper = value_mappers.get(field_name, value_mapper)
+
+ # Map the value if we have a mapper
+ if value_mapper is not None:
+ field_value = value_mapper(field_value)
+
+ # Get the string representation of the value
+ return HCI_Object.format_field_value(
+ field_value, indentation=indentation + ' '
)
+ @staticmethod
+ def format_fields(hci_object, fields, indentation='', value_mappers=None):
+ if not fields:
+ return ''
+
# Build array of formatted key:value pairs
- fields = []
- for key in keys:
- value_mapper = None
- if isinstance(key, tuple):
- # The key has an associated specifier
- key, specifier = key
-
- # Get the value mapper from the specifier
- if isinstance(specifier, dict):
- value_mapper = specifier.get('mapper')
-
- # Get the value for the field
- value = hci_object[key]
-
- # Map the value if needed
- if value_mappers:
- value_mapper = value_mappers.get(key, value_mapper)
- if value_mapper is not None:
- value = value_mapper(value)
-
- # Get the string representation of the value
- value_str = HCI_Object.format_field_value(
- value, indentation=indentation + ' '
+ field_strings = []
+ for field in fields:
+ if isinstance(field, list):
+ for sub_field in field:
+ sub_field_name, sub_field_type = sub_field
+ item_count = len(hci_object[sub_field_name])
+ for i in range(item_count):
+ field_strings.append(
+ (
+ f'{sub_field_name}[{i}]',
+ HCI_Object.stringify_field(
+ sub_field_name,
+ sub_field_type,
+ hci_object[sub_field_name][i],
+ indentation,
+ value_mappers,
+ ),
+ ),
+ )
+ continue
+
+ field_name, field_type = field
+ field_value = hci_object[field_name]
+ field_strings.append(
+ (
+ field_name,
+ HCI_Object.stringify_field(
+ field_name, field_type, field_value, indentation, value_mappers
+ ),
+ ),
)
- # Add the field to the formatted result
- key_str = color(f'{key + ":":{1 + max_field_name_length}}', 'cyan')
- fields.append(f'{indentation}{key_str} {value_str}')
-
- return '\n'.join(fields)
+ # Measure the widest field name
+ max_field_name_length = max(len(s[0]) for s in field_strings)
+ sep = ':'
+ return '\n'.join(
+ f'{indentation}'
+ f'{color(f"{field_name + sep:{1 + max_field_name_length}}", "cyan")} {field_value}'
+ for field_name, field_value in field_strings
+ )
def __bytes__(self):
return self.to_bytes()
@@ -1795,6 +1852,16 @@ class Address:
def to_bytes(self):
return self.address_bytes
+ def to_string(self, with_type_qualifier=True):
+ '''
+ String representation of the address, MSB first, with an optional type
+ qualifier.
+ '''
+ result = ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)])
+ if not with_type_qualifier or not self.is_public:
+ return result
+ return result + '/P'
+
def __bytes__(self):
return self.to_bytes()
@@ -1808,13 +1875,7 @@ class Address:
)
def __str__(self):
- '''
- String representation of the address, MSB first
- '''
- result = ':'.join([f'{x:02X}' for x in reversed(self.address_bytes)])
- if not self.is_public:
- return result
- return result + '/P'
+ return self.to_string()
# Predefined address values
@@ -1853,7 +1914,7 @@ class HCI_Packet:
hci_packet_type: int
@staticmethod
- def from_bytes(packet):
+ def from_bytes(packet: bytes) -> HCI_Packet:
packet_type = packet[0]
if packet_type == HCI_COMMAND_PACKET:
@@ -1895,6 +1956,7 @@ class HCI_Command(HCI_Packet):
'''
hci_packet_type = HCI_COMMAND_PACKET
+ command_names: Dict[int, str] = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
@staticmethod
@@ -1905,9 +1967,9 @@ class HCI_Command(HCI_Packet):
def inner(cls):
cls.name = cls.__name__.upper()
- cls.op_code = key_with_value(HCI_COMMAND_NAMES, cls.name)
+ cls.op_code = key_with_value(cls.command_names, cls.name)
if cls.op_code is None:
- raise KeyError(f'command {cls.name} not found in HCI_COMMAND_NAMES')
+ raise KeyError(f'command {cls.name} not found in command_names')
cls.fields = fields
cls.return_parameters_fields = return_parameters_fields
@@ -1927,7 +1989,19 @@ class HCI_Command(HCI_Packet):
return inner
@staticmethod
- def from_bytes(packet):
+ def command_map(symbols: Dict[str, Any]) -> Dict[int, str]:
+ return {
+ command_code: command_name
+ for (command_name, command_code) in symbols.items()
+ if command_name.startswith('HCI_') and command_name.endswith('_COMMAND')
+ }
+
+ @classmethod
+ def register_commands(cls, symbols: Dict[str, Any]) -> None:
+ cls.command_names.update(cls.command_map(symbols))
+
+ @staticmethod
+ def from_bytes(packet: bytes) -> HCI_Command:
op_code, length = struct.unpack_from('<HB', packet, 1)
parameters = packet[4:]
if len(parameters) != length:
@@ -1946,11 +2020,11 @@ class HCI_Command(HCI_Packet):
HCI_Object.init_from_bytes(self, parameters, 0, fields)
return self
- return cls.from_parameters(parameters)
+ return cls.from_parameters(parameters) # type: ignore
@staticmethod
def command_name(op_code):
- name = HCI_COMMAND_NAMES.get(op_code)
+ name = HCI_Command.command_names.get(op_code)
if name is not None:
return name
return f'[OGF=0x{op_code >> 10:02x}, OCF=0x{op_code & 0x3FF:04x}]'
@@ -1959,6 +2033,16 @@ class HCI_Command(HCI_Packet):
def create_return_parameters(cls, **kwargs):
return HCI_Object(cls.return_parameters_fields, **kwargs)
+ @classmethod
+ def parse_return_parameters(cls, parameters):
+ if not cls.return_parameters_fields:
+ return None
+ return_parameters = HCI_Object.from_bytes(
+ parameters, 0, cls.return_parameters_fields
+ )
+ return_parameters.fields = cls.return_parameters_fields
+ return return_parameters
+
def __init__(self, op_code, parameters=None, **kwargs):
super().__init__(HCI_Command.command_name(op_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
@@ -1988,6 +2072,9 @@ class HCI_Command(HCI_Packet):
return result
+HCI_Command.register_commands(globals())
+
+
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
@@ -2284,6 +2371,55 @@ class HCI_User_Passkey_Request_Negative_Reply_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
+ fields=[
+ ('bd_addr', Address.parse_address),
+ ('c', 16),
+ ('r', 16),
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('bd_addr', Address.parse_address),
+ ],
+)
+class HCI_Remote_OOB_Data_Request_Reply_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.34 Remote OOB Data Request Reply Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[('bd_addr', Address.parse_address)],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('bd_addr', Address.parse_address),
+ ],
+)
+class HCI_Remote_OOB_Data_Request_Negative_Reply_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.35 Remote OOB Data Request Negative Reply Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('bd_addr', Address.parse_address),
+ ('reason', 1),
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('bd_addr', Address.parse_address),
+ ],
+)
+class HCI_IO_Capability_Request_Negative_Reply_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.36 IO Capability Request Negative Reply Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
[
('connection_handle', 2),
('transmit_bandwidth', 4),
@@ -2320,6 +2456,161 @@ class HCI_Enhanced_Setup_Synchronous_Connection_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
+ ('bd_addr', Address.parse_address),
+ ('transmit_bandwidth', 4),
+ ('receive_bandwidth', 4),
+ ('transmit_coding_format', 5),
+ ('receive_coding_format', 5),
+ ('transmit_codec_frame_size', 2),
+ ('receive_codec_frame_size', 2),
+ ('input_bandwidth', 4),
+ ('output_bandwidth', 4),
+ ('input_coding_format', 5),
+ ('output_coding_format', 5),
+ ('input_coded_data_size', 2),
+ ('output_coded_data_size', 2),
+ ('input_pcm_data_format', 1),
+ ('output_pcm_data_format', 1),
+ ('input_pcm_sample_payload_msb_position', 1),
+ ('output_pcm_sample_payload_msb_position', 1),
+ ('input_data_path', 1),
+ ('output_data_path', 1),
+ ('input_transport_unit_size', 1),
+ ('output_transport_unit_size', 1),
+ ('max_latency', 2),
+ ('packet_type', 2),
+ ('retransmission_effort', 1),
+ ]
+)
+class HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.46 Enhanced Accept Synchronous Connection Request Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('bd_addr', Address.parse_address),
+ ('page_scan_repetition_mode', 1),
+ ('clock_offset', 2),
+ ]
+)
+class HCI_Truncated_Page_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.47 Truncated Page Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[('bd_addr', Address.parse_address)],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('bd_addr', Address.parse_address),
+ ],
+)
+class HCI_Truncated_Page_Cancel_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.48 Truncated Page Cancel Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('enable', 1),
+ ('lt_addr', 1),
+ ('lpo_allowed', 1),
+ ('packet_type', 2),
+ ('interval_min', 2),
+ ('interval_max', 2),
+ ('supervision_timeout', 2),
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('lt_addr', 1),
+ ('interval', 2),
+ ],
+)
+class HCI_Set_Connectionless_Peripheral_Broadcast_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.49 Set Connectionless Peripheral Broadcast Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('enable', 1),
+ ('bd_addr', Address.parse_address),
+ ('lt_addr', 1),
+ ('interval', 2),
+ ('clock_offset', 4),
+ ('next_connectionless_peripheral_broadcast_clock', 4),
+ ('supervision_timeout', 2),
+ ('remote_timing_accuracy', 1),
+ ('skip', 1),
+ ('packet_type', 2),
+ ('afh_channel_map', 10),
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('bd_addr', Address.parse_address),
+ ('lt_addr', 1),
+ ],
+)
+class HCI_Set_Connectionless_Peripheral_Broadcast_Receive_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.50 Set Connectionless Peripheral Broadcast Receive Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+class HCI_Start_Synchronization_Train_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.51 Start Synchronization Train Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('bd_addr', Address.parse_address),
+ ('sync_scan_timeout', 2),
+ ('sync_scan_window', 2),
+ ('sync_scan_interval', 2),
+ ],
+)
+class HCI_Receive_Synchronization_Train_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.52 Receive Synchronization Train Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ fields=[
+ ('bd_addr', Address.parse_address),
+ ('c_192', 16),
+ ('r_192', 16),
+ ('c_256', 16),
+ ('r_256', 16),
+ ],
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('bd_addr', Address.parse_address),
+ ],
+)
+class HCI_Remote_OOB_Extended_Data_Request_Reply_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.1.53 Remote OOB Extended Data Request Reply Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ [
('connection_handle', 2),
('sniff_max_interval', 2),
('sniff_min_interval', 2),
@@ -2685,6 +2976,20 @@ class HCI_Write_Simple_Pairing_Mode_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('c', 16),
+ ('r', 16),
+ ]
+)
+class HCI_Read_Local_OOB_Data_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.3.60 Read Local OOB Data Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
return_parameters_fields=[('status', STATUS_SPEC), ('tx_power', -1)]
)
class HCI_Read_Inquiry_Response_Transmit_Power_Level_Command(HCI_Command):
@@ -2747,6 +3052,22 @@ class HCI_Write_Authenticated_Payload_Timeout_Command(HCI_Command):
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
+ ('c_192', 16),
+ ('r_192', 16),
+ ('c_256', 16),
+ ('r_256', 16),
+ ]
+)
+class HCI_Read_Local_OOB_Extended_Data_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.3.95 Read Local OOB Extended Data Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
('hci_version', 1),
('hci_subversion', 2),
('lmp_version', 1),
@@ -3529,9 +3850,7 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
'advertising_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
- 'serializer': functools.partial(
- HCI_Object.serialize_length_prefixed_bytes
- ),
+ 'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
@@ -3579,9 +3898,7 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
'scan_response_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
- 'serializer': functools.partial(
- HCI_Object.serialize_length_prefixed_bytes
- ),
+ 'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
@@ -3609,73 +3926,21 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
# -----------------------------------------------------------------------------
-@HCI_Command.command(fields=None)
+@HCI_Command.command(
+ [
+ ('enable', 1),
+ [
+ ('advertising_handles', 1),
+ ('durations', 2),
+ ('max_extended_advertising_events', 1),
+ ],
+ ]
+)
class HCI_LE_Set_Extended_Advertising_Enable_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.56 LE Set Extended Advertising Enable Command
'''
- @classmethod
- def from_parameters(cls, parameters):
- enable = parameters[0]
- num_sets = parameters[1]
- advertising_handles = []
- durations = []
- max_extended_advertising_events = []
- offset = 2
- for _ in range(num_sets):
- advertising_handles.append(parameters[offset])
- durations.append(struct.unpack_from('<H', parameters, offset + 1)[0])
- max_extended_advertising_events.append(parameters[offset + 3])
- offset += 4
-
- return cls(
- enable, advertising_handles, durations, max_extended_advertising_events
- )
-
- def __init__(
- self, enable, advertising_handles, durations, max_extended_advertising_events
- ):
- super().__init__(HCI_LE_SET_EXTENDED_ADVERTISING_ENABLE_COMMAND)
- self.enable = enable
- self.advertising_handles = advertising_handles
- self.durations = durations
- self.max_extended_advertising_events = max_extended_advertising_events
-
- self.parameters = bytes([enable, len(advertising_handles)]) + b''.join(
- [
- struct.pack(
- '<BHB',
- advertising_handles[i],
- durations[i],
- max_extended_advertising_events[i],
- )
- for i in range(len(advertising_handles))
- ]
- )
-
- def __str__(self):
- fields = [('enable:', self.enable)]
- for i, advertising_handle in enumerate(self.advertising_handles):
- fields.append(
- (f'advertising_handle[{i}]: ', advertising_handle)
- )
- fields.append((f'duration[{i}]: ', self.durations[i]))
- fields.append(
- (
- f'max_extended_advertising_events[{i}]:',
- self.max_extended_advertising_events[i],
- )
- )
-
- return (
- color(self.name, 'green')
- + ':\n'
- + '\n'.join(
- [color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields]
- )
- )
-
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -3826,7 +4091,10 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command):
color(self.name, 'green')
+ ':\n'
+ '\n'.join(
- [color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields]
+ [
+ color(' ' + field[0], 'cyan') + ' ' + str(field[1])
+ for field in fields
+ ]
)
)
@@ -4002,7 +4270,10 @@ class HCI_LE_Extended_Create_Connection_Command(HCI_Command):
color(self.name, 'green')
+ ':\n'
+ '\n'.join(
- [color(field[0], 'cyan') + ' ' + str(field[1]) for field in fields]
+ [
+ color(' ' + field[0], 'cyan') + ' ' + str(field[1])
+ for field in fields
+ ]
)
)
@@ -4059,8 +4330,8 @@ class HCI_Event(HCI_Packet):
'''
hci_packet_type = HCI_EVENT_PACKET
+ event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {}
- meta_event_classes: Dict[int, Type[HCI_LE_Meta_Event]] = {}
@staticmethod
def event(fields=()):
@@ -4070,9 +4341,9 @@ class HCI_Event(HCI_Packet):
def inner(cls):
cls.name = cls.__name__.upper()
- cls.event_code = key_with_value(HCI_EVENT_NAMES, cls.name)
+ cls.event_code = key_with_value(cls.event_names, cls.name)
if cls.event_code is None:
- raise KeyError('event not found in HCI_EVENT_NAMES')
+ raise KeyError(f'event {cls.name} not found in event_names')
cls.fields = fields
# Patch the __init__ method to fix the event_code
@@ -4089,11 +4360,29 @@ class HCI_Event(HCI_Packet):
return inner
@staticmethod
+ def event_map(symbols: Dict[str, Any]) -> Dict[int, str]:
+ return {
+ event_code: event_name
+ for (event_name, event_code) in symbols.items()
+ if event_name.startswith('HCI_')
+ and not event_name.startswith('HCI_LE_')
+ and event_name.endswith('_EVENT')
+ }
+
+ @staticmethod
+ def event_name(event_code):
+ return name_or_number(HCI_Event.event_names, event_code)
+
+ @staticmethod
+ def register_events(symbols: Dict[str, Any]) -> None:
+ HCI_Event.event_names.update(HCI_Event.event_map(symbols))
+
+ @staticmethod
def registered(event_class):
event_class.name = event_class.__name__.upper()
- event_class.event_code = key_with_value(HCI_EVENT_NAMES, event_class.name)
+ event_class.event_code = key_with_value(HCI_Event.event_names, event_class.name)
if event_class.event_code is None:
- raise KeyError('event not found in HCI_EVENT_NAMES')
+ raise KeyError(f'event {event_class.name} not found in event_names')
# Register a factory for this class
HCI_Event.event_classes[event_class.event_code] = event_class
@@ -4101,22 +4390,28 @@ class HCI_Event(HCI_Packet):
return event_class
@staticmethod
- def from_bytes(packet):
+ def from_bytes(packet: bytes) -> HCI_Event:
event_code = packet[1]
length = packet[2]
parameters = packet[3:]
if len(parameters) != length:
raise ValueError('invalid packet length')
+ cls: Any
if event_code == HCI_LE_META_EVENT:
# We do this dispatch here and not in the subclass in order to avoid call
# loops
subevent_code = parameters[0]
- cls = HCI_Event.meta_event_classes.get(subevent_code)
+ cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if cls is None:
# No class registered, just use a generic class instance
return HCI_LE_Meta_Event(subevent_code, parameters)
-
+ elif event_code == HCI_VENDOR_EVENT:
+ subevent_code = parameters[0]
+ cls = HCI_Vendor_Event.subevent_classes.get(subevent_code)
+ if cls is None:
+ # No class registered, just use a generic class instance
+ return HCI_Vendor_Event(subevent_code, parameters)
else:
cls = HCI_Event.event_classes.get(event_code)
if cls is None:
@@ -4124,7 +4419,7 @@ class HCI_Event(HCI_Packet):
return HCI_Event(event_code, parameters)
# Invoke the factory to create a new instance
- return cls.from_parameters(parameters)
+ return cls.from_parameters(parameters) # type: ignore
@classmethod
def from_parameters(cls, parameters):
@@ -4134,10 +4429,6 @@ class HCI_Event(HCI_Packet):
HCI_Object.init_from_bytes(self, parameters, 0, fields)
return self
- @staticmethod
- def event_name(event_code):
- return name_or_number(HCI_EVENT_NAMES, event_code)
-
def __init__(self, event_code, parameters=None, **kwargs):
super().__init__(HCI_Event.event_name(event_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
@@ -4164,71 +4455,111 @@ class HCI_Event(HCI_Packet):
return result
+HCI_Event.register_events(globals())
+
+
# -----------------------------------------------------------------------------
-class HCI_LE_Meta_Event(HCI_Event):
+class HCI_Extended_Event(HCI_Event):
'''
- See Bluetooth spec @ 7.7.65 LE Meta Event
+ HCI_Event subclass for events that has a subevent code.
'''
- @staticmethod
- def event(fields=()):
+ subevent_names: Dict[int, str] = {}
+ subevent_classes: Dict[int, Type[HCI_Extended_Event]]
+
+ @classmethod
+ def event(cls, fields=()):
'''
Decorator used to declare and register subclasses
'''
def inner(cls):
cls.name = cls.__name__.upper()
- cls.subevent_code = key_with_value(HCI_SUBEVENT_NAMES, cls.name)
+ cls.subevent_code = key_with_value(cls.subevent_names, cls.name)
if cls.subevent_code is None:
- raise KeyError('subevent not found in HCI_SUBEVENT_NAMES')
+ raise KeyError(f'subevent {cls.name} not found in subevent_names')
cls.fields = fields
# Patch the __init__ method to fix the subevent_code
+ original_init = cls.__init__
+
def init(self, parameters=None, **kwargs):
- return HCI_LE_Meta_Event.__init__(
- self, cls.subevent_code, parameters, **kwargs
- )
+ return original_init(self, cls.subevent_code, parameters, **kwargs)
cls.__init__ = init
# Register a factory for this class
- HCI_Event.meta_event_classes[cls.subevent_code] = cls
+ cls.subevent_classes[cls.subevent_code] = cls
return cls
return inner
@classmethod
+ def subevent_name(cls, subevent_code):
+ subevent_name = cls.subevent_names.get(subevent_code)
+ if subevent_name is not None:
+ return subevent_name
+
+ return f'{cls.__name__.upper()}[0x{subevent_code:02X}]'
+
+ @staticmethod
+ def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]:
+ return {
+ subevent_code: subevent_name
+ for (subevent_name, subevent_code) in symbols.items()
+ if subevent_name.startswith('HCI_') and subevent_name.endswith('_EVENT')
+ }
+
+ @classmethod
+ def register_subevents(cls, symbols: Dict[str, Any]) -> None:
+ cls.subevent_names.update(cls.subevent_map(symbols))
+
+ @classmethod
def from_parameters(cls, parameters):
self = cls.__new__(cls)
- HCI_LE_Meta_Event.__init__(self, self.subevent_code, parameters)
+ HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
if fields := getattr(self, 'fields', None):
HCI_Object.init_from_bytes(self, parameters, 1, fields)
return self
- @staticmethod
- def subevent_name(subevent_code):
- return name_or_number(HCI_SUBEVENT_NAMES, subevent_code)
-
def __init__(self, subevent_code, parameters, **kwargs):
self.subevent_code = subevent_code
if parameters is None and (fields := getattr(self, 'fields', None)) and kwargs:
parameters = bytes([subevent_code]) + HCI_Object.dict_to_bytes(
kwargs, fields
)
- super().__init__(HCI_LE_META_EVENT, parameters, **kwargs)
+ super().__init__(self.event_code, parameters, **kwargs)
# Override the name in order to adopt the subevent name instead
self.name = self.subevent_name(subevent_code)
- def __str__(self):
- result = color(self.subevent_name(self.subevent_code), 'magenta')
- if fields := getattr(self, 'fields', None):
- result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
- else:
- if self.parameters:
- result += f': {self.parameters.hex()}'
- return result
+
+# -----------------------------------------------------------------------------
+class HCI_LE_Meta_Event(HCI_Extended_Event):
+ '''
+ See Bluetooth spec @ 7.7.65 LE Meta Event
+ '''
+
+ event_code: int = HCI_LE_META_EVENT
+ subevent_classes = {}
+
+ @staticmethod
+ def subevent_map(symbols: Dict[str, Any]) -> Dict[int, str]:
+ return {
+ subevent_code: subevent_name
+ for (subevent_name, subevent_code) in symbols.items()
+ if subevent_name.startswith('HCI_LE_') and subevent_name.endswith('_EVENT')
+ }
+
+
+HCI_LE_Meta_Event.register_subevents(globals())
+
+
+# -----------------------------------------------------------------------------
+class HCI_Vendor_Event(HCI_Extended_Event):
+ event_code: int = HCI_VENDOR_EVENT
+ subevent_classes = {}
# -----------------------------------------------------------------------------
@@ -4342,7 +4673,7 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event):
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
-HCI_Event.meta_event_classes[
+HCI_LE_Meta_Event.subevent_classes[
HCI_LE_ADVERTISING_REPORT_EVENT
] = HCI_LE_Advertising_Report_Event
@@ -4596,7 +4927,7 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event):
return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}'
-HCI_Event.meta_event_classes[
+HCI_LE_Meta_Event.subevent_classes[
HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT
] = HCI_LE_Extended_Advertising_Report_Event
@@ -4837,6 +5168,7 @@ class HCI_Command_Complete_Event(HCI_Event):
'''
return_parameters = b''
+ command_opcode: int
def map_return_parameters(self, return_parameters):
'''Map simple 'status' return parameters to their named constant form'''
@@ -4869,11 +5201,11 @@ class HCI_Command_Complete_Event(HCI_Event):
self.return_parameters = self.return_parameters[0]
else:
cls = HCI_Command.command_classes.get(self.command_opcode)
- if cls and cls.return_parameters_fields:
- self.return_parameters = HCI_Object.from_bytes(
- self.return_parameters, 0, cls.return_parameters_fields
- )
- self.return_parameters.fields = cls.return_parameters_fields
+ if cls:
+ # Try to parse the return parameters bytes into an object.
+ return_parameters = cls.parse_return_parameters(self.return_parameters)
+ if return_parameters is not None:
+ self.return_parameters = return_parameters
return self
@@ -4965,7 +5297,7 @@ class HCI_Number_Of_Completed_Packets_Event(HCI_Event):
def __str__(self):
lines = [
color(self.name, 'magenta') + ':',
- color(' number_of_handles: ', 'cyan')
+ color(' number_of_handles: ', 'cyan')
+ f'{len(self.connection_handles)}',
]
for i, connection_handle in enumerate(self.connection_handles):
@@ -5300,6 +5632,14 @@ class HCI_User_Passkey_Request_Event(HCI_Event):
# -----------------------------------------------------------------------------
+@HCI_Event.event([('bd_addr', Address.parse_address)])
+class HCI_Remote_OOB_Data_Request_Event(HCI_Event):
+ '''
+ See Bluetooth spec @ 7.7.44 Remote OOB Data Request Event
+ '''
+
+
+# -----------------------------------------------------------------------------
@HCI_Event.event([('status', STATUS_SPEC), ('bd_addr', Address.parse_address)])
class HCI_Simple_Pairing_Complete_Event(HCI_Event):
'''
@@ -5316,6 +5656,14 @@ class HCI_Link_Supervision_Timeout_Changed_Event(HCI_Event):
# -----------------------------------------------------------------------------
+@HCI_Event.event([('handle', 2)])
+class HCI_Enhanced_Flush_Complete_Event(HCI_Event):
+ '''
+ See Bluetooth spec @ 7.7.47 Enhanced Flush Complete Event
+ '''
+
+
+# -----------------------------------------------------------------------------
@HCI_Event.event([('bd_addr', Address.parse_address), ('passkey', 4)])
class HCI_User_Passkey_Notification_Event(HCI_Event):
'''
@@ -5324,6 +5672,14 @@ class HCI_User_Passkey_Notification_Event(HCI_Event):
# -----------------------------------------------------------------------------
+@HCI_Event.event([('bd_addr', Address.parse_address), ('notification_type', 1)])
+class HCI_Keypress_Notification_Event(HCI_Event):
+ '''
+ See Bluetooth spec @ 7.7.49 Keypress Notification Event
+ '''
+
+
+# -----------------------------------------------------------------------------
@HCI_Event.event([('bd_addr', Address.parse_address), ('host_supported_features', 8)])
class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
'''
@@ -5332,7 +5688,7 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
# -----------------------------------------------------------------------------
-class HCI_AclDataPacket:
+class HCI_AclDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
'''
@@ -5340,7 +5696,7 @@ class HCI_AclDataPacket:
hci_packet_type = HCI_ACL_DATA_PACKET
@staticmethod
- def from_bytes(packet):
+ def from_bytes(packet: bytes) -> HCI_AclDataPacket:
# Read the header
h, data_total_length = struct.unpack_from('<HH', packet, 1)
connection_handle = h & 0xFFF
@@ -5373,7 +5729,7 @@ class HCI_AclDataPacket:
def __str__(self):
return (
f'{color("ACL", "blue")}: '
- f'handle=0x{self.connection_handle:04x}'
+ f'handle=0x{self.connection_handle:04x}, '
f'pb={self.pb_flag}, bc={self.bc_flag}, '
f'data_total_length={self.data_total_length}, '
f'data={self.data.hex()}'
@@ -5382,12 +5738,14 @@ class HCI_AclDataPacket:
# -----------------------------------------------------------------------------
class HCI_AclDataPacketAssembler:
- def __init__(self, callback):
+ current_data: Optional[bytes]
+
+ def __init__(self, callback: Callable[[bytes], Any]) -> None:
self.callback = callback
self.current_data = None
self.l2cap_pdu_length = 0
- def feed_packet(self, packet):
+ def feed_packet(self, packet: HCI_AclDataPacket) -> None:
if packet.pb_flag in (
HCI_ACL_PB_FIRST_NON_FLUSHABLE,
HCI_ACL_PB_FIRST_FLUSHABLE,
@@ -5401,6 +5759,7 @@ class HCI_AclDataPacketAssembler:
return
self.current_data += packet.data
+ assert self.current_data is not None
if len(self.current_data) == self.l2cap_pdu_length + 4:
# The packet is complete, invoke the callback
logger.debug(f'<<< ACL PDU: {self.current_data.hex()}')