POC-108 work.

Adds unit testing. In the process of converting from scaled integers to using floating point values with a BinaryPayloadBuilder class
This commit is contained in:
Patrick McDonagh
2017-01-06 18:43:06 -06:00
parent 726b00ad9a
commit c6248bae95
3 changed files with 125 additions and 67 deletions

Binary file not shown.

View File

@@ -15,6 +15,8 @@ from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSparseDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import ModbusRtuFramer, ModbusAsciiFramer
from pymodbus.payload import BinaryPayloadBuilder
from pymodbus.constants import Endian
# ---------------------------------------------------------------------------#
# import the python libraries we need
@@ -33,6 +35,11 @@ log.setLevel(logging.DEBUG)
PLC_IP_ADDRESS = '10.20.4.7'
# ---------------------------------------------------------------------------#
# Datatype Mapping
# ---------------------------------------------------------------------------#
def getTagsFromDB():
client = MongoClient()
@@ -126,19 +133,23 @@ class AnalogTagDataBlock(ModbusSparseDataBlock):
'''
values = {}
self.register_type = register_type
register_values = {}
for t in tag_list:
builder = BinaryPayloadBuilder(endian=Endian.Little)
try:
values[t['register_number']] = t['val']
values[t['register_number'] + 1] = t['scale_type']
values[t['register_number'] + 2] = t['scale_multiplier']
values[t['register_number'] + 3] = 0
if t['tag_type'] == 'SINT':
builder.add_8bit_int(t['val_actual'])
elif t['tag_type'][:3] == 'INT':
builder.add_16bit_int(t['val_actual'])
elif t['tag_type'] == 'REAL':
builder.add_32bit_float(t['val_actual'])
register_values[t['register_number']] = builder.to_registers
except KeyError:
values[t['register_number']] = 911
values[t['register_number'] + 1] = 2
values[t['register_number'] + 2] = 0
values[t['register_number'] + 3] = 0
values[t['register_number'] + 4] = 123
register_values[t['register_number']] = []
print("No value for register #{} - {}".format(t['register_number'], t['tag_name']))
# print("Initialized AnalogTagDataBlock for {} with values {}".format(self.register_type, values))
super(AnalogTagDataBlock, self).__init__(values)

161
test.py
View File

@@ -1,21 +1,15 @@
#!/usr/bin/env python
'''
Pymodbus Synchronous Client Examples
--------------------------------------------------------------------------
The following is an example of how to use the synchronous modbus client
implementation from pymodbus.
It should be noted that the client can also be used with
the guard construct that is available in python 2.5 and up::
with ModbusClient('127.0.0.1') as client:
result = client.read_coils(1,10)
print result
'''
# ---------------------------------------------------------------------------#
# import the various server implementations
# ---------------------------------------------------------------------------#
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
# from pymodbus.client.sync import ModbusUdpClient as ModbusClient
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# ---------------------------------------------------------------------------#
# import python packages
# ---------------------------------------------------------------------------#
from pymongo import MongoClient
import unittest
# ---------------------------------------------------------------------------#
# configure the client logging
@@ -24,56 +18,109 @@ import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)
# ---------------------------------------------------------------------------#
# choose the client you want
# Modbus Client Setup
# ---------------------------------------------------------------------------#
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
#
# If you use the UDP or TCP clients, you can override the framer being used
# to use a custom implementation (say RTU over TCP). By default they use the
# socket framer::
#
# client = ModbusClient('localhost', port=5020, framer=ModbusRtuFramer)
#
# It should be noted that you can supply an ipv4 or an ipv6 host address for
# both the UDP and TCP clients.
#
# There are also other options that can be set on the client that controls
# how transactions are performed. The current ones are:
#
# * retries - Specify how many retries to allow per transaction (default = 3)
# * retry_on_empty - Is an empty response a retry (default = False)
# * source_address - Specifies the TCP source address to bind to
#
# Here is an example of using these options::
#
# client = ModbusClient('localhost', retries=3, retry_on_empty=True)
modbus_client = ModbusClient('localhost', port=5020)
# ---------------------------------------------------------------------------#
client = ModbusClient('localhost', port=5020)
# client = ModbusClient(method='ascii', port='/dev/pts/2', timeout=1)
# client = ModbusClient(method='rtu', port='/dev/pts/2', timeout=1)
client.connect()
# Mongo DB Setup
# ---------------------------------------------------------------------------#
mongo_client = MongoClient()
mongo_db = mongo_client.tag_data
mongo_tags = mongo_db.tag_vals
# ---------------------------------------------------------------------------#
# close enough function
# ---------------------------------------------------------------------------#
def close_enough(a, b):
'''
Determines if two values a and b are close enough to eachother to be considered equal
'''
if abs(a - b) < 0.01:
return True
return False
# ---------------------------------------------------------------------------#
# specify slave to query
# ---------------------------------------------------------------------------#
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
# ---------------------------------------------------------------------------#
print("HOLDING REGISTERS")
for i in range(0, 21):
rd = client.read_holding_registers(i * 5, 5)
print rd.registers
class TestStringMethods(unittest.TestCase):
def test_holding_registers(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'hr'})
for db_tag in tag_in_db:
try:
database_actual_value = float(db_tag['val_actual'])
database_value = db_tag['val']
if db_tag['scale_type'] == 0:
database_calc_value = float(db_tag['val']) / float(db_tag['scale_multiplier'])
else:
database_calc_value = float(db_tag['val']) * float(db_tag['scale_multiplier'])
except KeyError:
print("No value in DB for {}".format(db_tag['tag_name']))
print("INPUT REGISTERS")
for i in range(2, 125):
rd = client.read_input_registers(i * 5, 5)
print rd.registers
try:
reg = modbus_client.read_holding_registers(db_tag['register_number'] - 1, 5).registers
# print(reg)
except AttributeError:
print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))
self.assertTrue(1 == 0)
next()
if reg:
modbus_value = reg[0]
if int(reg[1]) == 0:
modbus_actual_value = float(modbus_value) / float(reg[2])
elif int(reg[1]) == 1:
modbus_actual_value = float(modbus_value) * float(reg[2])
# ---------------------------------------------------------------------------#
# close the client
# ---------------------------------------------------------------------------#
client.close()
# print("Modbus Actual: {}".format(modbus_actual_value))
# print("Database Actual: {}".format(database_actual_value))
# print("Database Calc: {}".format(database_calc_value))
# print("Modbus Scaled: {}".format(modbus_value))
# print("Database Scaled: {}".format(database_value))
self.assertTrue(modbus_actual_value == database_calc_value)
self.assertTrue(modbus_value == database_value)
modbus_client.close()
def test_input_registers(self):
modbus_client.connect()
tag_in_db = mongo_tags.find({"register_type": 'ir'})
for db_tag in tag_in_db:
try:
database_actual_value = float(db_tag['val_actual'])
database_value = db_tag['val']
if db_tag['scale_type'] == 0:
database_calc_value = float(db_tag['val']) / float(db_tag['scale_multiplier'])
else:
database_calc_value = float(db_tag['val']) * float(db_tag['scale_multiplier'])
except KeyError:
print("No value in DB for {}".format(db_tag['tag_name']))
try:
reg = modbus_client.read_input_registers(db_tag['register_number'] - 1, 5).registers
# print(reg)
except AttributeError:
print("Could not get register {} for {}".format(db_tag['register_number'], db_tag['tag_name']))
continue
if reg:
modbus_value = reg[0]
if int(reg[1]) == 0:
modbus_actual_value = float(modbus_value) / float(reg[2])
elif int(reg[1]) == 1:
modbus_actual_value = float(modbus_value) * float(reg[2])
# print("Modbus Actual: {}".format(modbus_actual_value))
# print("Database Actual: {}".format(database_actual_value))
# print("Database Calc: {}".format(database_calc_value))
# print("Modbus Scaled: {}".format(modbus_value))
# print("Database Scaled: {}".format(database_value))
self.assertTrue(modbus_actual_value == database_calc_value)
self.assertTrue(modbus_value == database_value)
modbus_client.close()
if __name__ == '__main__':
unittest.main()