diff --git a/05-mixnets/Drastijk-router-go/README.md b/05-mixnets/Drastijk-router-go/README.md new file mode 100644 index 00000000..481f91cb --- /dev/null +++ b/05-mixnets/Drastijk-router-go/README.md @@ -0,0 +1,8 @@ +### MTU и Congestion threshold + +В рамках задачи реализовно: +1. Механизм MTU +2. Интегрирован алгоритм контроля загрузки сети. +Алгоритм реализован по мотивам этого объяснения: https://www.eventhelix.com/congestion-control/algorithms/ + +Значения для работы алгоритма и MTU выставлены в `константах в peer.go` \ No newline at end of file diff --git a/05-mixnets/Drastijk-router-go/router/node.go b/05-mixnets/Drastijk-router-go/router/node.go index 7336e3ec..24035dd5 100644 --- a/05-mixnets/Drastijk-router-go/router/node.go +++ b/05-mixnets/Drastijk-router-go/router/node.go @@ -105,6 +105,7 @@ func (node *Node) Register(recvd SHA256, address string) error { relay, _ := TLVAppend(nil, 'A', next[:]) for _, p := range node.peers { + p.LoadIndex = 1 if p == nil || p.address == address { continue } @@ -117,7 +118,7 @@ var exposed_cnt int func (node *Node) ExposedKey(address string, recvd []byte) error { exposed_cnt += 1 - err := node.DB.Set('K', "peer" + strconv.Itoa(exposed_cnt), Hex(recvd)) + err := node.DB.Set('K', "peer"+strconv.Itoa(exposed_cnt), Hex(recvd)) if err != nil { return err } @@ -182,7 +183,7 @@ func (node *Node) Init() (err error) { if err != nil { return } - + // Commented. It doesn't work, but We don't need it yet. // { @@ -275,13 +276,13 @@ func (node *Node) RouteMessageTCP(msg Message) error { if err != nil { return err } - + fmt.Printf("\r\nReceived for: %s", Hex(msg.to[:])) fmt.Printf("\r\nACK index: %d", index) fmt.Printf("\r\nSender pubkey: %s", snd_key_hex) fmt.Printf("\r\nBody: %s\r\n", body) - node.SendUDP(keypair.PublicKey, "ACK_" + strconv.FormatUint(uint64(index), 10)) + node.SendUDP(keypair.PublicKey, "ACK_"+strconv.FormatUint(uint64(index), 10)) return nil } if peer != nil { @@ -332,7 +333,7 @@ func (node *Node) SendTCP(key sodium.BoxPublicKey, snd sodium.BoxPublicKey, txt peer, err := node.findFwdPeer(hash) if err == nil && peer != nil { bs := make([]byte, 4) - binary.LittleEndian.PutUint32(bs, ack_cnt) + binary.LittleEndian.PutUint32(bs, ack_cnt) msg, _ := TLVAppend2(nil, 'R', hash[:], bytes.Join([][]byte{bs, snd.Bytes, []byte(txt)}, []byte(""))) peer.Queue(msg) fmt.Fprintf(os.Stderr, "'R' message sent to %s\r\n", hexize(key.Bytes)) diff --git a/05-mixnets/Drastijk-router-go/router/peer.go b/05-mixnets/Drastijk-router-go/router/peer.go index c85948d4..97344c16 100644 --- a/05-mixnets/Drastijk-router-go/router/peer.go +++ b/05-mixnets/Drastijk-router-go/router/peer.go @@ -11,15 +11,24 @@ import ( ) type Peer struct { - address string - node *Node - announces map[SHA256]uint64 - conn net.Conn - outbox [][]byte - boxmx sync.Mutex + address string + node *Node + announces map[SHA256]uint64 + conn net.Conn + outbox [][]byte + boxmx sync.Mutex + LoadIndex float32 + SendDurations []time.Duration } -const RETRY = time.Minute +const ( + RETRY = time.Minute + MTU = 256 + LoadedNetworkThreshold = 1024 + FreeNetworkThreshold = 512 + LoadIncreaseStep = 1.2 + LoadDecreaseStep = 0.8 +) func (node *Node) Connect(addr string, conn net.Conn) { con_backoff := time.Millisecond @@ -116,7 +125,7 @@ func (peer *Peer) Read() (err error) { copy(to[:], body[:32]) msg := Message{to: to, body: body[32:]} fmt.Printf("\r\nMessaged by %s: %s\r\n", peer.address, body) - err = peer.node.RouteMessageTCP(msg) + err = peer.node.RouteMessageTCP(msg) case 'P': fmt.Printf("Pinged by %s: %s\n\r", peer.address, body) pong, _ := TLVAppend2(nil, 'O', []byte("Re: "), body) @@ -169,18 +178,59 @@ func (peer *Peer) doWrite() { time.Sleep(time.Millisecond) // TODO backoff continue } - n, err := conn.Write(buf) + + err := peer.WriteByChunks(buf) + peer.UpdateWorkload() + + if err != nil { + fmt.Printf("failed to send bytes by chunks: %v", err) + break + } + //fmt.Fprintf(os.Stderr, "sent %d bytes\n", n) if err != nil { peer.conn = nil _, _ = fmt.Fprint(os.Stderr, err.Error()) break } - buf = buf[n:] + buf = make([]byte, 0) conn = peer.conn } } +func (peer *Peer) WriteByChunks(buf []byte) error { + // split to chunks + for i := 0; i < len(buf); i += MTU { + start := time.Now() + _, err := peer.conn.Write(buf[i : i+MTU]) + if err != nil { + return err + } + + // save delivery time for controlling system load + peer.SendDurations = append(peer.SendDurations, time.Since(start)) + } + return nil +} + +func (peer *Peer) AverageLoad() time.Duration { + total := time.Duration(0) + for _, d := range peer.SendDurations { + total += d + } + return total / time.Duration(len(peer.SendDurations)) +} + +func (peer *Peer) UpdateWorkload() { + currentLoad := peer.AverageLoad() + if currentLoad >= LoadedNetworkThreshold { + peer.LoadIndex *= LoadIncreaseStep + } else if currentLoad < FreeNetworkThreshold { + peer.LoadIndex *= LoadDecreaseStep + } + fmt.Printf("current workload: %v", peer.LoadIndex) +} + var NoSuchPeer = errors.New("no such peer") func (node *Node) Ping(addr string, msgtxt string) error {