Протокол Modbus предполагает наличие двух типов устройств: одного ведущего (master) и ведомого (slave). Две самые популярные реализации протокола - это Modbus RTU (предназначен для связи через последовательные интерфейсы), Modbus TCP (предназначен для связи через сетевые интерфейсы). В любом устройстве регистры Modbus представляют собой массивы uint16_t (целое число 16 бит неотрицательное), в последовательности которых могут быть настройки и другие параметры, доступные для чтения/записи.
Собственную реализацию протокола Modbus для последовательной передачи данных можно реализовать с помощью любого фреймворка для работы с последовательными подключениями (для python - pyserial). Сетевая реализация протокола может быть построена на веб-сокетах (для python - websockets). Для этого необходимо детально изучить спецификацию протоколов Modbus.
Готовые модули с реализацией протокола на Python:
2. Официальное руководство по использованию библиотеки MinimalModbus (последовательный обмен).
Ниже рассмотрены примеры работы с ведомыми устройствами (опрос значений регистров с фиксированным временным интервалом). Ведущее устройство - на котором выполняется данный код.
В данном листинге представлено подключение модулей, задание основных параметров подключения устройства (к какому порту подключен Ваш прибор можно посмотреть в диспетчере задач в ОС Microsoft Windows, а в ОС Linux выполнив в терминале команду: ls /dev/
), пример периодического чтения значений регистров Input прибора. В представленном примере данные показаны в формате float32, порядок big endian). Для подключения необходимо задать порт подключения и параметры устройства, которые указываются в руководстве по эксплуатации.
from pymodbus.client import ModbusSerialClient
from datetime import datetime
import time
import struct
# Define object for COM port assignment
com_port = "COM7" # Set up COM port assigned on PC
slave_id = 1 # Define slave node number
# Function to get current time
def current_time():
return datetime.now().isoformat()
# Function to convert two 16-bit registers to a float, trying both endianness
def registers_to_float(register1, register2):
try:
# Try little-endian byte order
packed_le = struct.pack('<HH', register1, register2)
value_le = struct.unpack('<f', packed_le)[0]
# Try big-endian byte order
packed_be = struct.pack('>HH', register1, register2)
value_be = struct.unpack('>f', packed_be)[0]
# Choose the most plausible value
if abs(value_le) < 1e10 and not struct.unpack('<I', packed_le)[0] == 0x7FC00000:
return value_le
elif abs(value_be) < 1e10 and not struct.unpack('>I', packed_be)[0] == 0x7FC00000:
return value_be
else:
print("Both float interpretations seem invalid")
return None
except Exception as e:
print(f"Error interpreting float: {e}")
return None
# Initialize Modbus client
client = ModbusSerialClient(
port=com_port,
timeout=2,
stopbits=1,
bytesize=8,
parity='N',
baudrate=9600
)
try:
# Attempt to connect to the Modbus client
if not client.connect():
print(f"Failed to connect to {com_port}")
else:
print(f"Connected to {com_port}")
# Read holding registers
holding_regs = client.read_holding_registers(0, count=7)
input_regs = client.read_input_registers(0, count=6)
print("Input Registers:", input_regs.registers)
print("Holding Registers:", holding_regs.registers)
# Interpret input registers as floats
current1 = registers_to_float(input_regs.registers[0], input_regs.registers[1])
current2 = registers_to_float(input_regs.registers[2], input_regs.registers[3])
current3 = registers_to_float(input_regs.registers[4], input_regs.registers[5])
# Print the currents with 4 decimal places if they are valid
if current1 is not None:
print(f"Current 1: {current1:.6f}")
if current2 is not None:
print(f"Current 2: {current2:.6f}")
if current3 is not None:
print(f"Current 3: {current3:.6f}")
# Sleep for a second
time.sleep(1)
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Ensure the client connection is closed
client.close()
print("Connection closed.")
В данном листинге представлено подключение модулей, задание основных параметров подключения устройства (найти устройство в сети можно с помощью программы Advanced IP Scanner или в веб-приложении Вашего роутера), пример периодического чтения значений регистров Input прибора. В представленном примере данные показаны в формате float32, порядок big endian). Для подключения необходимо задать IP-адрес устройства или имя хоста (hostname), указав стандартный порт для Modbus TCP (502).
from pymodbus.client import ModbusTcpClient
from datetime import datetime
import time
import struct
# Define the Modbus TCP server IP address
ip_address = "192.168.0.190"
port = 502 # Default Modbus TCP port
# Function to get current time
def current_time():
return datetime.now().isoformat()
# Function to convert two 16-bit registers to a float, trying both endianness
def registers_to_float(register1, register2):
try:
# Try little-endian byte order
packed_le = struct.pack('<HH', register1, register2)
value_le = struct.unpack('<f', packed_le)[0]
# Try big-endian byte order
packed_be = struct.pack('>HH', register1, register2)
value_be = struct.unpack('>f', packed_be)[0]
# Choose the most plausible value
if abs(value_le) < 1e10 and not struct.unpack('<I', packed_le)[0] == 0x7FC00000:
return value_le
elif abs(value_be) < 1e10 and not struct.unpack('>I', packed_be)[0] == 0x7FC00000:
return value_be
else:
print("Both float interpretations seem invalid")
return None
except Exception as e:
print(f"Error interpreting float: {e}")
return None
# Initialize Modbus TCP client
client = ModbusTcpClient(ip_address, port=port)
try:
# Attempt to connect to the Modbus client
if not client.connect():
print(f"Failed to connect to {ip_address}")
else:
print(f"Connected to {ip_address}")
while True:
# Read input registers
input_regs = client.read_input_registers(0, count=6)
print("Input Registers:", input_regs.registers)
# Interpret input registers as floats
current1 = registers_to_float(input_regs.registers[0], input_regs.registers[1])
current2 = registers_to_float(input_regs.registers[2], input_regs.registers[3])
current3 = registers_to_float(input_regs.registers[4], input_regs.registers[5])
# Print the currents with 6 decimal places if they are valid
if current1 is not None:
print(f"Current 1: {current1:.6f}")
if current2 is not None:
print(f"Current 2: {current2:.6f}")
if current3 is not None:
print(f"Current 3: {current3:.6f}")
# Sleep for a second
time.sleep(1)
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Ensure the client connection is closed
client.close()
print("Connection closed.")