Skip to content
This repository was archived by the owner on Apr 7, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM docker.io/golang:1.17 as builder
FROM docker.io/golang:1.21 as builder

WORKDIR /app

Expand Down
47 changes: 24 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
module github.com/c16a/hermes

go 1.17
go 1.19

require (
github.com/dgraph-io/badger/v2 v2.2007.4
github.com/eclipse/paho.golang v0.10.0
github.com/go-ldap/ldap/v3 v3.4.1
github.com/go-redis/redis/v8 v8.11.3
github.com/gorilla/websocket v1.4.2
github.com/eclipse/paho.golang v0.12.0
github.com/go-ldap/ldap/v3 v3.4.6
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.5.0
github.com/satori/go.uuid v1.2.0
go.uber.org/zap v1.19.1
go.uber.org/zap v1.26.0
)

require (
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/klauspost/compress v1.12.3 // indirect
github.com/pkg/errors v0.8.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
google.golang.org/protobuf v1.26.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.5 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
207 changes: 88 additions & 119 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lib/mqtt/conn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import "net"
func HandleMqttConnection(conn net.Conn, ctx *ServerContext) {
handler := &MqttHandler{base: ctx, logger: ctx.logger}

for true {
for {
handler.Handle(conn)
}
}
10 changes: 2 additions & 8 deletions lib/mqtt/mqtt_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package mqtt

import (
"errors"
"io"

"github.com/eclipse/paho.golang/packets"
"github.com/eclipse/paho.golang/paho"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
"io"
)

type MqttHandler struct {
Expand All @@ -30,21 +31,16 @@ func (handler *MqttHandler) Handle(readWriter io.ReadWriter) {
switch cPacket.Type {
case packets.CONNECT:
packetHandler = handleConnect
break
case packets.PUBLISH:
packetHandler = handlePublish
break
case packets.PUBREL:
packetHandler = handlePubRel
case packets.SUBSCRIBE:
packetHandler = handleSubscribe
break
case packets.UNSUBSCRIBE:
packetHandler = handleUnsubscribe
break
case packets.DISCONNECT:
packetHandler = handleDisconnect
break
case packets.PINGREQ:
packetHandler = handlePingRequest
default:
Expand All @@ -60,8 +56,6 @@ func (handler *MqttHandler) Handle(readWriter io.ReadWriter) {
zap.Uint16("packetID", cPacket.PacketID()),
zap.String("type", cPacket.PacketType()),
).Info("Writing packet")

return
}

func handleConnect(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error {
Expand Down
25 changes: 8 additions & 17 deletions lib/mqtt/server_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package mqtt
import (
"errors"
"fmt"
"io"
"math/rand"
"sync"
"time"

"github.com/c16a/hermes/lib/auth"
"github.com/c16a/hermes/lib/config"
"github.com/c16a/hermes/lib/persistence"
"github.com/c16a/hermes/lib/utils"
"github.com/eclipse/paho.golang/packets"
"go.uber.org/zap"
"io"
"math/rand"
"sync"
"time"
)

// ServerContext stores the state of the cluster node
Expand Down Expand Up @@ -131,7 +132,7 @@ func (ctx *ServerContext) Publish(publish *packets.Publish) {
var shareNameClientMap = make(map[string][]*ConnectedClient, 0)
for _, client := range ctx.connectedClientsMap {
topicToTarget := publish.Topic
for topicFilter, _ := range client.Subscriptions {
for topicFilter := range client.Subscriptions {
matches, isShared, shareName := utils.TopicMatches(topicToTarget, topicFilter)
if matches {
if !isShared {
Expand Down Expand Up @@ -160,13 +161,6 @@ func (ctx *ServerContext) Publish(publish *packets.Publish) {
}

for _, clients := range shareNameClientMap {
onlineClients := make([]*ConnectedClient, 0)
for _, c := range clients {
if c.IsConnected {
onlineClients = append(onlineClients, c)
}
}

var client *ConnectedClient
if len(clients) == 1 {
client = clients[0]
Expand All @@ -188,8 +182,8 @@ func (ctx *ServerContext) Subscribe(conn io.Writer, subscribe *packets.Subscribe
var subAckBytes []byte
for _, client := range ctx.connectedClientsMap {
if conn == client.Connection {
for topic, options := range subscribe.Subscriptions {
client.Subscriptions[topic] = options
for _, options := range subscribe.Subscriptions {
client.Subscriptions[options.Topic] = options
var subAckByte byte

if options.QoS > ctx.config.Server.MaxQos {
Expand All @@ -198,13 +192,10 @@ func (ctx *ServerContext) Subscribe(conn io.Writer, subscribe *packets.Subscribe
switch options.QoS {
case 0:
subAckByte = packets.SubackGrantedQoS0
break
case 1:
subAckByte = packets.SubackGrantedQoS1
break
case 2:
subAckByte = packets.SubackGrantedQoS2
break
default:
subAckByte = packets.SubackUnspecifiederror
}
Expand Down
39 changes: 22 additions & 17 deletions lib/mqtt/server_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package mqtt

import (
"errors"
"github.com/c16a/hermes/lib/auth"
"github.com/c16a/hermes/lib/config"
"github.com/c16a/hermes/lib/persistence"
"github.com/eclipse/paho.golang/packets"
"go.uber.org/zap"
"io"
"io/ioutil"
"reflect"
"sync"
"testing"

"github.com/c16a/hermes/lib/auth"
"github.com/c16a/hermes/lib/config"
"github.com/c16a/hermes/lib/persistence"
"github.com/eclipse/paho.golang/packets"
"go.uber.org/zap"
)

func TestNewServerContext(t *testing.T) {
Expand Down Expand Up @@ -445,9 +446,10 @@ func TestServerContext_Subscribe(t *testing.T) {
args{
ioutil.Discard,
&packets.Subscribe{
Subscriptions: map[string]packets.SubOptions{
"foo": {
QoS: 0,
Subscriptions: []packets.SubOptions{
{
Topic: "foo",
QoS: 0,
},
},
},
Expand Down Expand Up @@ -478,9 +480,10 @@ func TestServerContext_Subscribe(t *testing.T) {
args{
ioutil.Discard,
&packets.Subscribe{
Subscriptions: map[string]packets.SubOptions{
"foo": {
QoS: 1,
Subscriptions: []packets.SubOptions{
{
Topic: "foo",
QoS: 1,
},
},
},
Expand Down Expand Up @@ -511,9 +514,10 @@ func TestServerContext_Subscribe(t *testing.T) {
args{
ioutil.Discard,
&packets.Subscribe{
Subscriptions: map[string]packets.SubOptions{
"foo": {
QoS: 2,
Subscriptions: []packets.SubOptions{
{
Topic: "foo",
QoS: 2,
},
},
},
Expand Down Expand Up @@ -544,9 +548,10 @@ func TestServerContext_Subscribe(t *testing.T) {
args{
ioutil.Discard,
&packets.Subscribe{
Subscriptions: map[string]packets.SubOptions{
"foo": {
QoS: 2,
Subscriptions: []packets.SubOptions{
{
Topic: "foo",
QoS: 2,
},
},
},
Expand Down