Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions 05-mixnets/Drastijk-router-go/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
### MTU и Congestion threshold

В рамках задачи реализовно:
1. Механизм MTU
2. Интегрирован алгоритм контроля загрузки сети.
Алгоритм реализован по мотивам этого объяснения: https://www.eventhelix.com/congestion-control/algorithms/

Значения для работы алгоритма и MTU выставлены в `константах в peer.go`
11 changes: 6 additions & 5 deletions 05-mixnets/Drastijk-router-go/router/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (node *Node) Register(recvd SHA256, address string) error {

relay, _ := TLVAppend(nil, 'A', next[:])
for _, p := range node.peers {
p.LoadIndex = 1
if p == nil || p.address == address {
continue
}
Expand All @@ -117,7 +118,7 @@ var exposed_cnt int

func (node *Node) ExposedKey(address string, recvd []byte) error {
exposed_cnt += 1
err := node.DB.Set('K', "peer" + strconv.Itoa(exposed_cnt), Hex(recvd))
err := node.DB.Set('K', "peer"+strconv.Itoa(exposed_cnt), Hex(recvd))
if err != nil {
return err
}
Expand Down Expand Up @@ -182,7 +183,7 @@ func (node *Node) Init() (err error) {
if err != nil {
return
}

// Commented. It doesn't work, but We don't need it yet.

// {
Expand Down Expand Up @@ -275,13 +276,13 @@ func (node *Node) RouteMessageTCP(msg Message) error {
if err != nil {
return err
}

fmt.Printf("\r\nReceived for: %s", Hex(msg.to[:]))
fmt.Printf("\r\nACK index: %d", index)
fmt.Printf("\r\nSender pubkey: %s", snd_key_hex)
fmt.Printf("\r\nBody: %s\r\n", body)

node.SendUDP(keypair.PublicKey, "ACK_" + strconv.FormatUint(uint64(index), 10))
node.SendUDP(keypair.PublicKey, "ACK_"+strconv.FormatUint(uint64(index), 10))
return nil
}
if peer != nil {
Expand Down Expand Up @@ -332,7 +333,7 @@ func (node *Node) SendTCP(key sodium.BoxPublicKey, snd sodium.BoxPublicKey, txt
peer, err := node.findFwdPeer(hash)
if err == nil && peer != nil {
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, ack_cnt)
binary.LittleEndian.PutUint32(bs, ack_cnt)
msg, _ := TLVAppend2(nil, 'R', hash[:], bytes.Join([][]byte{bs, snd.Bytes, []byte(txt)}, []byte("")))
peer.Queue(msg)
fmt.Fprintf(os.Stderr, "'R' message sent to %s\r\n", hexize(key.Bytes))
Expand Down
70 changes: 60 additions & 10 deletions 05-mixnets/Drastijk-router-go/router/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,24 @@ import (
)

type Peer struct {
address string
node *Node
announces map[SHA256]uint64
conn net.Conn
outbox [][]byte
boxmx sync.Mutex
address string
node *Node
announces map[SHA256]uint64
conn net.Conn
outbox [][]byte
boxmx sync.Mutex
LoadIndex float32
SendDurations []time.Duration
}

const RETRY = time.Minute
const (
RETRY = time.Minute
MTU = 256
LoadedNetworkThreshold = 1024
FreeNetworkThreshold = 512
LoadIncreaseStep = 1.2
LoadDecreaseStep = 0.8
)

func (node *Node) Connect(addr string, conn net.Conn) {
con_backoff := time.Millisecond
Expand Down Expand Up @@ -116,7 +125,7 @@ func (peer *Peer) Read() (err error) {
copy(to[:], body[:32])
msg := Message{to: to, body: body[32:]}
fmt.Printf("\r\nMessaged by %s: %s\r\n", peer.address, body)
err = peer.node.RouteMessageTCP(msg)
err = peer.node.RouteMessageTCP(msg)
case 'P':
fmt.Printf("Pinged by %s: %s\n\r", peer.address, body)
pong, _ := TLVAppend2(nil, 'O', []byte("Re: "), body)
Expand Down Expand Up @@ -169,18 +178,59 @@ func (peer *Peer) doWrite() {
time.Sleep(time.Millisecond) // TODO backoff
continue
}
n, err := conn.Write(buf)

err := peer.WriteByChunks(buf)
peer.UpdateWorkload()

if err != nil {
fmt.Printf("failed to send bytes by chunks: %v", err)
break
}

//fmt.Fprintf(os.Stderr, "sent %d bytes\n", n)
if err != nil {
peer.conn = nil
_, _ = fmt.Fprint(os.Stderr, err.Error())
break
}
buf = buf[n:]
buf = make([]byte, 0)
conn = peer.conn
}
}

func (peer *Peer) WriteByChunks(buf []byte) error {
// split to chunks
for i := 0; i < len(buf); i += MTU {
start := time.Now()
_, err := peer.conn.Write(buf[i : i+MTU])
if err != nil {
return err
}

// save delivery time for controlling system load
peer.SendDurations = append(peer.SendDurations, time.Since(start))
}
return nil
}

func (peer *Peer) AverageLoad() time.Duration {
total := time.Duration(0)
for _, d := range peer.SendDurations {
total += d
}
return total / time.Duration(len(peer.SendDurations))
}

func (peer *Peer) UpdateWorkload() {
currentLoad := peer.AverageLoad()
if currentLoad >= LoadedNetworkThreshold {
peer.LoadIndex *= LoadIncreaseStep
} else if currentLoad < FreeNetworkThreshold {
peer.LoadIndex *= LoadDecreaseStep
}
fmt.Printf("current workload: %v", peer.LoadIndex)
}

var NoSuchPeer = errors.New("no such peer")

func (node *Node) Ping(addr string, msgtxt string) error {
Expand Down