55 lines
2.0 KiB
Python
55 lines
2.0 KiB
Python
from pycomm.ab_comm.clx import Driver as ClxDriver
|
|
from pycomm.cip.cip_base import CommError, DataError
|
|
import time
|
|
import sys
|
|
TAG_DATAERROR_SLEEPTIME = 5
|
|
|
|
def read_tag(addr, tag, plc_type="CLX"):
|
|
"""Read a tag from the PLC."""
|
|
direct = plc_type == "Micro800"
|
|
clx = ClxDriver()
|
|
try:
|
|
if clx.open(addr, direct_connection=direct):
|
|
try:
|
|
val = clx.read_tag(tag)
|
|
clx.close()
|
|
return val
|
|
except DataError as err:
|
|
clx.close()
|
|
time.sleep(TAG_DATAERROR_SLEEPTIME)
|
|
print("Data Error during readTag({}, {}): {}".format(addr, tag, err))
|
|
except CommError:
|
|
# err = c.get_status()
|
|
#clx.close()
|
|
print("Could not connect during readTag({}, {})".format(addr, tag))
|
|
except AttributeError as err:
|
|
clx.close()
|
|
print("AttributeError during readTag({}, {}): \n{}".format(addr, tag, err))
|
|
clx.close()
|
|
return False
|
|
|
|
|
|
def write_tag(addr, tag, val, plc_type="CLX"):
|
|
"""Write a tag value to the PLC."""
|
|
direct = plc_type == "Micro800"
|
|
clx = ClxDriver()
|
|
try:
|
|
if clx.open(addr, direct_connection=direct):
|
|
try:
|
|
initial_val = clx.read_tag(tag)
|
|
write_status = clx.write_tag(tag, val, initial_val[1])
|
|
clx.close()
|
|
return write_status
|
|
except DataError as err:
|
|
clx_err = clx.get_status()
|
|
clx.close()
|
|
print("--\nDataError during writeTag({}, {}, {}, plc_type={}) -- {}\n{}\n".format(addr, tag, val, plc_type, err, clx_err))
|
|
except CommError as err:
|
|
clx_err = clx.get_status()
|
|
print("--\nCommError during write_tag({}, {}, {}, plc_type={})\n{}\n--".format(addr, tag, val, plc_type, err))
|
|
#clx.close()
|
|
return False
|
|
|
|
resp = write_tag(sys.argv[1], "Lifetime_Flow_Meter_Gal", float(sys.argv[2]), "Micro800")
|
|
if resp:
|
|
print(read_tag(sys.argv[1], "Lifetime_Flow_Meter_Gal", "Micro800")) |