#!/usr/bin/env python # ---------------------------------------------------------------------------# # import the various server implementations # ---------------------------------------------------------------------------# from pymodbus.client.sync import ModbusTcpClient as ModbusClient # ---------------------------------------------------------------------------# # import python packages # ---------------------------------------------------------------------------# from pymongo import MongoClient import pytest import struct # ---------------------------------------------------------------------------# # configure the client logging # ---------------------------------------------------------------------------# import logging logging.basicConfig() log = logging.getLogger() log.setLevel(logging.INFO) # ---------------------------------------------------------------------------# # Modbus Client Setup # ---------------------------------------------------------------------------# modbus_client = ModbusClient('localhost', port=502) # ---------------------------------------------------------------------------# # Mongo DB Setup # ---------------------------------------------------------------------------# mongo_client = MongoClient() mongo_db = mongo_client.tag_data mongo_tags = mongo_db.tag_vals def close_enough(a, b): return abs(a - b) < 0.001 def float_to_bytes(float_val): ''' Converts a float to little-endian bytes ''' packed_string = struct.pack('f', float_val) unpacked_list = list(struct.unpack('HH', packed_string)) return unpacked_list def integer_to_byte(integer_val): ''' Converts an integer to its byte ''' packed_string = struct.pack('h', integer_val) unpacked = list(struct.unpack('H', packed_string)) return unpacked def lebyte_to_float(word_list): ''' Converts list of little-endian bytes to float ''' packed_string = struct.pack("HH", *word_list) unpacked_float = struct.unpack("f", packed_string)[0] return unpacked_float def lebyte_to_integer(word_list): ''' Converts list(size = 1) of little-endian bytes to Integer ''' try: packed_string = struct.pack("H", *word_list) unpacked_int = struct.unpack("h", packed_string)[0] except Exception as e: print("Unable to convert {} to integer".format(word_list)) return False return unpacked_int # ---------------------------------------------------------------------------# # specify slave to query # ---------------------------------------------------------------------------# class TestModbusDatabaseValues: def test_read_holding_registers(self): modbus_client.connect() tag_in_db = mongo_tags.find({"register_type": 'hr'}) for db_tag in tag_in_db: try: database_value = db_tag['val'] except KeyError: print("No value in DB for {}".format(db_tag['tag_name'])) reg = [] try: if db_tag['tag_type'][-3:] == 'INT': reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 1).registers elif db_tag['tag_type'] == 'REAL': reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 2).registers # print(reg) except AttributeError: print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name'])) continue if reg: if len(reg) == 2: # r = reg.reverse() modbus_value = lebyte_to_float(reg) else: modbus_value = lebyte_to_integer(reg) # print("Modbus Actual: {}".format(modbus_actual_value)) # print("Database Actual: {}".format(database_actual_value)) # print("Database Calc: {}".format(database_calc_value)) if not modbus_value == database_value: print("---- {}: {} ----".format(db_tag['tag_name'], db_tag['register_number'])) print(reg) print("Modbus Scaled: {}".format(modbus_value)) print("Database Scaled: {}".format(database_value)) assert modbus_value == database_value modbus_client.close() def test_read_input_registers(self): modbus_client.connect() tag_in_db = mongo_tags.find({"register_type": 'ir'}) for db_tag in tag_in_db: try: database_value = db_tag['val'] except KeyError: print("No value in DB for {}".format(db_tag['tag_name'])) reg = [] try: if db_tag['tag_type'][-3:] == 'INT': reg = modbus_client.read_input_registers(db_tag['register_number'] - 1, 1).registers elif db_tag['tag_type'] == 'REAL': reg = modbus_client.read_input_registers(db_tag['register_number'] - 1, 2).registers elif db_tag['tag_type'] == 'ARRAY': continue # print(reg) except AttributeError: print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name'])) continue if reg: if len(reg) > 0: if len(reg) == 2: # r = reg.reverse() modbus_value = lebyte_to_float(reg) elif len(reg) == 1: modbus_value = lebyte_to_integer(reg) # print("Modbus Actual: {}".format(modbus_actual_value)) # print("Database Actual: {}".format(database_actual_value)) # print("Database Calc: {}".format(database_calc_value)) if not modbus_value == database_value: print("---- {}: {} ----".format(db_tag['tag_name'], db_tag['register_number'])) print(reg) print("Modbus Scaled: {}".format(modbus_value)) print("Database Scaled: {}".format(database_value)) assert modbus_value == database_value modbus_client.close() def test_read_arrays(self): modbus_client.connect() tag_in_db = mongo_tags.find({"register_type": 'ir'}) for db_tag in tag_in_db: try: database_value = db_tag['val'] except KeyError: print("No value in DB for {}".format(db_tag['tag_name'])) reg = [] array_values = [] try: if db_tag['tag_type'] == 'ARRAY': array_regs = [] db_vals = db_tag['val'] reg_len = db_tag['arr_len'] * 2 get_len = 125 num_gets = int(reg_len) / get_len last_get = reg_len % get_len for x in range(0, num_gets): inp_register = (int(db_tag['register_number']) - 1) + (get_len * x) read_registers = modbus_client.read_input_registers(inp_register, get_len).registers array_regs = array_regs + read_registers if last_get > 0: array_regs = array_regs + modbus_client.read_input_registers((db_tag['register_number'] - 1) + (get_len * num_gets), last_get).registers for i in range(0, reg_len, 2): array_values.append(lebyte_to_float([array_regs[i], array_regs[i + 1]])) else: continue except AttributeError: print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name'])) continue if not array_values == db_vals: if len(array_values) > 0: for i in range(0, len(array_values)): print("{} = {} - {}".format(i, array_values[i], db_vals[i])) assert array_values == db_vals modbus_client.close() def test_read_coils(self): modbus_client.connect() tag_in_db = mongo_tags.find({"register_type": 'co'}) for db_tag in tag_in_db: try: database_value = db_tag['val'] except KeyError: print("No value in DB for {}".format(db_tag['tag_name'])) try: reg = modbus_client.read_coils(db_tag['register_number'] - 1, 1).bits if not database_value == reg[0]: print("{} =/= {}".format(database_value, reg[0])) assert database_value == reg[0] except AttributeError: print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name'])) continue def test_read_digital_inputs(self): modbus_client.connect() tag_in_db = mongo_tags.find({"register_type": 'di'}) for db_tag in tag_in_db: try: database_value = db_tag['val'] except KeyError: print("No value in DB for {}".format(db_tag['tag_name'])) try: reg = modbus_client.read_discrete_inputs(db_tag['register_number'] - 1, 1).bits if not database_value == reg[0]: print("{} =/= {}".format(database_value, reg[0])) assert database_value == reg[0] except AttributeError: print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name'])) continue def test_write_coils(self): modbus_client.connect() tag_in_db = mongo_tags.find({"register_type": 'co'}) for db_tag in tag_in_db: db_val_before = db_tag['val'] if db_val_before == 0: write_val = 1 else: write_val = 0 modbus_client.write_coil(db_tag['register_number'] - 1, write_val) reg = modbus_client.read_coils(db_tag['register_number'] - 1, 1).bits assert write_val == reg[0] modbus_client.write_coil(db_tag['register_number'] - 1, db_val_before) reg = modbus_client.read_coils(db_tag['register_number'] - 1, 1).bits assert db_val_before == reg[0] def test_write_coils(self): modbus_client.connect() tag_in_db = mongo_tags.find({"register_type": 'hr'}) for db_tag in tag_in_db: db_val_before = db_tag['val'] if db_tag['tag_type'] == "REAL": write_val = db_val_before + 1.0 db_val_before_bytes = float_to_bytes(db_val_before) write_val_bytes = float_to_bytes(write_val) modbus_client.write_registers(db_tag['register_number'] - 1, write_val_bytes) reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 2).registers read_val = lebyte_to_float(reg) assert close_enough(write_val, read_val) modbus_client.write_registers(db_tag['register_number'] - 1, db_val_before_bytes) reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 2).registers read_val = lebyte_to_float(reg) assert close_enough(db_val_before, read_val) else: write_val = db_val_before + 1 db_val_before_bytes = integer_to_byte(db_val_before) write_val_bytes = integer_to_byte(write_val) modbus_client.write_registers(db_tag['register_number'] - 1, write_val_bytes) reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 1).registers read_val = lebyte_to_integer(reg) assert write_val == read_val modbus_client.write_registers(db_tag['register_number'] - 1, db_val_before_bytes) reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 1).registers read_val = lebyte_to_integer(reg) assert db_val_before == read_val