编程笔记

编程笔记

【python】根据RS485通讯协议,读写数据--完整代码(嵌入式软件自动化测试)
2025-01-29

目录

  • 一、通讯连接工具
  • 二、通讯协议
    • 2.1 数据格式
    • 2.2 协议指令
  • 三、代码实现RS485通讯读写
    • 3.1 完整代码
    • 3.2 串口通讯打开
    • 3.3 发送数据包
    • 3.4 接收数据包
    • 3.5 参数读取
    • 3.6 串口通讯关闭
    • 3.7 运行结果

一、通讯连接工具

1.USB转RS485通讯转换器1个
2.PCB线路板1个
3.转换器的AB口与线路板AB口相连,A连接A,B连接B

二、通讯协议

2.1 数据格式


根据通讯协议文档,可以看出只需要关注【命令/状态】和【返回数据/参数设置】两列,其他列固定。

2.2 协议指令

1、举例:查询指令

(1)根据命令/状态表,查询指令为0x02,即D3位为0x02
(2)发送查询指令:FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA
(3)返回数据:D4-D6为当前时间的时分秒,D7D8为用水量,D9D10为用水流速。

解释高低字节:
高字节占据高位(左侧),低字节占据低位(右侧)。
例如:高字节是 b’\xfe’,低字节是 b’\x2e’,它们组合成一个 16 位的数值de2e,转换十进制为56878。

计算方法
将高字节左移 8 位: 高字节 << 8。
将低字节加到高字节的结果中: (高字节 << 8) | 低字节。

协议指令测试:

三、代码实现RS485通讯读写

3.1 完整代码

先附上完整代码:

import serial
import binascii
# 打开串口COM5, 9600波特率, 8数据位, 0停止位
ser = serial.Serial(port='COM5', baudrate=9600, bytesize=8, parity='N', stopbits=1,  # 1.根据实际通讯协议要求,实例化串口及各项参数
                 dsrdtr=False, rtscts=False, xonxoff=False, timeout=2)

# 检查串口是否已经打开
if ser.is_open:
    print('串口已成功打开.')

# 发送的数据包
data_to_send1 = 'FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA' #16进制数据
data_to_send1=data_to_send1.replace(' ', '')  # 去除空格
data_to_send=binascii.a2b_hex(data_to_send1)  # 转换为16进制字符串

# 发送16进制字符串
write_len=ser.write(data_to_send)

# 读取数据包, 分字节和功能读取, 数据位为2字节
receive=True
ASSCII_receive_list=[]
while receive:
    receives=ser.read(1) # serial.read() 方法从串口接收到数据是byte字节型,转换要参考 ASCII 与16进制对照表
    if receives == []:
        # 没接受有效数据时,会一直读取到空列表,等于空列表时,利用continue语句再次重新读取
        continue

    elif receives != []:  # 表示当接收数据不等于空列表[]时,将数据存储在列表
        ASSCII_receive_list.append(receives)
        if len(ASSCII_receive_list)==write_len: #读取结束
            print(f'接收到的数据列表(ASSCII):{ASSCII_receive_list}')  # 打印接收到的16进制列表
            hex_receive_list=[]
            for byte_value in ASSCII_receive_list: # byte字节型转换为对应的16进制值
                # hex_receive_list.append(f"{byte[0]:02x}") #方式一:将每个字节单独转换为 16 进制时
                hex_receive_list.append(byte_value.hex().zfill(2)) #方式二:去掉 '0x' 并补齐两位
            print(f'接收到的数据列表(16进制):{hex_receive_list}') #打印接收到的16进制列表
            break

#读取列表中参数(十六进制)
hour=hex_receive_list[4] #读取小时字节
minute=hex_receive_list[5] #读取分钟字节
seconds=hex_receive_list[6] #读取秒字节

high_use_water=hex_receive_list[8] #读取用水量高位字节
low_use_water=hex_receive_list[7] #读取用水量低位字节

high_water_velocity=hex_receive_list[10] #读取水流速高位字节
low_water_velocity=hex_receive_list[9] #读取水流速低位字节


#十六进制字符串转换为十进制
hour_value=int(hour,16)#将字符串按照基数16解析为十进制
minute_value=int(minute,16)
seconds_value=int(seconds,16)

high_use_water_value=int(high_use_water,16)
low_use_water_value=int(low_use_water,16)

high_water_velocity_value=int(high_water_velocity,16)
low_water_velocity_value=int(low_water_velocity,16)

#组合高低字节为一个十进制数
'''
解释高低字节
高字节占据高位(左侧),低字节占据低位(右侧)。
例如:高字节是 b'\xfe',低字节是 b'\x2e',它们组合成一个 16 位的数值de2e,转换十进制为56878。

计算方法
将高字节左移 8 位: 高字节 << 8。
将低字节加到高字节的结果中: (高字节 << 8) | 低字节。
'''
total_use_water_value=(high_use_water_value<<8)|low_use_water_value #总用水量
water_velocity_value=(high_water_velocity_value<<8)|low_water_velocity_value #水流速

