This repository was archived by the owner on Nov 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathReceiver.py
More file actions
107 lines (85 loc) · 3 KB
/
Receiver.py
File metadata and controls
107 lines (85 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
##########################################################
# Based on Announce.py and Echo.py #
# Rnode Setting Test receiver #
# github.com/faragher/RNode_Setting_Test/ #
##########################################################
import argparse
import random
import RNS
APP_NAME = "setting_test"
rcd_packets = 0
rcd_announce = 0
# This initialisation is executed when the program is started
def program_setup(configpath):
    """Start Reticulum, create the receiving destination and run the test.

    configpath is handed unchanged to RNS.Reticulum(). After the
    destination, the announce handler and the packet callback are in
    place, control passes to announceLoop(), which blocks until the
    operator ends the test.
    """
    # Reticulum has to be instantiated before anything else is created.
    reticulum = RNS.Reticulum(configpath)

    # Inbound SINGLE destination for APP_NAME with the "GP" aspect,
    # owned by a freshly created identity.
    receiver_identity = RNS.Identity()
    receiver = RNS.Destination(
        receiver_identity,
        RNS.Destination.IN,
        RNS.Destination.SINGLE,
        APP_NAME,
        "GP"
    )

    # Log the destination hash in human-readable form.
    address_line = "Server addess: " + RNS.prettyhexrep(receiver.hash)
    RNS.log(address_line)

    # The handler is created without an aspect filter and registered
    # with Reticulum's transport layer.
    handler = ExampleAnnounceHandler(aspect_filter=None)
    RNS.Transport.register_announce_handler(handler)

    # Packets addressed to the destination are passed to server_callback.
    receiver.set_packet_callback(server_callback)

    # Everything is wired up; wait for data and print the summary.
    announceLoop()
def _loss_percent(received, expected):
    """Return the percentage of `expected` transmissions that did not arrive.

    The result is clamped at 0 so that receiving more items than expected
    is reported as "0.0% lost" rather than as a negative loss.
    """
    missing = max(0, expected - received)
    return (missing / expected) * 100


def announceLoop(expected_trials=10):
    """Wait for the operator to end the test, then print the loss summary.

    Reads the module-level counters rcd_announce and rcd_packets (which
    are incremented by ExampleAnnounceHandler.received_announce and
    server_callback) and prints, for each, the count and the percentage
    of the expected transmissions that were lost.

    Args:
        expected_trials: Number of announces and of packets the sender is
            expected to transmit. Defaults to 10, the value that used to
            be hard-coded here.

    Raises:
        ValueError: If expected_trials is not a positive number.
    """
    if expected_trials <= 0:
        raise ValueError("expected_trials must be a positive number")

    print("Setting test receiver: Expects "+str(expected_trials)+" trials.")
    print("Waiting for data. (Enter to end test, Ctrl-C to abort)")

    # Block until the operator presses Enter; the typed text is irrelevant.
    input()

    # The announce handler is registered without an aspect filter, so the
    # announce count can exceed expected_trials; _loss_percent clamps that
    # case to zero loss.
    print("Recieved "+str(rcd_announce)+" announces. "+str(_loss_percent(rcd_announce, expected_trials))+"% lost")
    print("Recieved "+str(rcd_packets)+" packets. "+str(_loss_percent(rcd_packets, expected_trials))+"% lost")
class ExampleAnnounceHandler:
    """Announce handler that counts and prints announces carrying app data.

    Instances are registered through
    RNS.Transport.register_announce_handler() in program_setup().
    """

    def __init__(self, aspect_filter=None):
        """Store the aspect filter this handler was created with.

        Args:
            aspect_filter: Value kept on the instance as `aspect_filter`;
                None by default.
        """
        self.aspect_filter = aspect_filter

    def received_announce(self, destination_hash, announced_identity, app_data):
        """Count an announce and print its application data.

        Announces without app data are ignored and not counted.

        Args:
            destination_hash: Hash of the announcing destination (unused).
            announced_identity: Identity of the announcer (unused).
            app_data: Application data bytes attached to the announce, or
                a falsy value when there is none.
        """
        global rcd_announce
        if not app_data:
            return

        rcd_announce += 1
        # The handler may be created without an aspect filter, so the app
        # data is not guaranteed to be UTF-8 text. Decode leniently so a
        # binary payload is shown with replacement characters instead of
        # raising UnicodeDecodeError after the counter was already bumped.
        print("Received announce " + app_data.decode("utf-8", errors="replace"))
def server_callback(message, packet):
    """Count a received packet and print its payload.

    Installed as the destination's packet callback in program_setup().

    Args:
        message: Payload bytes of the received packet.
        packet: The packet object the payload came from (unused).
    """
    global rcd_packets
    rcd_packets += 1
    # Decode leniently: a payload that is not valid UTF-8 is printed with
    # replacement characters instead of raising UnicodeDecodeError from
    # inside the callback.
    print("Received packet " + message.decode("utf-8", errors="replace"))
##########################################################
#### Program Startup #####################################
##########################################################
# This part of the program gets run at startup,
# and parses input from the user, and then starts
# the desired program mode.
if __name__ == "__main__":
    # Command-line entry point: parse the optional --config argument and
    # start the receiver. Ctrl-C at any point ends the program quietly.
    try:
        parser = argparse.ArgumentParser(
            description="Reticulum setting test receiver. Expects 10 announces and packets from a single source."
        )
        parser.add_argument(
            "--config",
            action="store",
            default=None,
            help="path to alternative Reticulum config directory",
            type=str
        )
        args = parser.parse_args()

        # An empty --config value is treated the same as an absent one.
        program_setup(args.config or None)

    except KeyboardInterrupt:
        # Finish the interrupted terminal line, then exit with status 0.
        # SystemExit is raised directly because the exit() builtin is an
        # interactive helper installed by the site module and is not
        # guaranteed to exist (e.g. under "python -S").
        print("")
        raise SystemExit(0)