aboutsummaryrefslogtreecommitdiff
path: root/bumble/link.py
diff options
context:
space:
mode:
Diffstat (limited to 'bumble/link.py')
-rw-r--r--bumble/link.py110
1 files changed, 109 insertions, 1 deletions
diff --git a/bumble/link.py b/bumble/link.py
index 85ad96e..5ef56b7 100644
--- a/bumble/link.py
+++ b/bumble/link.py
@@ -26,9 +26,13 @@ from bumble.hci import (
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
+ HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
HCI_PAGE_TIMEOUT_ERROR,
HCI_Connection_Complete_Event,
)
+from bumble import controller
+
+from typing import Optional, Set
# -----------------------------------------------------------------------------
# Logging
@@ -57,6 +61,8 @@ class LocalLink:
Link bus for controllers to communicate with each other
'''
+ controllers: Set[controller.Controller]
+
def __init__(self):
self.controllers = set()
self.pending_connection = None
@@ -79,7 +85,9 @@ class LocalLink:
return controller
return None
- def find_classic_controller(self, address):
+ def find_classic_controller(
+ self, address: Address
+ ) -> Optional[controller.Controller]:
for controller in self.controllers:
if controller.public_address == address:
return controller
@@ -188,6 +196,60 @@ class LocalLink:
if peripheral_controller := self.find_controller(peripheral_address):
peripheral_controller.on_link_encrypted(central_address, rand, ediv, ltk)
+ def create_cis(
+ self,
+ central_controller: controller.Controller,
+ peripheral_address: Address,
+ cig_id: int,
+ cis_id: int,
+ ) -> None:
+ logger.debug(
+ f'$$$ CIS Request {central_controller.random_address} -> {peripheral_address}'
+ )
+ if peripheral_controller := self.find_controller(peripheral_address):
+ asyncio.get_running_loop().call_soon(
+ peripheral_controller.on_link_cis_request,
+ central_controller.random_address,
+ cig_id,
+ cis_id,
+ )
+
+ def accept_cis(
+ self,
+ peripheral_controller: controller.Controller,
+ central_address: Address,
+ cig_id: int,
+ cis_id: int,
+ ) -> None:
+ logger.debug(
+ f'$$$ CIS Accept {peripheral_controller.random_address} -> {central_address}'
+ )
+ if central_controller := self.find_controller(central_address):
+ asyncio.get_running_loop().call_soon(
+ central_controller.on_link_cis_established, cig_id, cis_id
+ )
+ asyncio.get_running_loop().call_soon(
+ peripheral_controller.on_link_cis_established, cig_id, cis_id
+ )
+
+ def disconnect_cis(
+ self,
+ initiator_controller: controller.Controller,
+ peer_address: Address,
+ cig_id: int,
+ cis_id: int,
+ ) -> None:
+ logger.debug(
+ f'$$$ CIS Disconnect {initiator_controller.random_address} -> {peer_address}'
+ )
+ if peer_controller := self.find_controller(peer_address):
+ asyncio.get_running_loop().call_soon(
+ initiator_controller.on_link_cis_disconnected, cig_id, cis_id
+ )
+ asyncio.get_running_loop().call_soon(
+ peer_controller.on_link_cis_disconnected, cig_id, cis_id
+ )
+
############################################################
# Classic handlers
############################################################
@@ -271,6 +333,52 @@ class LocalLink:
initiator_controller.public_address, int(not (initiator_new_role))
)
+ def classic_sco_connect(
+ self,
+ initiator_controller: controller.Controller,
+ responder_address: Address,
+ link_type: int,
+ ):
+ logger.debug(
+ f'[Classic] {initiator_controller.public_address} connects SCO to {responder_address}'
+ )
+ responder_controller = self.find_classic_controller(responder_address)
+ # Initiator controller should handle it.
+ assert responder_controller
+
+ responder_controller.on_classic_connection_request(
+ initiator_controller.public_address,
+ link_type,
+ )
+
+ def classic_accept_sco_connection(
+ self,
+ responder_controller: controller.Controller,
+ initiator_address: Address,
+ link_type: int,
+ ):
+ logger.debug(
+ f'[Classic] {responder_controller.public_address} accepts to connect SCO {initiator_address}'
+ )
+ initiator_controller = self.find_classic_controller(initiator_address)
+ if initiator_controller is None:
+ responder_controller.on_classic_sco_connection_complete(
+ responder_controller.public_address,
+ HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
+ link_type,
+ )
+ return
+
+ async def task():
+ initiator_controller.on_classic_sco_connection_complete(
+ responder_controller.public_address, HCI_SUCCESS, link_type
+ )
+
+ asyncio.create_task(task())
+ responder_controller.on_classic_sco_connection_complete(
+ initiator_controller.public_address, HCI_SUCCESS, link_type
+ )
+
# -----------------------------------------------------------------------------
class RemoteLink: