136 lines
4.7 KiB
Python
136 lines
4.7 KiB
Python
"""Simulate Tag values for the web server."""
|
|
|
|
import requests
|
|
import json
|
|
import logging
|
|
import time
|
|
from random import random
|
|
|
|
API_URL = "http://localhost:8000/api/v1"
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
def get_tags(url):
|
|
"""Get tag configuration from the web server."""
|
|
get_tag_request = requests.get("{}/tags".format(url))
|
|
tags = json.loads(get_tag_request.text)
|
|
return tags
|
|
|
|
|
|
def generate_random_values(last_val, min_val, max_val):
|
|
"""Generate a random value within 10 percent of the last value, but between the max and min expected."""
|
|
val_change = (random() - 0.5) * (max_val - min_val) * 0.1
|
|
intended_val = last_val + val_change
|
|
if (intended_val > max_val) or (intended_val < min_val):
|
|
return last_val - val_change
|
|
return intended_val
|
|
|
|
|
|
class Tag(object):
|
|
"""Holds the configuration for a tag."""
|
|
|
|
def __init__(self, tag_id, name, class_type, tag, device_type, device_address, plc_data_type, change_threshold, guarantee_sec, min_expected, max_expected, units):
|
|
"""Initialize the Tag object."""
|
|
self.tag_id = tag_id
|
|
self.name = name
|
|
self.class_type = class_type
|
|
self.tag = tag
|
|
self.device_type = device_type
|
|
self.device_address = device_address
|
|
self.plc_data_type = plc_data_type
|
|
self.change_threshold = change_threshold
|
|
self.guarantee_sec = guarantee_sec
|
|
self.min_expected = min_expected
|
|
self.max_expected = max_expected
|
|
self.timestamp = 0
|
|
|
|
if self.plc_data_type == "REAL":
|
|
self.value = float("-inf")
|
|
elif self.plc_data_type == "INT":
|
|
self.value = int("-inf")
|
|
elif self.plc_data_type == "BOOL":
|
|
self.value = None
|
|
else:
|
|
raise AttributeError("bad PLC type")
|
|
self.live_value = self.value
|
|
self.no_value = self.value
|
|
|
|
def send(self):
|
|
"""Send the value to the web API."""
|
|
self.value = self.live_value
|
|
send_obj = {
|
|
"tagId": self.tag_id,
|
|
"val": str(self.value)
|
|
}
|
|
|
|
post_req = requests.post("{}/tagValue".format(API_URL), data=json.dumps(send_obj))
|
|
if post_req.status_code == 201:
|
|
self.timestamp = time.time()
|
|
logging.info("Sent {} for {} at {}".format(self.value, self.name, self.timestamp))
|
|
return True
|
|
logging.error("Unable to send {} to {}/tagValues".format(json.dumps(send_obj), API_URL))
|
|
raise EnvironmentError("Did not receive status code 201 on POST request, got {}\n{}".format(post_req.status_code, post_req.text))
|
|
return False
|
|
|
|
def read(self):
|
|
"""Read the value of the tag."""
|
|
pass
|
|
|
|
def check(self):
|
|
"""Check the value and send if necessary."""
|
|
if self.read():
|
|
self.send()
|
|
|
|
|
|
class SimTag(Tag):
|
|
"""Simulated tag."""
|
|
|
|
def read(self):
|
|
"""Read the value of the tag."""
|
|
self.live_value = self.generate_random_values()
|
|
|
|
if self.value == self.no_value:
|
|
logging.info("Need to send {} = {} for first value.".format(self.name, self.live_value))
|
|
return True
|
|
elif abs(self.live_value - self.value) > self.change_threshold:
|
|
logging.info("Need to send {} = {} for value change.".format(self.name, self.live_value))
|
|
return True
|
|
elif (time.time() - self.timestamp) > self.change_threshold:
|
|
logging.info("Need to send {} = {} for periodic send.".format(self.name, self.live_value))
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def generate_random_values(self):
|
|
"""Generate a random value within 10 percent of the last value, but between the max and min expected."""
|
|
if self.value == self.no_value:
|
|
return self.min_expected + (self.max_expected - self.min_expected) * random()
|
|
|
|
val_change = (random() - 0.5) * (self.max_expected - self.min_expected) * 0.1
|
|
intended_val = self.live_value + val_change
|
|
if (intended_val > self.max_expected) or (intended_val < self.min_expected):
|
|
return self.live_value - val_change
|
|
return intended_val
|
|
|
|
|
|
def main():
|
|
"""Main function."""
|
|
tags_json = get_tags(API_URL)
|
|
tag_list = []
|
|
for t in tags_json:
|
|
newSimTag = SimTag(t['id'], t['name'], t['tagClass']['classType'], t['tagName'],
|
|
t['device']['deviceType']['name'], t['device']['address'],
|
|
t['dataType']['plcType'], t['changeThreshold'], t['guaranteeSec'],
|
|
t['minExpected'], t['maxExpected'], t['units'])
|
|
tag_list.append(newSimTag)
|
|
|
|
while True:
|
|
for tag in tag_list:
|
|
tag.check()
|
|
time.sleep(5)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|