python的低功耗蓝牙工具

本文最后更新于 2024年6月25日 早上

最近需要接收一个低功耗蓝牙设备的数据,就想着去找个类似串口助手的收发工具,Android上是有些,但是PC上一个没找到,于是就去github看看有没有,目前就发现了三个,bluepy、pygatt、bleak。只看了python的,没学习成本。最后用pygatt撸了个自用的蓝牙助手

使用bleak

安装

1
pip install bleak

文档:官方文档 | 源码

扫描

例一

通过监控广播数据发现BLE设备,以下是使用bleak.backends.scanner.BleakScanner的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

import asyncio
from bleak import BleakScanner
# 扫描发现设备时的回调,同一个设备可能会触发多次
def detection_callback(device, advertisement_data):
    print(device.address, "RSSI:", device.rssi, advertisement_data)
    if(device.address == "54:B7:E5:79:F4:49"):
        print("find smartpen")
async def run():
    scanner = BleakScanner()
    scanner.register_detection_callback(detection_callback)
    await scanner.start()
    await asyncio.sleep(1.0)
    await scanner.stop()
    # 最终扫描出来的设备
    print("--------------------")
    for d in scanner.discovered_devices:
        print(d)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())

扫描结果

例二

也可以直接使用discover,将扫描5秒,生成设备列表

1
2
3
4
5
6
7
8
9
10
11


import asyncio
from bleak import discover

async def run():
devices = await BleakScanner.discover()
for d in devices:
print(d)

loop = asyncio.get_event_loop()loop.run_until_complete(run())

连接

可以直接连接

1
async with BleakClient(mac_addr) as client:

也可以先扫描再连接,都是通过设备MAC地址。

1
2
3
4
    device = await BleakScanner.find_device_by_address(ble_addresstimeout=20.0)
    if not device:
        raise BleakError(f"A device with address {ble_address} could not be found.")
    async with BleakClient(device) as client:

例一

下列是先扫描再连接的完整示例代码,输出服务号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"""
Connect by BLEDevice
"""

import asyncio
import platform
import sys

from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError


ADDRESS = (
"54:B7:E5:79:F4:49"
if platform.system() != "Darwin"
else "0000fff0-0000-1000-8000-00805f9b34fb"
)
if len(sys.argv) == 2:
ADDRESS = sys.argv[1]


async def print_services(ble_address: str):
device = await BleakScanner.find_device_by_address(ble_address, timeout=20.0)
if not device:
raise BleakError(f"A device with address {ble_address} could not be found.")
async with BleakClient(device) as client:
svcs = await client.get_services()
print("Services:")
# 这里将打印该设备的所以服务
for service in svcs:
print(service)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_services(ADDRESS))

例二

该代码将输出全部服务和特征

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

"""
Service Explorer
----------------
An example showing how to access and print out the services, characteristics and
descriptors of a connected GATT server.
Created on 2019-03-25 by hbldh <[email protected]>
"""
import sys
import platform
import asyncio
import logging
from bleak import BleakClient
ADDRESS = (
    "54:B7:E5:79:F4:49"
    if platform.system() != "Darwin"
    else "0000fff0-0000-1000-8000-00805f9b34fb"
)
if len(sys.argv) == 2:
    ADDRESS = sys.argv[1]
async def run(address, debug=False):
    log = logging.getLogger(__name__)
    if debug:
        import sys
        log.setLevel(logging.DEBUG)
        h = logging.StreamHandler(sys.stdout)
        h.setLevel(logging.DEBUG)
        log.addHandler(h)
    async with BleakClient(address) as client:
        log.info(f"Connected: {client.is_connected}")
        for service in client.services:
            log.info(f"[Service] {service}")
            for char in service.characteristics:
                if "read" in char.properties:
                    try:
                        value = bytes(await client.read_gatt_char(char.uuid))
                        log.info(
                            f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
                        )
                    except Exception as e:
                        log.error(
                            f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {e}"
                        )
                else:
                    value = None
                    log.info(
                        f"\t[Characteristic] {char} ({','.join(char.properties)}), Value: {value}"
                    )
                for descriptor in char.descriptors:
                    try:
                        value = bytes(
                            await client.read_gatt_descriptor(descriptor.handle)
                        )
                        log.info(f"\t\t[Descriptor] {descriptor}) | Value: {value}")
                    except Exception as e:
                        log.error(f"\t\t[Descriptor] {descriptor}) | Value: {e}")
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(run(ADDRESS, True))

输出结果如下

通讯

建立指定地址设备,接收数据,30s后断开连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

