288 lines
12 KiB
Python
288 lines
12 KiB
Python
#!/usr/bin/env python
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# import the Modbus TCP client implementation
|
|
# ---------------------------------------------------------------------------#
|
|
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# import python packages
|
|
# ---------------------------------------------------------------------------#
|
|
from pymongo import MongoClient
|
|
import pytest
|
|
import struct
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# configure the client logging
|
|
# ---------------------------------------------------------------------------#
|
|
import logging
|
|
# --- Client logging ---------------------------------------------------------
# Default root-logger configuration, with INFO and above enabled.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)

# --- Modbus client ----------------------------------------------------------
# Created here, connected and closed by the individual tests.
modbus_client = ModbusClient('localhost', port=502)

# --- MongoDB ----------------------------------------------------------------
# Handles to the ``tag_vals`` collection of the ``tag_data`` database.
mongo_client = MongoClient()
mongo_db = mongo_client["tag_data"]
mongo_tags = mongo_db["tag_vals"]
|
|
|
|
|
|
def close_enough(a, b):
    '''
    Returns True when a and b differ by less than 0.001
    '''
    tolerance = 0.001
    difference = abs(a - b)
    return difference < tolerance
|
|
|
|
|
|
def float_to_bytes(float_val):
    '''
    Converts a float to two 16-bit words, little-endian (low word first)

    The value is packed as a 32-bit single-precision float and re-read as
    two unsigned 16-bit integers. The byte order is pinned to little-endian
    ('<') so the word order does not depend on the host CPU.

    Returns a list of two integers in the range 0..65535.
    '''
    packed_float = struct.pack('<f', float_val)
    return list(struct.unpack('<HH', packed_float))
|
|
|
|
|
|
def integer_to_byte(integer_val):
    '''
    Converts a signed 16-bit integer to a one-element list holding the
    same bit pattern as an unsigned 16-bit word

    struct.error is raised when the value does not fit in a signed
    16-bit integer.
    '''
    signed_bytes = struct.pack('h', integer_val)
    (unsigned_word,) = struct.unpack('H', signed_bytes)
    return [unsigned_word]
|
|
|
|
|
|
def lebyte_to_float(word_list):
    '''
    Converts two 16-bit words, little-endian (low word first), to a float

    word_list must hold exactly two integers in the range 0..65535. They
    are packed as unsigned 16-bit integers and re-read as one 32-bit
    single-precision float. The byte order is pinned to little-endian ('<')
    so the result does not depend on the host CPU.

    struct.error is raised when the list does not hold exactly two values
    that fit in an unsigned 16-bit integer.
    '''
    packed_words = struct.pack('<HH', *word_list)
    return struct.unpack('<f', packed_words)[0]
|
|
|
|
|
|
def lebyte_to_integer(word_list):
    '''
    Converts a one-element list holding an unsigned 16-bit word to the
    signed 16-bit integer with the same bit pattern

    When the conversion fails (wrong number of elements, value out of
    range, not a sequence, ...) a message is printed and False is returned.
    Callers should keep in mind that False == 0 in Python, so a failed
    conversion compares equal to a genuine value of 0.
    '''
    try:
        raw_word = struct.pack("H", *word_list)
        (signed_value,) = struct.unpack("h", raw_word)
    except Exception:
        print("Unable to convert {} to integer".format(word_list))
        return False
    return signed_value
