This repository was archived by the owner on Apr 8, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
120 lines (105 loc) · 3.25 KB
/
utils.py
File metadata and controls
120 lines (105 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import json
from jsonschema import validate
from jsonschema.exceptions import ValidationError
import numpy as np
import sys
import os
# Global Variables
def initGlobals():
global signals
signals = {}
global plot_x_range_sec
plot_x_range_sec = 10
global events
events = []
global b_str
b_str = "VCAN"
global data_types
data_types = {
'uint8_t':np.dtype('<u1'),
'uint16_t':np.dtype('<u2'),
'uint32_t':np.dtype('<u4'),
'uint64_t':np.dtype('<u8'),
'int8_t':np.dtype('<i1'),
'int16_t':np.dtype('<i2'),
'int32_t':np.dtype('<i4'),
'int64_t':np.dtype('<i8'),
'float':np.dtype('<f4') # 32 bit
}
global data_type_length
data_type_length = {'uint8_t':8, 'uint16_t':16, 'uint32_t':32, 'uint64_t':64,
'int8_t':8, 'int16_t':16, 'int32_t':32, 'int64_t':64,
'float':32}
global debug_mode
debug_mode = False
global dark_mode
dark_mode = False
global daqProt
daqProt = None
global logging_paused
logging_paused = True
# Logging helper functions
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def log_error(phrase):
print(f"{bcolors.FAIL}ERROR: {phrase}{bcolors.ENDC}")
def log_warning(phrase):
log(f"{bcolors.WARNING}WARNING: {phrase}{bcolors.ENDC}")
def log_success(phrase):
log(f"{bcolors.OKGREEN}{phrase}{bcolors.ENDC}")
def log(phrase):
global debug_mode
if debug_mode: print(phrase)
def load_json_config(config_path, schema_path):
""" loads config from json and validates with schema """
config = json.load(open(config_path))
schema = json.load(open(schema_path))
# compare with schema
try:
validate(config, schema)
except ValidationError as e:
log_error("Invalid JSON!")
print(e)
sys.exit(1)
return config
def load_json_nodes(nodes_fpath):
""" loads each node config from a folder and combines it into one dict """
can_config = {}
for file in os.listdir(nodes_fpath):
if file.endswith(".json"):
node_config = json.load(open(os.path.join(nodes_fpath, file)))
node_busses = node_config['busses']
existing_busses = can_config.get('busses', [])
for bus in node_busses:
found = False
for existing_bus in existing_busses:
if bus['bus_name'] == existing_bus['bus_name']:
found = True
existing_bus['nodes'].extend(bus['nodes'])
break
if not found:
existing_busses.append(bus)
can_config['busses'] = existing_busses
return can_config
def clearDictItems(dictionary:dict):
""" recursively calls clear on items in multidimensional dict"""
for key, value in dictionary.items():
if type(value) is dict:
clearDictItems(value)
else:
value.clear()
def logEvent(time, msg):
global events
events.append([time, msg])
print(events)
def clearEvents():
global events
events = []