Files
POC-to-ModbusTCP/test.py

288 lines
12 KiB
Python

#!/usr/bin/env python
# ---------------------------------------------------------------------------#
# import the various server implementations
# ---------------------------------------------------------------------------#
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
# ---------------------------------------------------------------------------#
# import python packages
# ---------------------------------------------------------------------------#
from pymongo import MongoClient
import pytest
import struct
# ---------------------------------------------------------------------------#
# configure the client logging
# ---------------------------------------------------------------------------#
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)
# ---------------------------------------------------------------------------#
# Modbus Client Setup
# ---------------------------------------------------------------------------#
modbus_client = ModbusClient('localhost', port=5020)
# ---------------------------------------------------------------------------#
# Mongo DB Setup
# ---------------------------------------------------------------------------#
mongo_client = MongoClient()
mongo_db = mongo_client.tag_data
mongo_tags = mongo_db.tag_vals
def close_enough(a, b):
return abs(a - b) < 0.001
def float_to_bytes(float_val):
'''
Converts a float to little-endian bytes
'''
packed_string = struct.pack('f', float_val)
unpacked_list = list(struct.unpack('HH', packed_string))
return unpacked_list
def integer_to_byte(integer_val):
'''
Converts an integer to its byte
'''
packed_string = struct.pack('h', integer_val)
unpacked = list(struct.unpack('H', packed_string))
return unpacked
def lebyte_to_float(word_list):
'''
Converts list of little-endian bytes to float
'''
packed_string = struct.pack("HH", *word_list)
unpacked_float = struct.unpack("f", packed_string)[0]
return unpacked_float
def lebyte_to_integer(word_list):
'''
Converts list(size = 1) of little-endian bytes to Integer
'''
try:
packed_string = struct.pack("H", *word_list)
unpacked_int = struct.unpack("h", packed_string)[0]
except Exception as e:
print("Unable to convert {} to integer".format(word_list))
return False
return unpacked_int
# ---------------------------------------------------------------------------#
# specify slave to query
# ---------------------------------------------------------------------------#
class TestModbusDatabaseValues:
def test_read_holding_registers(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'hr'})
for db_tag in tag_in_db:
try:
database_value = db_tag['val']
except KeyError:
print("No value in DB for {}".format(db_tag['tag_name']))
reg = []
try:
if db_tag['tag_type'][-3:] == 'INT':
reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 1).registers
elif db_tag['tag_type'] == 'REAL':
reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 2).registers
# print(reg)
except AttributeError:
print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))
continue
if reg:
if len(reg) == 2:
# r = reg.reverse()
modbus_value = lebyte_to_float(reg)
else:
modbus_value = lebyte_to_integer(reg)
# print("Modbus Actual: {}".format(modbus_actual_value))
# print("Database Actual: {}".format(database_actual_value))
# print("Database Calc: {}".format(database_calc_value))
if not modbus_value == database_value:
print("---- {}: {} ----".format(db_tag['tag_name'], db_tag['register_number']))
print(reg)
print("Modbus Scaled: {}".format(modbus_value))
print("Database Scaled: {}".format(database_value))
assert modbus_value == database_value
modbus_client.close()
def test_read_input_registers(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'ir'})
for db_tag in tag_in_db:
try:
database_value = db_tag['val']
except KeyError:
print("No value in DB for {}".format(db_tag['tag_name']))
reg = []
try:
if db_tag['tag_type'][-3:] == 'INT':
reg = modbus_client.read_input_registers(db_tag['register_number'] - 1, 1).registers
elif db_tag['tag_type'] == 'REAL':
reg = modbus_client.read_input_registers(db_tag['register_number'] - 1, 2).registers
elif db_tag['tag_type'] == 'ARRAY':
continue
# print(reg)
except AttributeError:
print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))
continue
if reg:
if len(reg) > 0:
if len(reg) == 2:
# r = reg.reverse()
modbus_value = lebyte_to_float(reg)
elif len(reg) == 1:
modbus_value = lebyte_to_integer(reg)
# print("Modbus Actual: {}".format(modbus_actual_value))
# print("Database Actual: {}".format(database_actual_value))
# print("Database Calc: {}".format(database_calc_value))
if not modbus_value == database_value:
print("---- {}: {} ----".format(db_tag['tag_name'], db_tag['register_number']))
print(reg)
print("Modbus Scaled: {}".format(modbus_value))
print("Database Scaled: {}".format(database_value))
assert modbus_value == database_value
modbus_client.close()
def test_read_arrays(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'ir'})
for db_tag in tag_in_db:
try:
database_value = db_tag['val']
except KeyError:
print("No value in DB for {}".format(db_tag['tag_name']))
reg = []
array_values = []
try:
if db_tag['tag_type'] == 'ARRAY':
array_regs = []
db_vals = db_tag['val']
reg_len = db_tag['arr_len'] * 2
get_len = 125
num_gets = int(reg_len) / get_len
last_get = reg_len % get_len
for x in range(0, num_gets):
inp_register = (int(db_tag['register_number']) - 1) + (get_len * x)
read_registers = modbus_client.read_input_registers(inp_register, get_len).registers
array_regs = array_regs + read_registers
if last_get > 0:
array_regs = array_regs + modbus_client.read_input_registers((db_tag['register_number'] - 1) + (get_len * num_gets), last_get).registers
for i in range(0, reg_len, 2):
array_values.append(lebyte_to_float([array_regs[i], array_regs[i + 1]]))
else:
continue
except AttributeError:
print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))
continue
if not array_values == db_vals:
if len(array_values) > 0:
for i in range(0, len(array_values)):
print("{} = {} - {}".format(i, array_values[i], db_vals[i]))
assert array_values == db_vals
modbus_client.close()
def test_read_coils(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'co'})
for db_tag in tag_in_db:
try:
database_value = db_tag['val']
except KeyError:
print("No value in DB for {}".format(db_tag['tag_name']))
try:
reg = modbus_client.read_coils(db_tag['register_number'] - 1, 1).bits
if not database_value == reg[0]:
print("{} =/= {}".format(database_value, reg[0]))
assert database_value == reg[0]
except AttributeError:
print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))
continue
def test_read_digital_inputs(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'di'})
for db_tag in tag_in_db:
try:
database_value = db_tag['val']
except KeyError:
print("No value in DB for {}".format(db_tag['tag_name']))
try:
reg = modbus_client.read_discrete_inputs(db_tag['register_number'] - 1, 1).bits
if not database_value == reg[0]:
print("{} =/= {}".format(database_value, reg[0]))
assert database_value == reg[0]
except AttributeError:
print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))
continue
def test_write_coils(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'co'})
for db_tag in tag_in_db:
db_val_before = db_tag['val']
if db_val_before == 0:
write_val = 1
else:
write_val = 0
modbus_client.write_coil(db_tag['register_number'] - 1, write_val)
reg = modbus_client.read_coils(db_tag['register_number'] - 1, 1).bits
assert write_val == reg[0]
modbus_client.write_coil(db_tag['register_number'] - 1, db_val_before)
reg = modbus_client.read_coils(db_tag['register_number'] - 1, 1).bits
assert db_val_before == reg[0]
def test_write_coils(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'hr'})
for db_tag in tag_in_db:
db_val_before = db_tag['val']
if db_tag['tag_type'] == "REAL":
write_val = db_val_before + 1.0
db_val_before_bytes = float_to_bytes(db_val_before)
write_val_bytes = float_to_bytes(write_val)
modbus_client.write_registers(db_tag['register_number'] - 1, write_val_bytes)
reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 2).registers
read_val = lebyte_to_float(reg)
assert close_enough(write_val, read_val)
modbus_client.write_registers(db_tag['register_number'] - 1, db_val_before_bytes)
reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 2).registers
read_val = lebyte_to_float(reg)
assert close_enough(db_val_before, read_val)
else:
write_val = db_val_before + 1
db_val_before_bytes = integer_to_byte(db_val_before)
write_val_bytes = integer_to_byte(write_val)
modbus_client.write_registers(db_tag['register_number'] - 1, write_val_bytes)
reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 1).registers
read_val = lebyte_to_integer(reg)
assert write_val == read_val
modbus_client.write_registers(db_tag['register_number'] - 1, db_val_before_bytes)
reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 1).registers
read_val = lebyte_to_integer(reg)
assert db_val_before == read_val