# 计算实际用水量的值(根据通讯协议实际定义计算)
total_use_water = total_use_water_value / 10.0

#计算实际水流速的值(根据通讯协议实际定义计算)
if water_velocity_value==0:
    water_velocity=0
else:
    water_velocity = int(str(water_velocity_value)[1:])/10


# 打印接收到的数据
print(f'当前时间:{hour_value}:{minute_value}:{seconds_value}')
print(f'用水量:{total_use_water}')
print(f'水流速:{water_velocity}')

# 关闭串口
ser.close()

3.2 串口通讯打开


通讯协议对接口定义:9600波特率、8个数据位、无校验位、1个停止位。
代码如下:

# 打开串口COM5, 9600波特率, 8数据位, 0停止位
ser = serial.Serial(port='COM5', baudrate=9600, bytesize=8, parity='N', stopbits=1,  # 1.根据实际通讯协议要求,实例化串口及各项参数
                 dsrdtr=False, rtscts=False, xonxoff=False, timeout=2)

# 检查串口是否已经打开
if ser.is_open:
    print('串口已成功打开.')

3.3 发送数据包

对发送数据进行处理,转换为16进制字符串发送,代码如下:

# 发送的数据包
data_to_send1 = 'FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA' #16进制数据
data_to_send1=data_to_send1.replace(' ', '')  # 去除空格
data_to_send=binascii.a2b_hex(data_to_send1)  # 转换为16进制字符串

# 发送16进制字符串
write_len=ser.write(data_to_send)

3.4 接收数据包

注意ser.read()方法串口接收到数据是byte字节型,转换可以参考 ASCII 与16进制对照表,所以要把byte字节型转换为对应的16进制值
代码如下:

# 读取数据包, 分字节和功能读取, 数据位为2字节
receive=True
ASSCII_receive_list=[]
while receive:
    receives=ser.read(1) # serial.read() 方法从串口接收到数据是byte字节型,转换要参考 ASCII 与16进制对照表
    if receives == []:
        # 没接受有效数据时,会一直读取到空列表,等于空列表时,利用continue语句再次重新读取
        continue

    elif receives != []:  # 表示当接收数据不等于空列表[]时,将数据存储在列表
        ASSCII_receive_list.append(receives)
        if len(ASSCII_receive_list)==write_len: #读取结束
            print(f'接收到的数据列表(ASSCII):{ASSCII_receive_list}')  # 打印接收到的16进制列表
            hex_receive_list=[]
            for byte_value in ASSCII_receive_list: # byte字节型转换为对应的16进制值
                # hex_receive_list.append(f"{byte[0]:02x}") #方式一:将每个字节单独转换为 16 进制时
                hex_receive_list.append(byte_value.hex().zfill(2)) #方式二:去掉 '0x' 并补齐两位
            print(f'接收到的数据列表(16进制):{hex_receive_list}') #打印接收到的16进制列表
            break

3.5 参数读取

参照通讯协议文档,从接收的数据包列表读取对应的参数,计算后输出
代码如下:

#读取列表中参数(十六进制)
hour=hex_receive_list[4] #读取小时字节
minute=hex_receive_list[5] #读取分钟字节
seconds=hex_receive_list[6] #读取秒字节

high_use_water=hex_receive_list[8] #读取用水量高位字节
low_use_water=hex_receive_list[7] #读取用水量低位字节

high_water_velocity=hex_receive_list[10] #读取水流速高位字节
low_water_velocity=hex_receive_list[9] #读取水流速低位字节


#十六进制字符串转换为十进制
hour_value=int(hour,16)#将字符串按照基数16解析为十进制
minute_value=int(minute,16)
seconds_value=int(seconds,16)

high_use_water_value=int(high_use_water,16)
low_use_water_value=int(low_use_water,16)

high_water_velocity_value=int(high_water_velocity,16)
low_water_velocity_value=int(low_water_velocity,16)

#组合高低字节为一个十进制数
'''
解释高低字节
高字节占据高位(左侧),低字节占据低位(右侧)。
例如:高字节是 b'\xfe',低字节是 b'\x2e',它们组合成一个 16 位的数值de2e,转换十进制为56878。

计算方法
将高字节左移 8 位: 高字节 << 8。
将低字节加到高字节的结果中: (高字节 << 8) | 低字节。
'''
total_use_water_value=(high_use_water_value<<8)|low_use_water_value #总用水量
water_velocity_value=(high_water_velocity_value<<8)|low_water_velocity_value #水流速

# 计算实际用水量的值(根据通讯协议实际定义计算)
total_use_water = total_use_water_value / 10.0

#计算实际水流速的值(根据通讯协议实际定义计算)
if water_velocity_value==0:
    water_velocity=0
else:
    water_velocity = int(str(water_velocity_value)[1:])/10

# 打印接收到的数据
print(f'当前时间:{hour_value}:{minute_value}:{seconds_value}')
print(f'用水量:{total_use_water}')
print(f'水流速:{water_velocity}')

3.6 串口通讯关闭

代码如下:

ser.close()

3.7 运行结果


ASSCII转16进制在线转换:链接