Basic VFD and E300 devices. Can RW, but not control.
This commit is contained in:
32
E300_Test.py
Normal file
32
E300_Test.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""Test reading from an E300."""
|
||||
|
||||
from e300 import E300
|
||||
|
||||
olr = E300("10.20.4.9")
|
||||
|
||||
print("\nVOLTAGE VALUES")
|
||||
v_params = olr.read_voltage()
|
||||
for v in v_params:
|
||||
print(v_params[v])
|
||||
|
||||
print("\nCURRENT VALUES")
|
||||
i_params = olr.read_current()
|
||||
for i in i_params:
|
||||
print(i_params[i])
|
||||
|
||||
print("\nPOWER VALUES")
|
||||
p_params = olr.read_power()
|
||||
for p in p_params:
|
||||
print(p_params[p])
|
||||
|
||||
print("\nIO VALUES")
|
||||
io_params = olr.read_io()
|
||||
for io in io_params:
|
||||
print(io_params[io])
|
||||
|
||||
olr.write_parameter("Output 0", "USINT", 1)
|
||||
|
||||
print("\nIO VALUES")
|
||||
io_params = olr.read_io()
|
||||
for io in io_params:
|
||||
print(io_params[io])
|
||||
BIN
__pycache__/common.cpython-36.pyc
Normal file
BIN
__pycache__/common.cpython-36.pyc
Normal file
Binary file not shown.
BIN
__pycache__/e300.cpython-36.pyc
Normal file
BIN
__pycache__/e300.cpython-36.pyc
Normal file
Binary file not shown.
BIN
__pycache__/powerflex.cpython-36.pyc
Normal file
BIN
__pycache__/powerflex.cpython-36.pyc
Normal file
Binary file not shown.
38
common.py
Normal file
38
common.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Hold common functions for CIP messaging."""
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
def reverse_parameter_name(lower_underscore):
|
||||
"""Reverse the parameter lookup process."""
|
||||
lower_split = lower_underscore.split("_")
|
||||
upper_split = [x[0].upper() + x[1:] for x in lower_split]
|
||||
return " ".join(upper_split)
|
||||
|
||||
|
||||
def bitfield(n):
|
||||
"""Distribute integer into bits."""
|
||||
binary_with_padding = "0"*32 + bin(int(n))[2:]
|
||||
binary = binary_with_padding[-32:]
|
||||
bin_array = [1 if digit == '1' else 0 for digit in binary]
|
||||
bin_array.reverse()
|
||||
return bin_array
|
||||
|
||||
|
||||
class CIP_value(object):
|
||||
"""Value of vfd parameter."""
|
||||
|
||||
def __init__(self, name, units):
|
||||
"""Initialize VFD Parameter."""
|
||||
self.name = name
|
||||
self.value = None
|
||||
self.timestamp = 0
|
||||
self.units = units
|
||||
|
||||
def update(self, value):
|
||||
"""Update the current value."""
|
||||
self.value = value
|
||||
self.timestamp = datetime.now()
|
||||
|
||||
def __str__(self):
|
||||
"""Represent as a string."""
|
||||
return "name: {}, value: {} {}, timestamp: {}".format(self.name, self.value, self.units, self.timestamp)
|
||||
218
e300.py
Normal file
218
e300.py
Normal file
@@ -0,0 +1,218 @@
|
||||
"""Class definition for E300 overload relay."""
|
||||
|
||||
from cpppo.server.enip.get_attribute import proxy_simple
|
||||
import traceback
|
||||
from common import reverse_parameter_name, bitfield, CIP_value
|
||||
|
||||
|
||||
class E300(proxy_simple):
|
||||
"""Specific parameters and their addresses, for the E300 Overload Relay."""
|
||||
|
||||
VOLTAGE_PARAMETERS = dict(l1_to_n_voltage=proxy_simple.parameter('@0x4F/1/15', 'REAL', 'Volts'),
|
||||
l2_to_n_voltage=proxy_simple.parameter('@0x4F/1/16', 'REAL', 'Volts'),
|
||||
l3_to_n_voltage=proxy_simple.parameter('@0x4F/1/17', 'REAL', 'Volts'),
|
||||
avg_l_to_n_voltage=proxy_simple.parameter('@0x4F/1/18', 'REAL', 'Volts'),
|
||||
l1_to_l2_voltage=proxy_simple.parameter('@0x4F/1/19', 'REAL', 'Volts'),
|
||||
l2_to_l3_voltage=proxy_simple.parameter('@0x4F/1/20', 'REAL', 'Volts'),
|
||||
l3_to_l1_voltage=proxy_simple.parameter('@0x4F/1/21', 'REAL', 'Volts'),
|
||||
avg_l_to_l_voltage=proxy_simple.parameter('@0x4F/1/22', 'REAL', 'Volts'),
|
||||
percent_voltage_unbalance=proxy_simple.parameter('@0x4F/1/23', 'REAL', '%'),
|
||||
line_frequency=proxy_simple.parameter('@0x4F/1/9', 'REAL', 'Hz'),
|
||||
phase_rotation=proxy_simple.parameter('@0x4F/1/40', 'INT', 'ABC/ACB'),
|
||||
)
|
||||
CURRENT_PARAMETERS = dict(l1_current=proxy_simple.parameter('@0x4F/1/10', 'REAL', 'Amps'),
|
||||
l2_current=proxy_simple.parameter('@0x4F/1/11', 'REAL', 'Amps'),
|
||||
l3_current=proxy_simple.parameter('@0x4F/1/12', 'REAL', 'Amps'),
|
||||
average_current=proxy_simple.parameter('@0x4F/1/13', 'REAL', 'Amps'),
|
||||
percent_current_unbalance=proxy_simple.parameter('@0x4F/1/14', 'REAL', '%'),
|
||||
)
|
||||
|
||||
POWER_PARAMETERS = dict(l1_real_power=proxy_simple.parameter('@0x4F/1/24', 'REAL', 'kW'),
|
||||
l2_real_power=proxy_simple.parameter('@0x4F/1/25', 'REAL', 'kW'),
|
||||
l3_real_power=proxy_simple.parameter('@0x4F/1/26', 'REAL', 'kW'),
|
||||
total_real_power=proxy_simple.parameter('@0x4F/1/27', 'REAL', 'kW'),
|
||||
l1_reactive_power=proxy_simple.parameter('@0x4F/1/28', 'REAL', 'kVAR'),
|
||||
l2_reactive_power=proxy_simple.parameter('@0x4F/1/29', 'REAL', 'kVAR'),
|
||||
l3_reactive_power=proxy_simple.parameter('@0x4F/1/30', 'REAL', 'kVAR'),
|
||||
total_reactive_power=proxy_simple.parameter('@0x4F/1/31', 'REAL', 'kVAR'),
|
||||
l1_apparent_power=proxy_simple.parameter('@0x4F/1/32', 'REAL', 'kVA'),
|
||||
l2_apparent_power=proxy_simple.parameter('@0x4F/1/33', 'REAL', 'kVA'),
|
||||
l3_apparent_power=proxy_simple.parameter('@0x4F/1/34', 'REAL', 'kVA'),
|
||||
total_apparent_power=proxy_simple.parameter('@0x4F/1/35', 'REAL', 'kVA'),
|
||||
l1_true_power_factor=proxy_simple.parameter('@0x4F/1/36', 'REAL', '%'),
|
||||
l2_true_power_factor=proxy_simple.parameter('@0x4F/1/37', 'REAL', '%'),
|
||||
l3_true_power_factor=proxy_simple.parameter('@0x4F/1/38', 'REAL', '%'),
|
||||
three_phase_true_power_factor=proxy_simple.parameter('@0x4F/1/39', 'REAL', '%'),
|
||||
)
|
||||
IO_PARAMETERS = dict(output_0=proxy_simple.parameter('@0x09/1/3', 'SINT', 'I/O'),
|
||||
output_1=proxy_simple.parameter('@0x09/2/3', 'SINT', 'I/O'),
|
||||
output_2=proxy_simple.parameter('@0x09/3/3', 'SINT', 'I/O'),
|
||||
input_0=proxy_simple.parameter('@0x08/1/3', 'SINT', 'I/O'),
|
||||
input_1=proxy_simple.parameter('@0x08/2/3', 'SINT', 'I/O'),
|
||||
input_2=proxy_simple.parameter('@0x08/3/3', 'SINT', 'I/O'),
|
||||
)
|
||||
# CONFIGURATION_PARAMETERS = dict(motor_namplate_volts=proxy_simple.parameter('@0x93/25/9', 'REAL', 'VAC'),
|
||||
# motor_namplate_amps=proxy_simple.parameter('@0x93/26/9', 'REAL', 'Amps'),
|
||||
# motor_namplate_hertz=proxy_simple.parameter('@0x93/27/9', 'REAL', 'Hertz'),
|
||||
# motor_namplate_rpm=proxy_simple.parameter('@0x93/28/9', 'REAL', 'RPM'),
|
||||
# motor_namplate_powerunits=proxy_simple.parameter('@0x93/29/9', 'DINT', 'HP/kW'),
|
||||
# motor_namplate_power=proxy_simple.parameter('@0x93/30/9', 'REAL', 'HP/kW'),
|
||||
# motor_poles=proxy_simple.parameter('@0x93/31/9', 'DINT', 'Poles'),
|
||||
# pwm_frequency=proxy_simple.parameter('@0x93/38/9', 'REAL', 'kHz'),
|
||||
# rated_volts=proxy_simple.parameter('@0x93/20/9', 'REAL', 'VAC'),
|
||||
# rated_amps=proxy_simple.parameter('@0x93/21/9', 'REAL', 'Amps'),
|
||||
# rated_kw=proxy_simple.parameter('@0x93/22/9', 'REAL', 'kW'),
|
||||
# speed_units=proxy_simple.parameter('@0x93/300/9', 'DINT', 'Hz/RPM'),
|
||||
# duty_rating=proxy_simple.parameter('@0x93/306/9', 'DINT', 'ND/HD/LD'),
|
||||
# reset_meters=proxy_simple.parameter('@0x93/336/9', 'DINT', 'Ready/Energy/Time'),
|
||||
# db_resistor_type=proxy_simple.parameter('@0x93/382/9', 'DINT', 'Internal/External'),
|
||||
# db_resistor_ohms=proxy_simple.parameter('@0x93/383/9', 'REAL', 'Ohms'),
|
||||
# db_resistor_watts=proxy_simple.parameter('@0x93/384/9', 'REAL', 'Watts'),
|
||||
# speed_ref_a_sel=proxy_simple.parameter('@0x93/545/9', 'DINT', 'Param'),
|
||||
# )
|
||||
#
|
||||
# COMMAND_PARAMETERS = dict(logic_status=proxy_simple.parameter('@0x07/3/4', 'DINT', 'Bits'),
|
||||
# logic_command=proxy_simple.parameter('@0x07/4/1', 'BOOL', 'Bits'))
|
||||
|
||||
PARAMETERS = proxy_simple.PARAMETERS.copy()
|
||||
PARAMETERS.update(VOLTAGE_PARAMETERS)
|
||||
PARAMETERS.update(CURRENT_PARAMETERS)
|
||||
PARAMETERS.update(POWER_PARAMETERS)
|
||||
PARAMETERS.update(IO_PARAMETERS)
|
||||
# PARAMETERS.update(CONFIGURATION_PARAMETERS)
|
||||
# PARAMETERS.update(COMMAND_PARAMETERS)
|
||||
|
||||
def __init__(self, hostname):
|
||||
"""Intitialize the class."""
|
||||
super().__init__(hostname, route_path=[], send_path='')
|
||||
self.PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.PARAMETERS]
|
||||
self.PARAMETER_NAMES = self.PARAMETER_NAMES[3:]
|
||||
|
||||
self.VOLTAGE_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.VOLTAGE_PARAMETERS]
|
||||
self.CURRENT_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.CURRENT_PARAMETERS]
|
||||
self.POWER_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.POWER_PARAMETERS]
|
||||
self.IO_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.IO_PARAMETERS]
|
||||
# self.CONFIGURATION_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.CONFIGURATION_PARAMETERS]
|
||||
# self.COMMAND_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.COMMAND_PARAMETERS]
|
||||
self.voltage_values = {}
|
||||
self.current_values = {}
|
||||
self.power_values = {}
|
||||
self.io_values = {}
|
||||
# self.configuration_values = {}
|
||||
# self.command_values = {}
|
||||
|
||||
for p in self.VOLTAGE_PARAMETERS:
|
||||
self.voltage_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.VOLTAGE_PARAMETERS[p][2])
|
||||
|
||||
for p in self.CURRENT_PARAMETERS:
|
||||
self.current_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.CURRENT_PARAMETERS[p][2])
|
||||
|
||||
for p in self.POWER_PARAMETERS:
|
||||
self.power_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.POWER_PARAMETERS[p][2])
|
||||
|
||||
for p in self.IO_PARAMETERS:
|
||||
self.io_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.IO_PARAMETERS[p][2])
|
||||
# for p in self.CONFIGURATION_PARAMETERS:
|
||||
# self.configuration_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.CONFIGURATION_PARAMETERS[p][2])
|
||||
#
|
||||
# for p in self.COMMAND_PARAMETERS:
|
||||
# self.command_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.COMMAND_PARAMETERS[p][2])
|
||||
# print("Speed Ref A Set: {}".format(self.write_parameter("Speed Ref A Sel", 'DINT', 546))) # Set the Speed ref to read from Ethernet Adapter
|
||||
|
||||
def get_all_values(self):
|
||||
"""Retrieve all values stored for the VFD."""
|
||||
temp = self.configuration_values.copy()
|
||||
temp.update(self.voltage_values)
|
||||
temp.update(self.current_values)
|
||||
temp.update(self.power_values)
|
||||
temp.update(self.io_values)
|
||||
return temp
|
||||
|
||||
def write_parameter(self, param_name, param_type, value):
|
||||
"""Write a parameter to the VFD."""
|
||||
param = '%s = (%s)%s' % (param_name, param_type, value)
|
||||
print(param)
|
||||
try:
|
||||
via = self
|
||||
with via: # establish gateway, detects Exception (closing gateway)
|
||||
val, = via.write(
|
||||
via.parameter_substitution(param), checking=True)
|
||||
if val:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as exc:
|
||||
print("Exception writing Parameter %s: %s, %s", param, exc, traceback.format_exc())
|
||||
return False
|
||||
|
||||
def read_all(self):
|
||||
"""Read all values from the list of parameter names."""
|
||||
self.read_voltage()
|
||||
self.read_current()
|
||||
self.read_power()
|
||||
# self.read_configuration()
|
||||
# self.read_command()
|
||||
return self.get_all_values()
|
||||
|
||||
def read_voltage(self):
|
||||
"""Read Voltage Parameter values from the list of parameter names."""
|
||||
values = list(self.read(self.parameter_substitution(self.VOLTAGE_PARAMETER_NAMES)))
|
||||
for x in range(0, len(values)):
|
||||
if values[x] is not None:
|
||||
self.voltage_values[self.VOLTAGE_PARAMETER_NAMES[x]].update(values[x][0])
|
||||
return self.voltage_values
|
||||
|
||||
def read_current(self):
|
||||
"""Read Current Parameter values from the list of parameter names."""
|
||||
values = list(self.read(self.parameter_substitution(self.CURRENT_PARAMETER_NAMES)))
|
||||
for x in range(0, len(values)):
|
||||
if values[x] is not None:
|
||||
self.current_values[self.CURRENT_PARAMETER_NAMES[x]].update(values[x][0])
|
||||
return self.current_values
|
||||
|
||||
def read_power(self):
|
||||
"""Read Power Parameter values from the list of parameter names."""
|
||||
values = list(self.read(self.parameter_substitution(self.POWER_PARAMETER_NAMES)))
|
||||
for x in range(0, len(values)):
|
||||
if values[x] is not None:
|
||||
self.power_values[self.POWER_PARAMETER_NAMES[x]].update(values[x][0])
|
||||
return self.power_values
|
||||
|
||||
def read_io(self):
|
||||
"""Read IO Parameter values from the list of parameter names."""
|
||||
values = list(self.read(self.parameter_substitution(self.IO_PARAMETER_NAMES)))
|
||||
for x in range(0, len(values)):
|
||||
if values[x] is not None:
|
||||
self.io_values[self.IO_PARAMETER_NAMES[x]].update(values[x][0])
|
||||
return self.io_values
|
||||
|
||||
# def read_configuration(self):
|
||||
# """Read Running Parameter values from the list of parameter names."""
|
||||
# values = list(self.read(self.parameter_substitution(self.CONFIGURATION_PARAMETER_NAMES)))
|
||||
# for x in range(0, len(values)):
|
||||
# if values[x] is not None:
|
||||
# self.configuration_values[self.CONFIGURATION_PARAMETER_NAMES[x]].update(values[x][0])
|
||||
# return self.configuration_values
|
||||
#
|
||||
# def read_command(self):
|
||||
# """Read Command Parameter values from the list of parameter names."""
|
||||
# values = list(self.read(self.parameter_substitution(self.COMMAND_PARAMETER_NAMES)))
|
||||
# for x in range(0, len(values)):
|
||||
# if values[x] is not None:
|
||||
# bit_map = bitfield(values[x][0])
|
||||
# value_map = {}
|
||||
# for i in range(0, len(bit_map)):
|
||||
# try:
|
||||
# value_map[COMMAND_MAP[self.COMMAND_PARAMETER_NAMES[x]][i]] = bit_map[i]
|
||||
# except KeyError:
|
||||
# pass
|
||||
# self.command_values[self.COMMAND_PARAMETER_NAMES[x]].update(value_map)
|
||||
# return self.command_values
|
||||
#
|
||||
# def start(self):
|
||||
# """Start the VFD."""
|
||||
# return self.write_parameter("Logic Command", "INT", 2)
|
||||
#
|
||||
# def stop(self):
|
||||
# """Start the VFD."""
|
||||
# return self.write_parameter("Logic Command", "INT", 1)
|
||||
190
powerflex.py
Normal file
190
powerflex.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""Class definition of PowerFlex Drives."""
|
||||
|
||||
from cpppo.server.enip.get_attribute import proxy_simple
|
||||
import traceback
|
||||
from common import reverse_parameter_name, bitfield, CIP_value
|
||||
|
||||
|
||||
class powerflex(proxy_simple):
|
||||
"""Generic powerflex drive."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
COMMAND_MAP = {"Logic Status": {
|
||||
0: "Run Ready",
|
||||
1: "Active",
|
||||
2: "Command Direction",
|
||||
3: "Actual Direction",
|
||||
4: "Accelerating",
|
||||
5: "Decelerating",
|
||||
6: "Alarm",
|
||||
7: "Fault",
|
||||
8: "At Setpoint Speed",
|
||||
9: "Manual",
|
||||
16: "Running",
|
||||
17: "Jogging",
|
||||
18: "Stopping",
|
||||
19: "DC Brake",
|
||||
20: "DB Active",
|
||||
21: "Speed Mode",
|
||||
22: "Position Mode",
|
||||
23: "Torque Mode",
|
||||
24: "At Zero Speed",
|
||||
25: "At Home",
|
||||
26: "At Limit",
|
||||
27: "Current Limit",
|
||||
28: "Bus Freq Reg",
|
||||
29: "Enable On",
|
||||
30: "Motor Overload",
|
||||
31: "Regen",
|
||||
}, "Logic Command": {
|
||||
0: "Normal Stop",
|
||||
1: "Start",
|
||||
2: "Jog 1",
|
||||
3: "Clear Fault",
|
||||
6: "Manual",
|
||||
16: "Coast Stop",
|
||||
17: "Current Limit Stop",
|
||||
18: "Run",
|
||||
19: "Jog 2",
|
||||
}}
|
||||
|
||||
|
||||
class powerflex755(powerflex):
|
||||
"""Specific parameters and their addresses, for the PowerFlex 750 Series AC drives."""
|
||||
|
||||
RUNNING_PARAMETERS = dict(output_frequency=powerflex.parameter('@0x93/1/9', 'REAL', 'Hz'),
|
||||
speed_ref=powerflex.parameter('@0x93/2/9', 'REAL', 'Hz'),
|
||||
motor_velocity=powerflex.parameter('@0x93/3/9', 'REAL', 'Hz/RPM'),
|
||||
output_current=powerflex.parameter('@0x93/7/9', 'REAL', 'Amps'),
|
||||
output_voltage=powerflex.parameter('@0x93/8/9', 'REAL', 'VAC'),
|
||||
output_power=powerflex.parameter('@0x93/9/9', 'REAL', 'kW'),
|
||||
output_powerfactor=powerflex.parameter('@0x93/10/9', 'REAL', 'kW'),
|
||||
dc_bus_volts=powerflex.parameter('@0x93/11/9', 'REAL', 'VDC'),
|
||||
elapsed_mwh=powerflex.parameter('@0x93/13/9', 'REAL', 'MWh'),
|
||||
elapsed_kwh=powerflex.parameter('@0x93/14/9', 'REAL', 'kWh'),
|
||||
elapsed_run_time=powerflex.parameter('@0x93/15/9', 'REAL', 'Hrs'),
|
||||
speed_ref_a_stpt=powerflex.parameter('@0x93/546/9', 'REAL', 'Hz'),
|
||||
)
|
||||
|
||||
CONFIGURATION_PARAMETERS = dict(motor_namplate_volts=powerflex.parameter('@0x93/25/9', 'REAL', 'VAC'),
|
||||
motor_namplate_amps=powerflex.parameter('@0x93/26/9', 'REAL', 'Amps'),
|
||||
motor_namplate_hertz=powerflex.parameter('@0x93/27/9', 'REAL', 'Hertz'),
|
||||
motor_namplate_rpm=powerflex.parameter('@0x93/28/9', 'REAL', 'RPM'),
|
||||
motor_namplate_powerunits=powerflex.parameter('@0x93/29/9', 'DINT', 'HP/kW'),
|
||||
motor_namplate_power=powerflex.parameter('@0x93/30/9', 'REAL', 'HP/kW'),
|
||||
motor_poles=powerflex.parameter('@0x93/31/9', 'DINT', 'Poles'),
|
||||
pwm_frequency=powerflex.parameter('@0x93/38/9', 'REAL', 'kHz'),
|
||||
rated_volts=powerflex.parameter('@0x93/20/9', 'REAL', 'VAC'),
|
||||
rated_amps=powerflex.parameter('@0x93/21/9', 'REAL', 'Amps'),
|
||||
rated_kw=powerflex.parameter('@0x93/22/9', 'REAL', 'kW'),
|
||||
speed_units=powerflex.parameter('@0x93/300/9', 'DINT', 'Hz/RPM'),
|
||||
duty_rating=powerflex.parameter('@0x93/306/9', 'DINT', 'ND/HD/LD'),
|
||||
reset_meters=powerflex.parameter('@0x93/336/9', 'DINT', 'Ready/Energy/Time'),
|
||||
db_resistor_type=powerflex.parameter('@0x93/382/9', 'DINT', 'Internal/External'),
|
||||
db_resistor_ohms=powerflex.parameter('@0x93/383/9', 'REAL', 'Ohms'),
|
||||
db_resistor_watts=powerflex.parameter('@0x93/384/9', 'REAL', 'Watts'),
|
||||
speed_ref_a_sel=powerflex.parameter('@0x93/545/9', 'DINT', 'Param'),
|
||||
)
|
||||
|
||||
COMMAND_PARAMETERS = dict(logic_status=powerflex.parameter('@0x07/3/4', 'DINT', 'Bits'),
|
||||
logic_command=powerflex.parameter('@0x07/4/1', 'BOOL', 'Bits'))
|
||||
|
||||
PARAMETERS = powerflex.PARAMETERS.copy()
|
||||
PARAMETERS.update(RUNNING_PARAMETERS)
|
||||
PARAMETERS.update(CONFIGURATION_PARAMETERS)
|
||||
PARAMETERS.update(COMMAND_PARAMETERS)
|
||||
# PARAMETERS = dict(powerflex.PARAMETERS + RUNNING_PARAMETERS + CONFIGURATION_PARAMETERS)
|
||||
|
||||
def __init__(self, hostname):
|
||||
"""Intitialize the class."""
|
||||
super().__init__(hostname, route_path=[], send_path='')
|
||||
self.PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.PARAMETERS]
|
||||
self.PARAMETER_NAMES = self.PARAMETER_NAMES[3:]
|
||||
|
||||
self.RUNNING_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.RUNNING_PARAMETERS]
|
||||
self.CONFIGURATION_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.CONFIGURATION_PARAMETERS]
|
||||
self.COMMAND_PARAMETER_NAMES = [reverse_parameter_name(p) for p in self.COMMAND_PARAMETERS]
|
||||
self.running_values = {}
|
||||
self.configuration_values = {}
|
||||
self.command_values = {}
|
||||
|
||||
for p in self.RUNNING_PARAMETERS:
|
||||
self.running_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.RUNNING_PARAMETERS[p][2])
|
||||
|
||||
for p in self.CONFIGURATION_PARAMETERS:
|
||||
self.configuration_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.CONFIGURATION_PARAMETERS[p][2])
|
||||
|
||||
for p in self.COMMAND_PARAMETERS:
|
||||
self.command_values[reverse_parameter_name(p)] = CIP_value(reverse_parameter_name(p), self.COMMAND_PARAMETERS[p][2])
|
||||
# print("Speed Ref A Set: {}".format(self.write_parameter("Speed Ref A Sel", 'DINT', 546))) # Set the Speed ref to read from Ethernet Adapter
|
||||
|
||||
def get_all_values(self):
|
||||
"""Retrieve all values stored for the VFD."""
|
||||
temp = self.configuration_values.copy()
|
||||
temp.update(self.running_values)
|
||||
return temp
|
||||
|
||||
def write_parameter(self, param_name, param_type, value):
|
||||
"""Write a parameter to the VFD."""
|
||||
param = '%s = (%s)%s' % (param_name, param_type, value)
|
||||
print(param)
|
||||
try:
|
||||
via = self
|
||||
with via: # establish gateway, detects Exception (closing gateway)
|
||||
val, = via.write(
|
||||
via.parameter_substitution(param), checking=True)
|
||||
if val:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as exc:
|
||||
print("Exception writing Parameter %s: %s, %s", param, exc, traceback.format_exc())
|
||||
return False
|
||||
|
||||
def read_all(self):
|
||||
"""Read all values from the list of parameter names."""
|
||||
self.read_running()
|
||||
self.read_configuration()
|
||||
self.read_command()
|
||||
return self.get_all_values()
|
||||
|
||||
def read_running(self):
|
||||
"""Read Running Parameter values from the list of parameter names."""
|
||||
values = list(self.read(self.parameter_substitution(self.RUNNING_PARAMETER_NAMES)))
|
||||
for x in range(0, len(values)):
|
||||
if values[x] is not None:
|
||||
self.running_values[self.RUNNING_PARAMETER_NAMES[x]].update(values[x][0])
|
||||
return self.running_values
|
||||
|
||||
def read_configuration(self):
|
||||
"""Read Running Parameter values from the list of parameter names."""
|
||||
values = list(self.read(self.parameter_substitution(self.CONFIGURATION_PARAMETER_NAMES)))
|
||||
for x in range(0, len(values)):
|
||||
if values[x] is not None:
|
||||
self.configuration_values[self.CONFIGURATION_PARAMETER_NAMES[x]].update(values[x][0])
|
||||
return self.configuration_values
|
||||
|
||||
def read_command(self):
|
||||
"""Read Command Parameter values from the list of parameter names."""
|
||||
values = list(self.read(self.parameter_substitution(self.COMMAND_PARAMETER_NAMES)))
|
||||
for x in range(0, len(values)):
|
||||
if values[x] is not None:
|
||||
bit_map = bitfield(values[x][0])
|
||||
value_map = {}
|
||||
for i in range(0, len(bit_map)):
|
||||
try:
|
||||
value_map[COMMAND_MAP[self.COMMAND_PARAMETER_NAMES[x]][i]] = bit_map[i]
|
||||
except KeyError:
|
||||
pass
|
||||
self.command_values[self.COMMAND_PARAMETER_NAMES[x]].update(value_map)
|
||||
return self.command_values
|
||||
|
||||
def start(self):
|
||||
"""Start the VFD."""
|
||||
return self.write_parameter("Logic Command", "INT", 2)
|
||||
|
||||
def stop(self):
|
||||
"""Start the VFD."""
|
||||
return self.write_parameter("Logic Command", "INT", 1)
|
||||
BIN
powerflex.pyc
Normal file
BIN
powerflex.pyc
Normal file
Binary file not shown.
34
vfd_test.py
Normal file
34
vfd_test.py
Normal file
@@ -0,0 +1,34 @@
|
||||
|
||||
"""Test reading data from a Rockwell VFD."""
|
||||
from powerflex import powerflex755
|
||||
from time import sleep
|
||||
|
||||
|
||||
pf755 = powerflex755("10.20.4.11")
|
||||
|
||||
config_values = pf755.read_configuration()
|
||||
print("CONFIGURATION VALUES")
|
||||
for x in config_values:
|
||||
print(config_values[x])
|
||||
|
||||
running_vals = pf755.read_running()
|
||||
print("RUNNING VALUES")
|
||||
for x in running_vals:
|
||||
print(running_vals[x])
|
||||
|
||||
command_vals = pf755.read_command()
|
||||
print("COMMAND VALUES")
|
||||
for x in command_vals:
|
||||
print(command_vals[x])
|
||||
|
||||
# pf755.start()
|
||||
i = 0
|
||||
while(i < 10):
|
||||
running_vals = pf755.read_running()
|
||||
print("RUNNING VALUES")
|
||||
for x in running_vals:
|
||||
print(running_vals[x])
|
||||
i += 1
|
||||
sleep(1)
|
||||
|
||||
# pf755.stop()
|
||||
Reference in New Issue
Block a user