|
|
|
|
|
|
# ---------------------------------------------------------------------------#
|
|
# tests comparing Modbus values against the MongoDB tag values
|
|
# ---------------------------------------------------------------------------#
|
|
class TestModbusDatabaseValues:
    '''
    Compares values served over Modbus with the tag documents in MongoDB

    Every test queries ``mongo_tags`` for the documents of one
    ``register_type`` ('hr', 'ir', 'co' or 'di') and uses
    ``register_number - 1`` as the Modbus address of the tag. The fields
    read from a tag document are ``tag_name``, ``tag_type``,
    ``register_number``, ``val`` and, for arrays, ``arr_len``.

    Each test opens the shared ``modbus_client`` connection and closes it
    again in a ``finally`` clause, so a failing assertion does not leave
    the connection open.
    '''

    # Number of registers requested per read when fetching an array.
    _REGISTERS_PER_READ = 125

    @staticmethod
    def _has_db_value(db_tag):
        '''
        Returns True when the tag document has a 'val' field; otherwise
        reports the tag and returns False so the caller can skip it
        '''
        if 'val' in db_tag:
            return True
        print("No value in DB for {}".format(db_tag['tag_name']))
        return False

    @staticmethod
    def _report_unreadable(db_tag):
        '''
        Reports a tag whose Modbus response carried no data
        '''
        print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))

    def _check_word_registers(self, register_type, read_registers):
        '''
        Compares every INT/REAL tag of ``register_type`` with the database

        ``read_registers`` is the client method used for reading
        (``read_holding_registers`` or ``read_input_registers``). Tag types
        ending in 'INT' occupy one register, 'REAL' tags two; every other
        tag type (e.g. 'ARRAY') is skipped here.
        '''
        modbus_client.connect()
        try:
            for db_tag in mongo_tags.find({"register_type": register_type}):
                # Without a stored value there is nothing to compare against.
                if not self._has_db_value(db_tag):
                    continue
                database_value = db_tag['val']

                tag_type = db_tag['tag_type']
                if tag_type[-3:] == 'INT':
                    register_count = 1
                elif tag_type == 'REAL':
                    register_count = 2
                else:
                    continue

                try:
                    reg = read_registers(db_tag['register_number'] - 1, register_count).registers
                except AttributeError:
                    # The response object has no ``registers`` attribute.
                    self._report_unreadable(db_tag)
                    continue
                if not reg:
                    continue

                if len(reg) == 2:
                    modbus_value = lebyte_to_float(reg)
                else:
                    modbus_value = lebyte_to_integer(reg)

                # NOTE(review): REAL values are compared with exact equality;
                # confirm the database stores the same single-precision value.
                if not modbus_value == database_value:
                    print("---- {}: {} ----".format(db_tag['tag_name'], db_tag['register_number']))
                    print(reg)
                    print("Modbus Scaled: {}".format(modbus_value))
                    print("Database Scaled: {}".format(database_value))
                assert modbus_value == database_value
        finally:
            modbus_client.close()

    def _check_bits(self, register_type, read_bits):
        '''
        Compares the single bit of every tag of ``register_type`` with the
        database value

        ``read_bits`` is the client method used for reading (``read_coils``
        or ``read_discrete_inputs``).
        '''
        modbus_client.connect()
        try:
            for db_tag in mongo_tags.find({"register_type": register_type}):
                if not self._has_db_value(db_tag):
                    continue
                database_value = db_tag['val']

                try:
                    reg = read_bits(db_tag['register_number'] - 1, 1).bits
                except AttributeError:
                    # The response object has no ``bits`` attribute.
                    self._report_unreadable(db_tag)
                    continue

                if not database_value == reg[0]:
                    print("{} =/= {}".format(database_value, reg[0]))
                assert database_value == reg[0]
        finally:
            modbus_client.close()

    def _read_float_array(self, start_address, value_count):
        '''
        Reads ``value_count`` floats (two input registers each) starting at
        ``start_address`` and returns them as a list

        The registers are fetched in reads of at most
        ``_REGISTERS_PER_READ`` registers. AttributeError propagates when a
        response carries no ``registers`` attribute.
        '''
        register_total = value_count * 2
        array_regs = []
        for offset in range(0, register_total, self._REGISTERS_PER_READ):
            read_count = min(self._REGISTERS_PER_READ, register_total - offset)
            response = modbus_client.read_input_registers(start_address + offset, read_count)
            array_regs.extend(response.registers)
        return [
            lebyte_to_float([array_regs[i], array_regs[i + 1]])
            for i in range(0, register_total, 2)
        ]

    def test_read_holding_registers(self):
        '''
        Holding registers ('hr') hold the same values as the database
        '''
        self._check_word_registers('hr', modbus_client.read_holding_registers)

    def test_read_input_registers(self):
        '''
        Input registers ('ir') hold the same values as the database
        '''
        self._check_word_registers('ir', modbus_client.read_input_registers)

    def test_read_arrays(self):
        '''
        'ARRAY' tags in the input registers hold the same list of floats as
        the database
        '''
        modbus_client.connect()
        try:
            for db_tag in mongo_tags.find({"register_type": 'ir'}):
                if not self._has_db_value(db_tag):
                    continue
                if db_tag['tag_type'] != 'ARRAY':
                    continue
                db_vals = db_tag['val']

                try:
                    # int() keeps the chunk arithmetic integral even when the
                    # stored numbers come back as floats.
                    array_values = self._read_float_array(
                        int(db_tag['register_number']) - 1,
                        int(db_tag['arr_len']),
                    )
                except AttributeError:
                    self._report_unreadable(db_tag)
                    continue

                if not array_values == db_vals:
                    # zip() stops at the shorter list, so a length mismatch
                    # is reported by the assertion instead of an IndexError.
                    for i, (modbus_val, db_val) in enumerate(zip(array_values, db_vals)):
                        print("{} = {} - {}".format(i, modbus_val, db_val))
                assert array_values == db_vals
        finally:
            modbus_client.close()

    def test_read_coils(self):
        '''
        Coils ('co') hold the same values as the database
        '''
        self._check_bits('co', modbus_client.read_coils)

    def test_read_digital_inputs(self):
        '''
        Discrete inputs ('di') hold the same values as the database
        '''
        self._check_bits('di', modbus_client.read_discrete_inputs)

    def test_write_coils(self):
        '''
        Every coil can be toggled and then set back to its database value
        '''
        modbus_client.connect()
        try:
            for db_tag in mongo_tags.find({"register_type": 'co'}):
                address = db_tag['register_number'] - 1
                db_val_before = db_tag['val']
                write_val = 1 if db_val_before == 0 else 0

                try:
                    modbus_client.write_coil(address, write_val)
                    reg = modbus_client.read_coils(address, 1).bits
                    assert write_val == reg[0]
                finally:
                    # Put the original value back even when the check failed.
                    modbus_client.write_coil(address, db_val_before)

                reg = modbus_client.read_coils(address, 1).bits
                assert db_val_before == reg[0]
        finally:
            modbus_client.close()

    def test_write_holding_registers(self):
        '''
        Every holding register can be changed and then set back to its
        database value

        'REAL' tags are written as two registers and compared with
        close_enough; every other tag type is written as one signed 16-bit
        register and compared exactly.
        '''
        modbus_client.connect()
        try:
            for db_tag in mongo_tags.find({"register_type": 'hr'}):
                address = db_tag['register_number'] - 1
                db_val_before = db_tag['val']

                if db_tag['tag_type'] == "REAL":
                    write_val = db_val_before + 1.0
                    encode, decode, register_count = float_to_bytes, lebyte_to_float, 2
                    matches = close_enough
                else:
                    write_val = db_val_before + 1
                    encode, decode, register_count = integer_to_byte, lebyte_to_integer, 1
                    matches = lambda expected, actual: expected == actual

                # Encode both values before touching the device so that an
                # encoding error cannot leave the register modified.
                db_val_before_bytes = encode(db_val_before)
                write_val_bytes = encode(write_val)

                try:
                    modbus_client.write_registers(address, write_val_bytes)
                    reg = modbus_client.read_holding_registers(address, register_count).registers
                    assert matches(write_val, decode(reg))
                finally:
                    # Put the original value back even when the check failed.
                    modbus_client.write_registers(address, db_val_before_bytes)

                reg = modbus_client.read_holding_registers(address, register_count).registers
                assert matches(db_val_before, decode(reg))
        finally:
            modbus_client.close()
|