跳转至

dbus python调用Bluez的API

dbus-python调用Bluez的API

连接到系统总线

busbus = dbus.SystemBus()

获得目前机器上有的适配器,改了一个函数,把busbus传进去,会返回一个列表,里面每个都是一个对象,他的顺序是倒着的

dbus 可以导出对象供其他应用程序使用,要与远程对象进行交互需要使用代理对象get_object获取代理对象

dbus 的接口是一组相关的方法和信号,使用 dbus.Interface(远程对象, 接口名称) 获得

GetManagedObjects() 可以获得所有对象和属性

def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
    objects = remote_om.GetManagedObjects()
    adapters = []
    for o, props in objects.items():
        if "org.bluez.GattManager1" in props.keys():
            adapters.append(o)
    print(adapters)
    if len(adapters) > 0:
        return adapters

    return None

1655264546479-b6e61a28-fb1e-474a-b4fe-1aa04f63181a.png

这样就把适配器选出来了

adapter_obj = bus.get_object("org.bluez", adapter)
adapter_props = dbus.Interface(adapter_obj, "org.freedesktop.DBus.Properties")
adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

这里就涉及到 bluez 的 api 了,在 adapter-api 处定义了

3 这个是打开适配器,效果应该是hciconfig hci0 up

1655274443054-7f9724f8-7adf-4502-9d54-19e1a95c0257.png

Advertising

开头附 bluez 定义好的广播类

class Advertisement(dbus.service.Object):
    PATH_BASE = "/org/bluez/example/advertisement"

    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = None
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        properties = dict()
        properties["Type"] = self.ad_type
        if self.service_uuids is not None:
            properties["ServiceUUIDs"] = dbus.Array(self.service_uuids, signature="s")
        if self.solicit_uuids is not None:
            properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids, signature="s")
        if self.manufacturer_data is not None:
            properties["ManufacturerData"] = dbus.Dictionary(
                self.manufacturer_data, signature="qv"
            )
        if self.service_data is not None:
            properties["ServiceData"] = dbus.Dictionary(
                self.service_data, signature="sv"
            )
        if self.local_name is not None:
            properties["LocalName"] = dbus.String(self.local_name)
        if self.include_tx_power is not None:
            properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)

        if self.data is not None:
            properties["Data"] = dbus.Dictionary(self.data, signature="yv")
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature="qv")
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y")

    def add_service_data(self, uuid, data):
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature="sv")
        self.service_data[uuid] = dbus.Array(data, signature="y")

    def add_local_name(self, name):
        if not self.local_name:
            self.local_name = ""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        if not self.data:
            self.data = dbus.Dictionary({}, signature="yv")
        self.data[ad_type] = dbus.Array(data, signature="y")

    @dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")
    def GetAll(self, interface):
        logger.info("GetAll")
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        logger.info("returning props")
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature="", out_signature="")
    def Release(self):
        logger.info("%s: Released!" % self.path)

获取到 Advertising 的管理的接口

ad_manager = dbus.Interface(adapter_obj, "org.bluez.LEAdvertisingManager1")

然后先使用已经定义好的广播类,去创建一个广播对象

advertisement = Advertisement(bus, 0, "peripheral")
advertisement.add_manufacturer_data(0x038f, [0x11, 0x22],)   #这里下面就是设置广播数据了
advertisement.add_service_uuid("9fdc9c81-fffe-51a1-e511-5a38c414c2f9")
advertisement.add_local_name("BLUE-F045DAF3F9CC")
advertisement.include_tx_power = True

然后 RegisterAdvertisement 注册和使能广播接口,需要告诉 bluez 我们的 advertisement 在哪里,并且设置应答或错误处理函数

mainloop = MainLoop()
ad_manager.RegisterAdvertisement(advertisement.get_path(),{},reply_handler=register_ad_cb,error_handler=register_ad_error_cb,)
mainloop.run()

成功克隆出一个可以欺骗 APP 的广播包(他检测的是 service_uuid)

1655292913286-c2d24139-c286-407e-9e62-89841b8dfc50.png

1655292931205-04ecfe12-2260-4645-920d-b01c56d07db7.png

GATT

首先注册 GATT 的管理接口

service_manager = dbus.Interface(adapter_obj, "org.bluez.GattManager1")

定义一个 application,定义一个 service,给 service 添加 characteristic 然后把 service 给 application

app = Application(bus)
service = Service(bus, 2, "12634d89-d598-4874-8e86-7d042ee07ba7", True)
service.add_characteristic(TestCharacteristic(bus, 0, "12634d89-d598-4874-8e86-7d042ee07ba7", ["encrypt-read", "encrypt-write"], service))
app.add_service(service)

为了方便,直接定义了一个 TestCharacteristic 类,继承自官方示例中的 Characteristic 类

class TestCharacteristic(Characteristic):
    def __init__(self, bus, index, uuid, flags, service):
        Characteristic.__init__(
            self, bus, index, uuid, flags, service,
        )
        self.value = [0xFF]

    def ReadValue(self, options):
        return self.value

    def WriteValue(self, value, options):
        self.value = value

最后注册 GATT,这样就可以成功识别到特性了,读写这类的属性也可以显示出来

service_manager.RegisterApplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=[register_app_error_cb],)

1655372787167-20b19f09-add5-4641-b753-48ea04491cb9.png

接下来解决数据收发的问题

问题备忘

如何检测外设是否可连接(connectable)

广播报文如何定义可连接与不可连接

gatt 如何设置不配对仅连接,以及如何设置各种配对的 IO 能力

参考:https://github.com/PunchThrough/espresso-ble

原文: https://www.yuque.com/hxfqg9/iot/lrnevg