diff options
Diffstat (limited to 'bumble/link.py')
-rw-r--r-- | bumble/link.py | 110 |
1 files changed, 109 insertions, 1 deletions
diff --git a/bumble/link.py b/bumble/link.py index 85ad96e..5ef56b7 100644 --- a/bumble/link.py +++ b/bumble/link.py @@ -26,9 +26,13 @@ from bumble.hci import ( HCI_SUCCESS, HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR, HCI_CONNECTION_TIMEOUT_ERROR, + HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, HCI_PAGE_TIMEOUT_ERROR, HCI_Connection_Complete_Event, ) +from bumble import controller + +from typing import Optional, Set # ----------------------------------------------------------------------------- # Logging @@ -57,6 +61,8 @@ class LocalLink: Link bus for controllers to communicate with each other ''' + controllers: Set[controller.Controller] + def __init__(self): self.controllers = set() self.pending_connection = None @@ -79,7 +85,9 @@ class LocalLink: return controller return None - def find_classic_controller(self, address): + def find_classic_controller( + self, address: Address + ) -> Optional[controller.Controller]: for controller in self.controllers: if controller.public_address == address: return controller @@ -188,6 +196,60 @@ class LocalLink: if peripheral_controller := self.find_controller(peripheral_address): peripheral_controller.on_link_encrypted(central_address, rand, ediv, ltk) + def create_cis( + self, + central_controller: controller.Controller, + peripheral_address: Address, + cig_id: int, + cis_id: int, + ) -> None: + logger.debug( + f'$$$ CIS Request {central_controller.random_address} -> {peripheral_address}' + ) + if peripheral_controller := self.find_controller(peripheral_address): + asyncio.get_running_loop().call_soon( + peripheral_controller.on_link_cis_request, + central_controller.random_address, + cig_id, + cis_id, + ) + + def accept_cis( + self, + peripheral_controller: controller.Controller, + central_address: Address, + cig_id: int, + cis_id: int, + ) -> None: + logger.debug( + f'$$$ CIS Accept {peripheral_controller.random_address} -> {central_address}' + ) + if central_controller := self.find_controller(central_address): + asyncio.get_running_loop().call_soon( + central_controller.on_link_cis_established, cig_id, cis_id + ) + asyncio.get_running_loop().call_soon( + peripheral_controller.on_link_cis_established, cig_id, cis_id + ) + + def disconnect_cis( + self, + initiator_controller: controller.Controller, + peer_address: Address, + cig_id: int, + cis_id: int, + ) -> None: + logger.debug( + f'$$$ CIS Disconnect {initiator_controller.random_address} -> {peer_address}' + ) + if peer_controller := self.find_controller(peer_address): + asyncio.get_running_loop().call_soon( + initiator_controller.on_link_cis_disconnected, cig_id, cis_id + ) + asyncio.get_running_loop().call_soon( + peer_controller.on_link_cis_disconnected, cig_id, cis_id + ) + ############################################################ # Classic handlers ############################################################ @@ -271,6 +333,52 @@ class LocalLink: initiator_controller.public_address, int(not (initiator_new_role)) ) + def classic_sco_connect( + self, + initiator_controller: controller.Controller, + responder_address: Address, + link_type: int, + ): + logger.debug( + f'[Classic] {initiator_controller.public_address} connects SCO to {responder_address}' + ) + responder_controller = self.find_classic_controller(responder_address) + # Initiator controller should handle it. + assert responder_controller + + responder_controller.on_classic_connection_request( + initiator_controller.public_address, + link_type, + ) + + def classic_accept_sco_connection( + self, + responder_controller: controller.Controller, + initiator_address: Address, + link_type: int, + ): + logger.debug( + f'[Classic] {responder_controller.public_address} accepts to connect SCO {initiator_address}' + ) + initiator_controller = self.find_classic_controller(initiator_address) + if initiator_controller is None: + responder_controller.on_classic_sco_connection_complete( + responder_controller.public_address, + HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR, + link_type, + ) + return + + async def task(): + initiator_controller.on_classic_sco_connection_complete( + responder_controller.public_address, HCI_SUCCESS, link_type + ) + + asyncio.create_task(task()) + responder_controller.on_classic_sco_connection_complete( + initiator_controller.public_address, HCI_SUCCESS, link_type + ) + # ----------------------------------------------------------------------------- class RemoteLink: |