import time
import asyncio
from loguru import logger
from bleak import BleakClient
address = "54:B7:E5:79:F4:49"
#下面这个是特征
NOTIFICATION_UUID = f"0000{0xFFF1:x}-0000-1000-8000-00805f9b34fb"
async def run_ble_client(address: str, queue: asyncio.Queue):
    async def callback_handler(sender, data):
        await queue.put((time.time(), data))
    async with BleakClient(address) as client:
        logger.info(f"Connected: {client.is_connected}")
        await client.start_notify(NOTIFICATION_UUID, callback_handler)
        await asyncio.sleep(30.0)
        await client.stop_notify(NOTIFICATION_UUID)
        # Send an "exit command to the consumer"
        await queue.put((time.time(), None))
async def run_queue_consumer(queue: asyncio.Queue):
    while True:
        # Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a timeout for getting data.
        epoch, data = await queue.get()
        if data is None:
            logger.info(
                "Got message from client about disconnection. Exiting consumer loop..."
            )
            break
        else:
            logger.info(f"Received callback data via async queue at {epoch}{data}")
async def main():
    queue = asyncio.Queue()
    client_task = run_ble_client(address, queue)
    consumer_task = run_queue_consumer(queue)
    await asyncio.gather(client_task, consumer_task)
    logger.info("Main method done.")
if __name__ == "__main__":
    asyncio.run(main())


使用pygatt

pygatt包说明
代码仓库

扫描示例

1
2
3
4
5
6
7
8
9
10
import pygatt
adapter = pygatt.BGAPIBackend()
try:
    adapter.start()
    print("===== adapter.scan() =====")
    devices = adapter.scan()
    for dev in devices:
        print("address: %s, name: %s ",dev['address'], dev['name'])
finally:
    adapter.stop()

如果报下列错误,可以需要设置下端口

1
BGAPIError: Unable to auto-detect BLED112 serial port

例如

1
adapter = pygatt.BGAPIBackend(serial_port='COM9')

补充说明下如何在WIN10上查看端口暂用

  • 命令行输入set devmgr_show_nonpresent_devices=1
  • 命令行输入devmgmt.msc 启动设备管理器
  • 点击查看->显示隐藏设备
    下面就多出一个端口(COM和LPT)出来了。
    如果经过上列方式仍然无法运行,则应该是蓝牙适配器问题,readme所述可以选择BLED112,这东西百多块,我选择用这十几块的适配插到linux上使用。

如果在linux上使用pygatt,只需要修改

1
adapter = pygatt.GATTToolBackend()

通讯

下列示例使用的是linux接口,GATTToolBackend获取蓝牙设备,subscribe绑定数据接收回调函数,char_write向某个特征发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pygatt

adapter = pygatt.GATTToolBackend(search_window_size=2048)
try:
            adapter.start()
            device = adapter.connect(self.mac)
            device.subscribe(uuid_1,
                         callback=dataCallback,
                         indication=True)   
        except:
            print("连接失败")

def dataCallback(handle,value):
str = "rcv:" + format(value.hex())
print(str)

def sendCmd(str):
device.char_write(sendUUID,bytearray(binascii.unhexlify(str.encode("utf-8"))))

bluepy

github源地址
这东西只能在linux上玩,安装

1
2
sudo apt-get install python-pip libglib2.0-dev
sudo pip install bluepy

扫描示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

from bluepy.btle import Scanner, DefaultDelegate
class ScanDelegate(DefaultDelegate):
    def __init__(self):
        DefaultDelegate.__init__(self)
    def handleDiscovery(selfdevisNewDevisNewData):
        if isNewDev:
            print("Discovered device", dev.addr)
        elif isNewData:
            print("Received new data from", dev.addr)
scanner = Scanner().withDelegate(ScanDelegate())
devices = scanner.scan(10.0)
for dev in devices:
    print("Device %s (%s), RSSI=%d dB" % (dev.addr, dev.addrType, dev.rssi))
    for (adtype, desc, value) in dev.getScanData():
        print("  %s = %s" % (desc, value))

这个没去继续了解,之后用到再补充

蓝牙调试工具

首先源码都在python-tool这个仓库中,/ble/pygatt中是这个蓝牙调试工具,效果如图。

目前:

  • 能够监听两个UUID的数据
  • 能够对一个UUID发送数据
  • 能够发现设备的服务
  • 不能够SCAN
  • 不能在设备断开连接时提示

python的低功耗蓝牙工具
https://blog.kala.love/posts/607f3f/
作者
久远·卡拉
发布于
2021年9月9日
许可协议