【python】根据RS485通讯协议,读写数据--完整代码(嵌入式软件自动化测试)
2025-01-29
目录
- 一、通讯连接工具
- 二、通讯协议
- 2.1 数据格式
- 2.2 协议指令
- 三、代码实现RS485通讯读写
- 3.1 完整代码
- 3.2 串口通讯打开
- 3.3 发送数据包
- 3.4 接收数据包
- 3.5 参数读取
- 3.6 串口通讯关闭
- 3.7 运行结果
一、通讯连接工具
1.USB转RS485通讯转换器1个
2.PCB线路板1个
3.转换器的AB口与线路板AB口相连,A连接A,B连接B
二、通讯协议
2.1 数据格式
根据通讯协议文档,可以看出只需要关注【命令/状态】和【返回数据/参数设置】两列,其他列固定。
2.2 协议指令
1、举例:查询指令
(1)根据命令/状态表,查询指令为0x02,即D3位为0x02
(2)发送查询指令:FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA
(3)返回数据:D4-D6为当前时间的时分秒,D7D8为用水量,D9D10为用水流速。
解释高低字节:
高字节占据高位(左侧),低字节占据低位(右侧)。
例如:高字节是 b’\xfe’,低字节是 b’\x2e’,它们组合成一个 16 位的数值de2e,转换十进制为56878。
计算方法
将高字节左移 8 位: 高字节 << 8。
将低字节加到高字节的结果中: (高字节 << 8) | 低字节。
协议指令测试:
三、代码实现RS485通讯读写
3.1 完整代码
先附上完整代码:
import serial
import binascii
# 打开串口COM5, 9600波特率, 8数据位, 0停止位
ser = serial.Serial(port='COM5', baudrate=9600, bytesize=8, parity='N', stopbits=1, # 1.根据实际通讯协议要求,实例化串口及各项参数
dsrdtr=False, rtscts=False, xonxoff=False, timeout=2)
# 检查串口是否已经打开
if ser.is_open:
print('串口已成功打开.')
# 发送的数据包
data_to_send1 = 'FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA' #16进制数据
data_to_send1=data_to_send1.replace(' ', '') # 去除空格
data_to_send=binascii.a2b_hex(data_to_send1) # 转换为16进制字符串
# 发送16进制字符串
write_len=ser.write(data_to_send)
# 读取数据包, 分字节和功能读取, 数据位为2字节
receive=True
ASSCII_receive_list=[]
while receive:
receives=ser.read(1) # serial.read() 方法从串口接收到数据是byte字节型,转换要参考 ASCII 与16进制对照表
if receives == []:
# 没接受有效数据时,会一直读取到空列表,等于空列表时,利用continue语句再次重新读取
continue
elif receives != []: # 表示当接收数据不等于空列表[]时,将数据存储在列表
ASSCII_receive_list.append(receives)
if len(ASSCII_receive_list)==write_len: #读取结束
print(f'接收到的数据列表(ASSCII):{ASSCII_receive_list}') # 打印接收到的16进制列表
hex_receive_list=[]
for byte_value in ASSCII_receive_list: # byte字节型转换为对应的16进制值
# hex_receive_list.append(f"{byte[0]:02x}") #方式一:将每个字节单独转换为 16 进制时
hex_receive_list.append(byte_value.hex().zfill(2)) #方式二:去掉 '0x' 并补齐两位
print(f'接收到的数据列表(16进制):{hex_receive_list}') #打印接收到的16进制列表
break
#读取列表中参数(十六进制)
hour=hex_receive_list[4] #读取小时字节
minute=hex_receive_list[5] #读取分钟字节
seconds=hex_receive_list[6] #读取秒字节
high_use_water=hex_receive_list[8] #读取用水量高位字节
low_use_water=hex_receive_list[7] #读取用水量低位字节
high_water_velocity=hex_receive_list[10] #读取水流速高位字节
low_water_velocity=hex_receive_list[9] #读取水流速低位字节
#十六进制字符串转换为十进制
hour_value=int(hour,16)#将字符串按照基数16解析为十进制
minute_value=int(minute,16)
seconds_value=int(seconds,16)
high_use_water_value=int(high_use_water,16)
low_use_water_value=int(low_use_water,16)
high_water_velocity_value=int(high_water_velocity,16)
low_water_velocity_value=int(low_water_velocity,16)
#组合高低字节为一个十进制数
'''
解释高低字节
高字节占据高位(左侧),低字节占据低位(右侧)。
例如:高字节是 b'\xfe',低字节是 b'\x2e',它们组合成一个 16 位的数值de2e,转换十进制为56878。
计算方法
将高字节左移 8 位: 高字节 << 8。
将低字节加到高字节的结果中: (高字节 << 8) | 低字节。
'''
total_use_water_value=(high_use_water_value<<8)|low_use_water_value #总用水量
water_velocity_value=(high_water_velocity_value<<8)|low_water_velocity_value #水流速
# 计算实际用水量的值(根据通讯协议实际定义计算)
total_use_water = total_use_water_value / 10.0
#计算实际水流速的值(根据通讯协议实际定义计算)
if water_velocity_value==0:
water_velocity=0
else:
water_velocity = int(str(water_velocity_value)[1:])/10
# 打印接收到的数据
print(f'当前时间:{hour_value}:{minute_value}:{seconds_value}')
print(f'用水量:{total_use_water}')
print(f'水流速:{water_velocity}')
# 关闭串口
ser.close()
3.2 串口通讯打开
通讯协议对接口定义:9600波特率、8个数据位、无校验位、1个停止位。
代码如下:
# 打开串口COM5, 9600波特率, 8数据位, 0停止位
ser = serial.Serial(port='COM5', baudrate=9600, bytesize=8, parity='N', stopbits=1, # 1.根据实际通讯协议要求,实例化串口及各项参数
dsrdtr=False, rtscts=False, xonxoff=False, timeout=2)
# 检查串口是否已经打开
if ser.is_open:
print('串口已成功打开.')
3.3 发送数据包
对发送数据进行处理,转换为16进制字符串发送,代码如下:
# 发送的数据包
data_to_send1 = 'FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA' #16进制数据
data_to_send1=data_to_send1.replace(' ', '') # 去除空格
data_to_send=binascii.a2b_hex(data_to_send1) # 转换为16进制字符串
# 发送16进制字符串
write_len=ser.write(data_to_send)
3.4 接收数据包
注意ser.read()方法串口接收到数据是byte字节型,转换可以参考 ASCII 与16进制对照表,所以要把byte字节型转换为对应的16进制值
代码如下:
# 读取数据包, 分字节和功能读取, 数据位为2字节
receive=True
ASSCII_receive_list=[]
while receive:
receives=ser.read(1) # serial.read() 方法从串口接收到数据是byte字节型,转换要参考 ASCII 与16进制对照表
if receives == []:
# 没接受有效数据时,会一直读取到空列表,等于空列表时,利用continue语句再次重新读取
continue
elif receives != []: # 表示当接收数据不等于空列表[]时,将数据存储在列表
ASSCII_receive_list.append(receives)
if len(ASSCII_receive_list)==write_len: #读取结束
print(f'接收到的数据列表(ASSCII):{ASSCII_receive_list}') # 打印接收到的16进制列表
hex_receive_list=[]
for byte_value in ASSCII_receive_list: # byte字节型转换为对应的16进制值
# hex_receive_list.append(f"{byte[0]:02x}") #方式一:将每个字节单独转换为 16 进制时
hex_receive_list.append(byte_value.hex().zfill(2)) #方式二:去掉 '0x' 并补齐两位
print(f'接收到的数据列表(16进制):{hex_receive_list}') #打印接收到的16进制列表
break
3.5 参数读取
参照通讯协议文档,从接收的数据包列表读取对应的参数,计算后输出
代码如下:
#读取列表中参数(十六进制)
hour=hex_receive_list[4] #读取小时字节
minute=hex_receive_list[5] #读取分钟字节
seconds=hex_receive_list[6] #读取秒字节
high_use_water=hex_receive_list[8] #读取用水量高位字节
low_use_water=hex_receive_list[7] #读取用水量低位字节
high_water_velocity=hex_receive_list[10] #读取水流速高位字节
low_water_velocity=hex_receive_list[9] #读取水流速低位字节
#十六进制字符串转换为十进制
hour_value=int(hour,16)#将字符串按照基数16解析为十进制
minute_value=int(minute,16)
seconds_value=int(seconds,16)
high_use_water_value=int(high_use_water,16)
low_use_water_value=int(low_use_water,16)
high_water_velocity_value=int(high_water_velocity,16)
low_water_velocity_value=int(low_water_velocity,16)
#组合高低字节为一个十进制数
'''
解释高低字节
高字节占据高位(左侧),低字节占据低位(右侧)。
例如:高字节是 b'\xfe',低字节是 b'\x2e',它们组合成一个 16 位的数值de2e,转换十进制为56878。
计算方法
将高字节左移 8 位: 高字节 << 8。
将低字节加到高字节的结果中: (高字节 << 8) | 低字节。
'''
total_use_water_value=(high_use_water_value<<8)|low_use_water_value #总用水量
water_velocity_value=(high_water_velocity_value<<8)|low_water_velocity_value #水流速
# 计算实际用水量的值(根据通讯协议实际定义计算)
total_use_water = total_use_water_value / 10.0
#计算实际水流速的值(根据通讯协议实际定义计算)
if water_velocity_value==0:
water_velocity=0
else:
water_velocity = int(str(water_velocity_value)[1:])/10
# 打印接收到的数据
print(f'当前时间:{hour_value}:{minute_value}:{seconds_value}')
print(f'用水量:{total_use_water}')
print(f'水流速:{water_velocity}')
3.6 串口通讯关闭
代码如下:
ser.close()
3.7 运行结果
ASSCII转16进制在线转换:链接