integrated new host recognition algorithm with Musique

This commit is contained in:
Robert Bendun 2022-12-16 02:11:43 +01:00
parent c49f7ade65
commit 40ef949dbe
6 changed files with 127 additions and 138 deletions

View File

@ -334,6 +334,13 @@ static Result<bool> handle_repl_session_commands(std::string_view input, Runner
return std::nullopt; return std::nullopt;
} }
}, },
Command {
"remotes",
+[](Runner&, std::optional<std::string_view>) -> std::optional<Error> {
ListKnownRemotes();
return std::nullopt;
}
}
}; };
if (input.starts_with('!')) { if (input.starts_with('!')) {

View File

@ -2,7 +2,7 @@ Release_Obj=$(addprefix bin/$(os)/,$(Obj))
Server=bin/$(os)/server/server.h bin/$(os)/server/server.o Server=bin/$(os)/server/server.h bin/$(os)/server/server.o
$(Server) &: server/*.go $(Server) &: server/*.go server/**/*.go
cd server/; GOOS="$(GOOS)" GOARCH="$(GOARCH)" CGO_ENABLED=1 CC="$(CC)" \ cd server/; GOOS="$(GOOS)" GOARCH="$(GOARCH)" CGO_ENABLED=1 CC="$(CC)" \
go build -o ../bin/$(os)/server/server.o -buildmode=c-archive go build -o ../bin/$(os)/server/server.o -buildmode=c-archive

View File

@ -2,13 +2,13 @@ package main
import ( import (
"bufio" "bufio"
"encoding/json"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"log" "log"
"musique/server/proto" "musique/server/proto"
"musique/server/scan" "musique/server/scan"
"musique/server/router"
"net" "net"
"os" "os"
"strings" "strings"
@ -136,22 +136,15 @@ func notifyAll(clients []client) <-chan time.Time {
return startDeadline return startDeadline
} }
func handleIncoming(incoming net.Conn) { func registerRoutes(r *router.Router) {
defer incoming.Close() r.Add("handshake", func(incoming net.Conn, request proto.Request) interface{} {
request := proto.Request{}
json.NewDecoder(incoming).Decode(&request)
log.Printf("%s: %+v\n", incoming.RemoteAddr(), request)
if request.Type == "handshake" {
var response proto.HandshakeResponse var response proto.HandshakeResponse
response.Version = proto.Version response.Version = proto.Version
response.Nick = nick response.Nick = nick
json.NewEncoder(incoming).Encode(response) return response
return })
}
if request.Type == "hosts" { r.Add("hosts", func(incoming net.Conn, request proto.Request) interface{} {
var response proto.HostsResponse var response proto.HostsResponse
for _, remote := range remotes { for _, remote := range remotes {
response.Hosts = append(response.Hosts, proto.HostsResponseEntry{ response.Hosts = append(response.Hosts, proto.HostsResponseEntry{
@ -160,43 +153,18 @@ func handleIncoming(incoming net.Conn) {
Address: remote.Address, Address: remote.Address,
}) })
} }
json.NewEncoder(incoming).Encode(response) return response
return })
}
if request.Type == "synchronize-hosts" { r.Add("synchronize-hosts", func(incoming net.Conn, request proto.Request) interface{} {
response := synchronizeHosts(request.HostsResponse) return synchronizeHosts(request.HostsResponse)
json.NewEncoder(incoming).Encode(response) })
return
}
if request.Type == "synchronize-hosts-with-remotes" {
r.Add("synchronize-hosts-with-remotes", func(incoming net.Conn, request proto.Request) interface{} {
synchronizeHostsWithRemotes() synchronizeHostsWithRemotes()
return return nil
} })
}
func runCommandServer(port uint16) <-chan struct{} {
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", baseIP, port))
if err != nil {
log.Fatalln(err)
}
exit := make(chan struct{})
go func() {
defer listener.Close()
defer close(exit)
for {
incoming, err := listener.Accept()
if err != nil {
log.Fatal(err)
}
go handleIncoming(incoming)
}
}()
return exit
} }
type Remote struct { type Remote struct {
@ -206,9 +174,9 @@ type Remote struct {
} }
var ( var (
baseIP string baseIP string = ""
nick string nick string
port int port int = 8888
remotes map[string]Remote remotes map[string]Remote
) )
@ -309,6 +277,28 @@ func synchronizeHostsWithRemotes() {
} }
} }
func registerRemotes() error {
networks, err := scan.AvailableNetworks()
if err != nil {
return err
}
hosts := scan.TCPHosts(networks, []uint16{8081, 8082, 8083, 8084})
remotes = make(map[string]Remote)
for host := range hosts {
if !isThisMyAddress(host.Address) {
remotes[host.Address] = Remote{
Address: host.Address,
Nick: host.Nick,
Version: host.Version,
}
}
}
return nil
}
func main() { func main() {
var ( var (
logsPath string logsPath string
@ -334,24 +324,15 @@ func main() {
log.Fatalln("Please provide nick via --nick flag") log.Fatalln("Please provide nick via --nick flag")
} }
exit := runCommandServer(uint16(port)) r := router.Router{}
registerRoutes(&r)
networks, err := scan.AvailableNetworks() exit, err := r.Run(baseIP, uint16(port))
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
hosts := scan.TCPHosts(networks, []uint16{8081, 8082, 8083, 8084}) if err := registerRemotes(); err != nil {
log.Fatalln(err)
remotes = make(map[string]Remote)
for host := range hosts {
if !isThisMyAddress(host.Address) {
remotes[host.Address] = Remote{
Address: host.Address,
Nick: host.Nick,
Version: host.Version,
}
}
} }
for range exit { for range exit {

View File

@ -2,69 +2,51 @@ package main
import ( import (
"C" "C"
"bufio"
"fmt" "fmt"
"net" "log"
"strings" "musique/server/router"
"sync" "os"
"time" "sort"
) )
var clients []client
var pinger chan struct{}
//export ServerInit //export ServerInit
func ServerInit() { func ServerInit() {
// scanResult = scan() r := router.Router{}
pinger = make(chan struct{}, 100) registerRoutes(&r)
_, err := r.Run(baseIP, uint16(port))
waitForConnection := sync.WaitGroup{}
waitForConnection.Add(1)
go func() {
l, err := net.Listen("tcp", ":8081")
if err != nil { if err != nil {
return log.Fatalln(err)
}
defer l.Close()
waitForConnection.Done()
for {
conn, err := l.Accept()
if err != nil {
continue
}
go func(c net.Conn) {
defer c.Close()
s := bufio.NewScanner(c)
for s.Scan() {
response := s.Text()
if response == "time" {
fmt.Fprintln(conn, time.Now().UnixMilli())
continue
} }
if strings.HasPrefix(response, "start") { if err := registerRemotes(); err != nil {
startTimeString := strings.TrimSpace(response[len("start"):]) log.Fatalln(err)
startTime := int64(0)
fmt.Sscanf(startTimeString, "%d", &startTime)
time.Sleep(time.Duration(startTime) * time.Millisecond)
pinger <- struct{}{}
continue
} }
} }
}(conn)
}
}()
waitForConnection.Wait()
scanResult := []string{} // scan()
clients = timesync(scanResult)
}
//export ServerBeginProtocol //export ServerBeginProtocol
func ServerBeginProtocol() { func ServerBeginProtocol() {
self := notifyAll(clients)
select {
case <-self:
case <-pinger:
} }
//export ListKnownRemotes
func ListKnownRemotes() {
type nickAddr struct {
nick, addr string
}
list := []nickAddr{}
for _, remote := range remotes {
list = append(list, nickAddr { remote.Nick, remote.Address})
}
sort.Slice(list, func (i, j int) bool {
if list[i].nick == list[j].nick {
return list[i].addr < list[j].addr
}
return list[i].nick < list[j].nick
})
for _, nickAddr := range list {
fmt.Printf("%s@%s\n", nickAddr.nick, nickAddr.addr)
}
os.Stdout.Sync()
} }

View File

@ -2,6 +2,7 @@ package proto
import ( import (
"encoding/json" "encoding/json"
"errors"
"net" "net"
"time" "time"
) )
@ -21,15 +22,31 @@ func Command(target string, request interface{}, response interface{}) error {
} }
func CommandTimeout(target string, request interface{}, response interface{}, timeout time.Duration) error { func CommandTimeout(target string, request interface{}, response interface{}, timeout time.Duration) error {
responseChan := make(chan interface{})
errorChan := make(chan error)
go func() {
conn, err := net.DialTimeout("tcp", target, timeout) conn, err := net.DialTimeout("tcp", target, timeout)
if err != nil { if err != nil {
return err errorChan <- err
return
} }
defer conn.Close() defer conn.Close()
if err = json.NewEncoder(conn).Encode(request); err != nil { if err = json.NewEncoder(conn).Encode(request); err != nil {
return err errorChan <- err
return
} }
return json.NewDecoder(conn).Decode(response) responseChan <- json.NewDecoder(conn).Decode(response)
}()
select {
case response = <-responseChan:
return nil
case err := <-errorChan:
return err
case <-time.After(timeout):
return errors.New("timout")
}
} }

View File

@ -15,12 +15,13 @@ type Network struct {
const timeoutTCPHosts = time.Duration(1) * time.Second const timeoutTCPHosts = time.Duration(1) * time.Second
func nextIP(ip net.IP) net.IP { func nextIP(ip net.IP) (next net.IP) {
// FIXME Proper next IP address in network calculation // FIXME Proper next IP address in network calculation
next := make([]byte, 4) next = make([]byte, 4)
copy(next, ip) bytes := []byte(ip)
next[3]++ bytes = bytes[len(bytes)-4:]
return next next[0], next[1], next[2], next[3] = bytes[0], bytes[1], bytes[2], bytes[3]+1
return
} }
// AvailableNetworks returns all IPv4 networks that are available to the host // AvailableNetworks returns all IPv4 networks that are available to the host
@ -38,10 +39,12 @@ func AvailableNetworks() ([]Network, error) {
continue continue
} }
if ipNet.IP.IsGlobalUnicast() && ipNet.IP.To4() != nil { if ipNet.IP.IsGlobalUnicast() {
if ip := ipNet.IP.To4(); ip != nil {
// FIXME We assume mask /24. This is a reasonable assumption performance wise // FIXME We assume mask /24. This is a reasonable assumption performance wise
// but may lead to inability to recognize some of the host in network // but may lead to inability to recognize some of the host in network
networks = append(networks, Network{nextIP(ipNet.IP).String(), 254}) networks = append(networks, Network{nextIP(ipNet.IP).String(), 253})
}
} }
} }
@ -68,7 +71,6 @@ func TCPHosts(networks []Network, ports []uint16) <-chan Response {
defer wg.Done() defer wg.Done()
target := fmt.Sprintf("%s:%d", ip, port) target := fmt.Sprintf("%s:%d", ip, port)
var hs proto.HandshakeResponse var hs proto.HandshakeResponse
err := proto.CommandTimeout(target, proto.Handshake(), &hs, timeoutTCPHosts) err := proto.CommandTimeout(target, proto.Handshake(), &hs, timeoutTCPHosts)
if err == nil { if err == nil {
ips <- Response{hs, target} ips <- Response{hs, target}