Compare commits

...

19 Commits

Author SHA1 Message Date
Mateusz Piątkowski
f9bf8fa6b1 byte calculation 2022-12-17 17:50:34 +01:00
Mateusz Piątkowski
d4ef97c0c2 del unused pieces 2022-12-17 17:02:58 +01:00
Robert Bendun
c23eae555c musique<->server integration 2022-12-16 16:34:52 +01:00
Robert Bendun
ffc65e0f06 Musique configuration file 2022-12-16 11:38:05 +01:00
Robert Bendun
40ef949dbe integrated new host recognition algorithm with Musique 2022-12-16 02:11:43 +01:00
Mateusz Piątkowski
c49f7ade65 Known hosts knowladge sharing algorithm; nicks and ports 2022-12-15 00:28:36 +01:00
Robert Bendun
67c688d772 cleanup of server directory structure 2022-12-14 17:49:14 +01:00
Robert Bendun
5409aac138 server needs to be before connection 2022-12-09 16:12:33 +01:00
Robert Bendun
6bbb296bb2 Musique-server integration 2022-12-09 16:05:25 +01:00
Mateusz Piątkowski
5e67bf9a2f fix 2022-12-09 15:48:51 +01:00
Mateusz Piątkowski
32765f9e2d fix 2022-12-09 15:34:55 +01:00
Mateusz Piątkowski
c4098de5b4 timesync 2022-12-09 15:04:41 +01:00
Robert Bendun
a56731085d cross platform build of server and Musique 2022-12-09 12:22:50 +01:00
Robert Bendun
3f0014bb2c fix timesync; concurrent scan 2022-12-06 10:20:10 +01:00
Mateusz Piątkowski
3f82212d27 scanner le fix hardcoded ips 2022-12-05 22:54:46 +01:00
Robert Bendun
df68d8fa9d Bodged Musique & server integration 2022-12-05 22:34:47 +01:00
Mateusz Piątkowski
14cddb8051 timesync wip 2022-12-05 21:50:30 +01:00
Mateusz Piątkowski
5eae7b3019 scanner fix, minor changes 2022-11-29 21:15:39 +01:00
Mateusz Piątkowski
80830ac363 Migrated from https://git.wmi.amu.edu.pl/s416496/go-http-server 2022-11-23 22:29:10 +01:00
29 changed files with 1074 additions and 16 deletions

View File

@ -5,4 +5,4 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt update && apt upgrade -y && apt install -y build-essential software-properties-common zip unzip
RUN add-apt-repository ppa:ubuntu-toolchain-r/test
RUN apt install -y gcc-11 g++-11 libasound2-dev
RUN apt install -y gcc-11 g++-11 libasound2-dev golang

View File

@ -48,3 +48,4 @@ doc/wprowadzenie.html: doc/wprowadzenie.md
$(shell mkdir -p $(subst musique/,bin/$(os)/,$(shell find musique/* -type d)))
$(shell mkdir -p $(subst musique/,bin/$(os)/debug/,$(shell find musique/* -type d)))
$(shell mkdir -p bin/$(os)/server/)

View File

@ -1,16 +1,17 @@
MAKEFLAGS="-j $(grep -c ^processor /proc/cpuinfo)"
CXXFLAGS:=$(CXXFLAGS) -std=c++20 -Wall -Wextra -Werror=switch -Werror=return-type -Werror=unused-result
CPPFLAGS:=$(CPPFLAGS) -Ilib/expected/ -I. -Ilib/bestline/ -Ilib/rtmidi/
LDFLAGS=-flto
LDLIBS= -lpthread
RELEASE_FLAGS=-O2
DEBUG_FLAGS=-O0 -ggdb -fsanitize=undefined -DDebug
ifeq ($(shell uname),Darwin)
os=macos
else
os=linux
endif
CXXFLAGS:=$(CXXFLAGS) -std=c++20 -Wall -Wextra -Werror=switch -Werror=return-type -Werror=unused-result
CPPFLAGS:=$(CPPFLAGS) -Ilib/expected/ -I. -Ilib/bestline/ -Ilib/rtmidi/ -Ibin/$(os)/server/
LDFLAGS=-flto
LDLIBS= -lpthread
RELEASE_FLAGS=-O2
DEBUG_FLAGS=-O0 -ggdb -fsanitize=undefined -DDebug

1
lib/midi Submodule

@ -0,0 +1 @@
Subproject commit f42b663f0d08fc629c7deb26cd32ee06fba76d83

101
musique/config.cc Normal file
View File

@ -0,0 +1,101 @@
#include <fstream>
#include <iterator>
#include <musique/config.hh>
#include <musique/errors.hh>
#include <musique/platform.hh>
#include <iostream>
#ifdef MUSIQUE_WINDOWS
#include <shlobj.h>
#include <knownfolders.h>
#include <cstring>
#endif
// FIXME Move trim to some header
extern void trim(std::string_view &s);
// FIXME Report errors when parsing fails
config::Sections config::from_file(std::string const& path)
{
Sections sections;
Key_Value *kv = &sections[""];
std::ifstream in(path);
for (std::string linebuf; std::getline(in, linebuf); ) {
std::string_view line = linebuf;
trim(line);
if (auto comment = line.find('#'); comment != std::string_view::npos) {
line = line.substr(0, comment);
}
if (line.starts_with('[') && line.ends_with(']')) {
kv = &sections[std::string(line.begin()+1, line.end()-1)];
continue;
}
if (auto split = line.find('='); split != std::string_view::npos) {
auto key = line.substr(0, split);
auto val = line.substr(split+1);
trim(key);
trim(val);
(*kv)[std::string(key)] = std::string(val);
}
}
return sections;
}
void config::to_file(std::string const& path, Sections const& sections)
{
std::ofstream out(path, std::ios_base::trunc | std::ios_base::out);
if (!out.is_open()) {
std::cout << "Failed to save configuration at " << path << std::endl;
return;
}
for (auto const& [section, kv] : sections) {
out << '[' << section << "]\n";
for (auto const& [key, val] : kv) {
out << key << " = " << val << '\n';
}
}
}
std::string config::location()
{
#ifdef MUSIQUE_LINUX
if (auto config_dir = getenv("XDG_CONFIG_HOME")) {
return config_dir + std::string("/musique.conf");
} else if (auto home_dir = getenv("HOME")) {
return std::string(home_dir) + "/.config/musique.conf";
} else {
unimplemented("Neither XDG_CONFIG_HOME nor HOME enviroment variable are defined");
}
#endif
#ifdef MUSIQUE_WINDOWS
wchar_t *resultPath = nullptr;
SHGetKnownFolderPath(FOLDERID_RoamingAppData, 0, nullptr, &resultPath);
auto const len = std::strlen((char*)resultPath);
if (!resultPath && len < 3) {
auto drive = getenv("HOMEDRIVE");
auto path = getenv("HOMEPATH");
ensure(drive && path, "Windows failed to provide HOMEDRIVE & HOMEPATH variables");
return std::string(drive) + std::string(path) + "\\Documents\\musique.conf";
}
return std::string((char*)resultPath, len) + "\\musique.conf";
#endif
#ifdef MUSIQUE_DARWIN
if (auto home_dir = getenv("HOME")) {
return std::string(home_dir) + "/Library/Application Support/musique.conf";
} else {
unimplemented("HOME enviroment variable is not defined");
}
#endif
unimplemented();
}

20
musique/config.hh Normal file
View File

@ -0,0 +1,20 @@
#ifndef MUSIQUE_CONFIG_HH
#define MUSIQUE_CONFIG_HH
#include <optional>
#include <string>
#include <unordered_map>
// Parses INI files
namespace config
{
using Key_Value = std::unordered_map<std::string, std::string>;
using Sections = std::unordered_map<std::string, Key_Value>;
Sections from_file(std::string const& path);
void to_file(std::string const& path, Sections const& sections);
std::string location();
}
#endif // MUSIQUE_CONFIG_HH

View File

@ -4,6 +4,8 @@
#include <musique/interpreter/interpreter.hh>
#include <musique/try.hh>
#include <server.h>
#include <random>
#include <memory>
#include <iostream>
@ -1109,6 +1111,25 @@ static Result<Value> builtin_call(Interpreter &i, std::vector<Value> args)
return callable(i, std::move(args));
}
static Result<Value> builtin_start(Interpreter &interpreter, std::span<Ast> args)
{
Value ret{};
auto begin = std::chrono::steady_clock::now();
ServerBeginProtocol();
for (auto const& ast : args) {
ret = Try(interpreter.eval((Ast)ast));
}
auto end = std::chrono::steady_clock::now();
std::cout << "Start took " << std::chrono::duration_cast<std::chrono::duration<float>>(end - begin) << "s" << std::endl;
return ret;
}
void Interpreter::register_builtin_functions()
{
auto &global = *Env::global;
@ -1152,6 +1173,7 @@ void Interpreter::register_builtin_functions()
global.force_define("shuffle", builtin_shuffle);
global.force_define("sim", builtin_sim);
global.force_define("sort", builtin_sort);
global.force_define("start", builtin_start);
global.force_define("try", builtin_try);
global.force_define("typeof", builtin_typeof);
global.force_define("uniq", builtin_uniq);

View File

@ -17,6 +17,9 @@
#include <musique/try.hh>
#include <musique/unicode.hh>
#include <musique/value/block.hh>
#include <musique/config.hh>
#include <server.h>
#ifdef _WIN32
extern "C" {
@ -35,6 +38,8 @@ static bool quiet_mode = false;
static bool ast_only_mode = false;
static bool enable_repl = false;
static unsigned repl_line_number = 1;
static std::string nick;
static int port;
#define Ignore(Call) do { auto const ignore_ ## __LINE__ = (Call); (void) ignore_ ## __LINE__; } while(0)
@ -101,11 +106,13 @@ void print_repl_help()
":!<command> - allows for execution of any shell command\n"
":clear - clears screen\n"
":load <file> - loads file into Musique session\n"
":discover - tries to discover new remotes\n"
":remotes - list all known remotes\n"
;
}
/// Trim spaces from left an right
static void trim(std::string_view &s)
void trim(std::string_view &s)
{
// left trim
if (auto const i = std::find_if_not(s.begin(), s.end(), unicode::is_space); i != s.begin()) {
@ -171,6 +178,11 @@ struct Runner
ensure(the == nullptr, "Only one instance of runner is supported");
the = this;
GoInt goport = port;
GoString gonick { .p = nick.data(), .n = ptrdiff_t(nick.size()) };
ServerInit(gonick, goport);
interpreter.midi_connection = &midi;
if (output_port) {
midi.connect_output(*output_port);
@ -197,6 +209,16 @@ struct Runner
std::cout << std::endl;
return {};
});
Env::global->force_define("timeout", +[](Interpreter&, std::vector<Value> args) -> Result<Value> {
if (auto a = match<Number>(args); a) {
auto [timeout] = *a;
auto gotimeout = GoInt64((timeout * Number(1000)).floor().as_int());
SetTimeout(gotimeout);
return Value{};
}
unimplemented();
});
}
Runner(Runner const&) = delete;
@ -330,6 +352,21 @@ static Result<bool> handle_repl_session_commands(std::string_view input, Runner
return std::nullopt;
}
},
Command {
"remotes",
+[](Runner&, std::optional<std::string_view>) -> std::optional<Error> {
ListKnownRemotes();
return std::nullopt;
}
},
Command {
"discover",
+[](Runner&, std::optional<std::string_view>) -> std::optional<Error> {
Discover();
ListKnownRemotes();
return std::nullopt;
}
},
};
if (input.starts_with('!')) {
@ -441,6 +478,37 @@ static std::optional<Error> Main(std::span<char const*> args)
std::exit(1);
}
bool save_new_config = false;
// TODO Write configuration
// TODO Nicer configuration interface (maybe paths?)
auto config = config::from_file(config::location());
if (config.contains("net") && config["net"].contains("nick")) {
nick = config["net"]["nick"];
} else {
std::cout << "Please enter your nick: ";
std::getline(std::cin, nick);
auto nick_view = std::string_view(nick);
trim(nick_view); // FIXME Trim in place
nick = nick_view;
config["net"]["nick"] = nick;
save_new_config = true;
}
if (config.contains("net") && config["net"].contains("port")) {
auto const& port_str = config["net"]["port"];
// FIXME Handle port number parsing errors
std::from_chars(port_str.data(), port_str.data() + port_str.size(), port);
} else {
port = 8081;
config["net"]["port"] = std::to_string(port);
save_new_config = true;
}
if (save_new_config) {
config::to_file(config::location(), config);
}
Runner runner{output_port};
for (auto const& [type, argument] : runnables) {

22
musique/platform.hh Normal file
View File

@ -0,0 +1,22 @@
#ifndef MUSIQUE_PLATFORM_HH
#define MUSIQUE_PLATFORM_HH
enum class Platform
{
Darwin,
Linux,
Windows,
};
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__)
static constexpr Platform Current_Platform = Platform::Windows;
#define MUSIQUE_WINDOWS
#elif __APPLE__
static constexpr Platform Current_Platform = Platform::Darwin;
#define MUSIQUE_DARWIN
#else
static constexpr Platform Current_Platform = Platform::Linux;
#define MUSIQUE_LINUX
#endif
#endif // MUSIQUE_PLATFORM_HH

View File

@ -1,16 +1,22 @@
Release_Obj=$(addprefix bin/$(os)/,$(Obj))
Server=bin/$(os)/server/server.h bin/$(os)/server/server.o
$(Server) &: server/*.go server/**/*.go
cd server/; GOOS="$(GOOS)" GOARCH="$(GOARCH)" CGO_ENABLED=1 CC="$(CC)" \
go build -o ../bin/$(os)/server/server.o -buildmode=c-archive
bin/$(os)/bestline.o: lib/bestline/bestline.c lib/bestline/bestline.h
@echo "CC $@"
@$(CC) $< -c -O3 -o $@
bin/$(os)/%.o: musique/%.cc
bin/$(os)/%.o: musique/%.cc $(Server)
@echo "CXX $@"
@$(CXX) $(CXXFLAGS) $(RELEASE_FLAGS) $(CPPFLAGS) -o $@ $< -c
bin/$(os)/$(Target): $(Release_Obj) bin/$(os)/main.o bin/$(os)/rtmidi.o $(Bestline)
bin/$(os)/$(Target): $(Release_Obj) bin/$(os)/main.o bin/$(os)/rtmidi.o $(Bestline) $(Server)
@echo "CXX $@"
@$(CXX) $(CXXFLAGS) $(RELEASE_FLAGS) $(CPPFLAGS) -o $@ $(Release_Obj) bin/$(os)/rtmidi.o $(Bestline) $(LDFLAGS) $(LDLIBS)
@$(CXX) $(CXXFLAGS) $(RELEASE_FLAGS) $(CPPFLAGS) -o $@ $(Release_Obj) bin/$(os)/rtmidi.o $(Bestline) $(LDFLAGS) $(LDLIBS) bin/$(os)/server/server.o
Debug_Obj=$(addprefix bin/$(os)/debug/,$(Obj))

View File

@ -4,3 +4,5 @@ CPPFLAGS:=$(CPPFLAGS) -D __LINUX_ALSA__
LDLIBS:=-lasound $(LDLIBS) -static-libgcc -static-libstdc++
Bestline=bin/$(os)/bestline.o
Target=musique
GOOS=linux
GOARCH=amd64

View File

@ -5,3 +5,5 @@ LDLIBS:=-framework CoreMIDI -framework CoreAudio -framework CoreFoundation $(LDL
Release_Obj=$(addprefix bin/,$(Obj))
Bestline=bin/$(os)/bestline.o
Target=musique
GOOS=darwin
GOARCH=amd64

View File

@ -19,7 +19,9 @@ mkdir -p "$Target"
if [[ "$(docker images -q "$Image")" == "" ]]; then
docker build -t "$Image" .
fi
sudo rm -rf bin/
# make os=linux CC=gcc CXX=g++ >/dev/null
docker run -it --rm -v "$(pwd):/musique" -w /musique "$Image" make os=linux CC=gcc-11 CXX=g++-11 >/dev/null
cp bin/musique "$Target"/musique-x86_64-linux

View File

@ -1,5 +1,7 @@
CC=i686-w64-mingw32-gcc
CXX=i686-w64-mingw32-g++
CC=x86_64-w64-mingw32-cc
CXX=x86_64-w64-mingw32-c++
CPPFLAGS:=$(CPPFLAGS) -D__WINDOWS_MM__
LDLIBS:=-lwinmm $(LDLIBS) -static-libgcc -static-libstdc++ -static
LDLIBS:=-lwinmm -luuid $(LDLIBS) -static-libgcc -static-libstdc++ -static
Target=musique.exe
GOOS=windows
GOARCH=amd64

2
server/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
test*.sh
server

35
server/README.md Normal file
View File

@ -0,0 +1,35 @@
# Server
## Development notes
### Testing server list synchronisation (2022-12-14)
For ease of testing you can launch N instances of server in N tmux panes.
```console
$ while true; do sleep {n}; echo "======="; ./server -nick {nick} -port {port}; done
```
where `n` is increasing for each server to ensure that initial scan would not cover entire task of network scanning.
Next you can use this script to test:
```bash
go build
killall server
sleep 4
# Repeat line below for all N servers
echo '{"version":1, "type":"hosts"}' | nc localhost {port} | jq
# Choose one or few that will request synchronization with their remotes
echo '{"version":1, "type":"synchronize-hosts-with-remotes"}' | nc localhost {port} | jq
# Ensure that all synchronisation propagated with enough sleep time
sleep 2
# Repeat line below for all N servers
echo '{"version":1, "type":"hosts"}' | nc localhost {port} | jq
```

3
server/go.mod Normal file
View File

@ -0,0 +1,3 @@
module musique/server
go 1.19

23
server/go.sum Normal file
View File

@ -0,0 +1,23 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/schollz/progressbar v1.0.0 h1:gbyFReLHDkZo8mxy/dLWMr+Mpb1MokGJ1FqCiqacjZM=
github.com/schollz/progressbar v1.0.0/go.mod h1:/l9I7PC3L3erOuz54ghIRKUEFcosiWfLvJv+Eq26UMs=
github.com/schollz/progressbar/v3 v3.12.2 h1:yLqqqpQNMxGxHY8uEshRihaHWwa0rf0yb7/Zrpgq2C0=
github.com/schollz/progressbar/v3 v3.12.2/go.mod h1:HFJYIYQQJX32UJdyoigUl19xoV6aMwZt6iX/C30RWfg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=

40
server/handlers.go Normal file
View File

@ -0,0 +1,40 @@
package main
import (
"fmt"
"net/http"
)
func cmdHandler(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/cmd" {
http.Error(w, "404 not found.", http.StatusNotFound)
return
}
if r.Method != "POST" {
http.Error(w, "Method is not supported.", http.StatusNotFound)
return
}
if err := r.ParseForm(); err != nil {
fmt.Fprintf(w, "ParseForm() err: %v", err)
return
}
fmt.Fprintf(w, "POST request successful\n")
cmd := r.FormValue("cmd")
fmt.Fprintf(w, "Command = %s\n", cmd)
}
func helloHandler(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/hello" {
http.Error(w, "404 not found.", http.StatusNotFound)
return
}
if r.Method != "GET" {
http.Error(w, "Method is not supported.", http.StatusNotFound)
return
}
fmt.Fprintf(w, "Hello!")
}

336
server/main.go Normal file
View File

@ -0,0 +1,336 @@
package main
import (
"errors"
"flag"
"fmt"
"log"
"musique/server/proto"
"musique/server/router"
"musique/server/scan"
"net"
"os"
"strings"
"sync"
"time"
)
func scanError(scanResult []string, conn net.Conn) {
if scanResult == nil {
conn.Write([]byte("Empty scan result, run 'scan' first.\n"))
}
}
type TimeExchange struct {
before, after, remote int64
}
func (e *TimeExchange) estimateFor(host string) bool {
e.before = time.Now().UnixMilli()
var response proto.TimeResponse
if err := proto.Command(host, proto.Time(), &response); err != nil {
log.Printf("failed to send time command: %v\n", err)
return false
}
e.after = time.Now().UnixMilli()
return true
}
func timesync() {
wg := sync.WaitGroup{}
wg.Add(len(remotes))
type response struct {
TimeExchange
key string
}
// Gather time from each host
responses := make(chan response, len(remotes))
for key, remote := range remotes {
key, remote := key, remote
go func() {
defer wg.Done()
exchange := TimeExchange{}
if exchange.estimateFor(remote.Address) {
responses <- response{exchange, key}
}
}()
}
wg.Wait()
close(responses)
for response := range responses {
remote := remotes[response.key]
remote.TimeExchange = response.TimeExchange
}
}
var maxReactionTime = int64(1_000)
func isThisMyAddress(address string) bool {
addrs, err := net.InterfaceAddrs()
if err != nil {
return false
}
for _, addr := range addrs {
ip, _, err := net.ParseCIDR(addr.String())
if err == nil && ip.To4() != nil && fmt.Sprintf("%s:%d", ip, port) == address {
return true
}
}
return false
}
func notifyAll() <-chan time.Time {
wg := sync.WaitGroup{}
wg.Add(len(remotes))
startDeadline := time.After(time.Duration(maxReactionTime) * time.Millisecond)
for _, remote := range remotes {
remote := remote
go func() {
startTime := maxReactionTime - (remote.after-remote.before)/2
var response proto.StartResponse
err := proto.CommandTimeout(
remote.Address,
proto.Start(startTime),
&response,
time.Duration(startTime)*time.Millisecond,
)
if err != nil {
log.Printf("failed to notify %s: %v\n", remote.Address, err)
}
wg.Done()
}()
}
return startDeadline
}
var (
pinger chan struct{}
)
func registerRoutes(r *router.Router) {
r.Add("handshake", func(incoming net.Conn, request proto.Request) interface{} {
var response proto.HandshakeResponse
response.Version = proto.Version
response.Nick = nick
return response
})
r.Add("hosts", func(incoming net.Conn, request proto.Request) interface{} {
var response proto.HostsResponse
for _, remote := range remotes {
response.Hosts = append(response.Hosts, proto.HostsResponseEntry{
Nick: remote.Nick,
Version: remote.Version,
Address: remote.Address,
})
}
return response
})
r.Add("synchronize-hosts", func(incoming net.Conn, request proto.Request) interface{} {
return synchronizeHosts(request.HostsResponse)
})
r.Add("synchronize-hosts-with-remotes", func(incoming net.Conn, request proto.Request) interface{} {
synchronizeHostsWithRemotes()
return nil
})
r.Add("time", func(incoming net.Conn, request proto.Request) interface{} {
return proto.TimeResponse{
Time: time.Now().UnixMilli(),
}
})
pinger = make(chan struct{}, 12)
r.Add("start", func(incoming net.Conn, request proto.Request) interface{} {
go func() {
time.Sleep(time.Duration(request.StartTime) * time.Millisecond)
pinger <- struct{}{}
}()
return proto.StartResponse{
Succeded: true,
}
})
}
type Remote struct {
TimeExchange
Address string
Nick string
Version string
}
var (
baseIP string = ""
nick string
port int = 8888
remotes map[string]*Remote
)
func synchronizeHosts(incoming proto.HostsResponse) (response proto.HostsResponse) {
visitedHosts := make(map[string]struct{})
// Add all hosts that are in incoming to our list of remotes
// Additionaly build set of all hosts that remote knows
for _, incomingHost := range incoming.Hosts {
if _, ok := remotes[incomingHost.Address]; !ok && !isThisMyAddress(incomingHost.Address) {
remotes[incomingHost.Address] = &Remote{
Address: incomingHost.Address,
Nick: incomingHost.Nick,
Version: incomingHost.Version,
}
}
visitedHosts[incomingHost.Address] = struct{}{}
}
// Build list of hosts that incoming doesn't know
for _, remote := range remotes {
if _, ok := visitedHosts[remote.Address]; !ok {
response.Hosts = append(response.Hosts, proto.HostsResponseEntry{
Address: remote.Address,
Version: remote.Version,
Nick: remote.Nick,
})
}
}
return
}
func myAddressInTheSameNetwork(remote string) (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", fmt.Errorf("myAddressInTheSameNetwork: %v", err)
}
remoteInParts := strings.Split(remote, ":")
if len(remoteInParts) == 2 {
remote = remoteInParts[0]
}
remoteIP := net.ParseIP(remote)
if remoteIP == nil {
// TODO Hoist error to global variable
return "", errors.New("Cannot parse remote IP")
}
for _, addr := range addrs {
ip, ipNet, err := net.ParseCIDR(addr.String())
if err == nil && ipNet.Contains(remoteIP) {
return ip.String(), nil
}
}
// TODO Hoist error to global variable
return "", errors.New("Cannot find matching IP addr")
}
func synchronizeHostsWithRemotes() {
previousResponseLength := -1
var response proto.HostsResponse
for previousResponseLength != len(response.Hosts) {
response = proto.HostsResponse{}
// Add all known remotes
for _, remote := range remotes {
response.Hosts = append(response.Hosts, proto.HostsResponseEntry{
Address: remote.Address,
Nick: remote.Nick,
Version: remote.Version,
})
}
// Send constructed list to each remote
previousResponseLength = len(response.Hosts)
for _, remote := range response.Hosts {
var localResponse proto.HostsResponse
localResponse.Hosts = make([]proto.HostsResponseEntry, len(response.Hosts))
copy(localResponse.Hosts, response.Hosts)
myAddress, err := myAddressInTheSameNetwork(remote.Address)
// TODO Report when err != nil
if err == nil {
localResponse.Hosts = append(localResponse.Hosts, proto.HostsResponseEntry{
Address: myAddress,
Nick: nick,
Version: proto.Version,
})
}
var remoteResponse proto.HostsResponse
proto.Command(remote.Address, proto.SynchronizeHosts(localResponse), &remoteResponse)
synchronizeHosts(remoteResponse)
}
}
}
func registerRemotes() error {
networks, err := scan.AvailableNetworks()
if err != nil {
return err
}
hosts := scan.TCPHosts(networks, []uint16{8081, 8082, 8083, 8084})
remotes = make(map[string]*Remote)
for host := range hosts {
if !isThisMyAddress(host.Address) {
remotes[host.Address] = &Remote{
Address: host.Address,
Nick: host.Nick,
Version: host.Version,
}
}
}
return nil
}
func main() {
var (
logsPath string
)
flag.StringVar(&baseIP, "ip", "", "IP where server will listen")
flag.StringVar(&nick, "nick", "", "Name that is going to be used to recognize this server")
flag.IntVar(&port, "port", 8081, "TCP port where server receives connections")
flag.StringVar(&logsPath, "logs", "", "Target file for logs from server. By default stdout")
flag.Parse()
if len(logsPath) != 0 {
// TODO Is defer logFile.Close() needed here? Dunno
logFile, err := os.OpenFile(logsPath, os.O_WRONLY|os.O_APPEND, 0o640)
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot open log file: %v\n", err)
os.Exit(1)
}
log.SetOutput(logFile)
}
if len(nick) == 0 {
log.Fatalln("Please provide nick via --nick flag")
}
r := router.Router{}
registerRoutes(&r)
exit, err := r.Run(baseIP, uint16(port))
if err != nil {
log.Fatalln(err)
}
if err := registerRemotes(); err != nil {
log.Fatalln(err)
}
for range exit {
}
}

83
server/musique-bridge.go Normal file
View File

@ -0,0 +1,83 @@
package main
import (
"C"
"fmt"
"log"
"musique/server/router"
"os"
"sort"
)
//export ServerInit
func ServerInit(inputNick string, inputPort int) {
logFile, err := os.OpenFile("musique.log", os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o640)
if err != nil {
log.Fatalln(err)
}
log.SetOutput(logFile)
nick = inputNick
port = inputPort
r := router.Router{}
registerRoutes(&r)
_, err = r.Run(baseIP, uint16(port))
if err != nil {
log.Fatalln(err)
}
if err := registerRemotes(); err != nil {
log.Fatalln(err)
}
timesync()
}
//export SetTimeout
func SetTimeout(timeout int64) {
maxReactionTime = timeout
}
//export ServerBeginProtocol
func ServerBeginProtocol() {
self := notifyAll()
select {
case <-self:
case <-pinger:
}
}
//export Discover
func Discover() {
if len(remotes) == 0 {
if err := registerRemotes(); err != nil {
log.Println("discover:", err)
}
}
synchronizeHostsWithRemotes()
}
//export ListKnownRemotes
func ListKnownRemotes() {
type nickAddr struct {
nick, addr string
}
list := []nickAddr{}
for _, remote := range remotes {
list = append(list, nickAddr{remote.Nick, remote.Address})
}
sort.Slice(list, func(i, j int) bool {
if list[i].nick == list[j].nick {
return list[i].addr < list[j].addr
}
return list[i].nick < list[j].nick
})
for _, nickAddr := range list {
fmt.Printf("%s@%s\n", nickAddr.nick, nickAddr.addr)
}
os.Stdout.Sync()
}

10
server/proto/basic.go Normal file
View File

@ -0,0 +1,10 @@
package proto
const Version = "1"
type Request struct {
Version string
Type string
HostsResponse
StartTime int64
}

12
server/proto/handshake.go Normal file
View File

@ -0,0 +1,12 @@
package proto
type HandshakeResponse struct {
Nick string
Version string
}
func Handshake() (req Request) {
req.Type = "handshake"
req.Version = Version
return
}

24
server/proto/hosts.go Normal file
View File

@ -0,0 +1,24 @@
package proto
type HostsResponseEntry struct {
Nick string
Address string
Version string
}
type HostsResponse struct {
Hosts []HostsResponseEntry
}
func Hosts() (req Request) {
req.Version = Version
req.Type = "hosts"
return
}
func SynchronizeHosts(response HostsResponse) (req Request) {
req.HostsResponse = response
req.Version = Version
req.Type = "synchronize-hosts"
return
}

52
server/proto/net.go Normal file
View File

@ -0,0 +1,52 @@
package proto
import (
"encoding/json"
"errors"
"net"
"time"
)
func Command(target string, request interface{}, response interface{}) error {
conn, err := net.Dial("tcp", target)
if err != nil {
return err
}
defer conn.Close()
if err = json.NewEncoder(conn).Encode(request); err != nil {
return err
}
return json.NewDecoder(conn).Decode(response)
}
func CommandTimeout(target string, request interface{}, response interface{}, timeout time.Duration) error {
responseChan := make(chan interface{})
errorChan := make(chan error)
go func() {
conn, err := net.DialTimeout("tcp", target, timeout)
if err != nil {
errorChan <- err
return
}
defer conn.Close()
if err = json.NewEncoder(conn).Encode(request); err != nil {
errorChan <- err
return
}
responseChan <- json.NewDecoder(conn).Decode(response)
}()
select {
case response = <-responseChan:
return nil
case err := <-errorChan:
return err
case <-time.After(timeout):
return errors.New("timout")
}
}

12
server/proto/start.go Normal file
View File

@ -0,0 +1,12 @@
package proto
type StartResponse struct {
Succeded bool
}
func Start(startTime int64) (req Request) {
req.Type = "start"
req.Version = Version
req.StartTime = startTime
return
}

11
server/proto/time.go Normal file
View File

@ -0,0 +1,11 @@
package proto
type TimeResponse struct {
Time int64
}
func Time() (req Request) {
req.Type = "time"
req.Version = Version
return
}

67
server/router/router.go Normal file
View File

@ -0,0 +1,67 @@
package router
import (
"encoding/json"
"musique/server/proto"
"net"
"log"
"fmt"
)
type Route func(incoming net.Conn, request proto.Request) interface{}
type Router struct {
routes map[string]Route
port uint16
baseIP string
}
func (router *Router) Add(name string, route Route) {
if router.routes == nil {
router.routes = make(map[string]Route)
}
router.routes[name] = route
}
func (router *Router) Run(ip string, port uint16) (<-chan struct{}, error) {
router.port = port
router.baseIP = ip
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", router.baseIP, router.port))
if err != nil {
return nil, err
}
exit := make(chan struct{})
go func() {
defer listener.Close()
defer close(exit)
for {
incoming, err := listener.Accept()
if err != nil {
log.Println("failed to accept connection:", err)
}
go router.handleIncoming(incoming)
}
}()
return exit, nil
}
func (router *Router) handleIncoming(incoming net.Conn) {
defer incoming.Close()
request := proto.Request{}
json.NewDecoder(incoming).Decode(&request)
log.Printf("%s: %+v\n", incoming.RemoteAddr(), request)
if handler, ok := router.routes[request.Type]; ok {
if response := handler(incoming, request); response != nil {
json.NewEncoder(incoming).Encode(response)
}
} else {
log.Printf("unrecongized route: %v\n", request.Type)
}
}

98
server/scan/scanner.go Normal file
View File

@ -0,0 +1,98 @@
package scan
import (
"fmt"
"log"
"musique/server/proto"
"net"
"sync"
"time"
)
type Network struct {
FirstAddress string
MaxHostsCount int
}
const timeoutTCPHosts = time.Duration(1) * time.Second
func nextIP(ip net.IP) (next net.IP) {
// FIXME Proper next IP address in network calculation
ip = ip.To4()
o := (uint(ip[0]) << 24) + (uint(ip[1]) << 16) + (uint(ip[2]) << 8) + uint(ip[3])
o++
o3 := byte(o & 0xFF)
o2 := byte((o >> 8) & 0xFF)
o1 := byte((o >> 16) & 0xFF)
o0 := byte((o >> 24) & 0xFF)
return net.IPv4(o0, o1, o2, o3)
}
// AvailableNetworks returns all IPv4 networks that are available to the host
func AvailableNetworks() ([]Network, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, fmt.Errorf("getting interfaces info: %v", err)
}
networks := []Network{}
for _, addr := range addrs {
_, ipNet, err := net.ParseCIDR(addr.String())
if err != nil {
continue
}
if ipNet.IP.IsGlobalUnicast() {
if ip := ipNet.IP.To4(); ip != nil {
// FIXME We assume mask /24. This is a reasonable assumption performance wise
// but may lead to inability to recognize some of the host in network
networks = append(networks, Network{nextIP(ipNet.IP).String(), 253})
}
}
}
return networks, nil
}
type Response struct {
proto.HandshakeResponse
Address string
}
// TCPHosts returns all TCP hosts that are in given networks on one of given ports
func TCPHosts(networks []Network, ports []uint16) <-chan Response {
ips := make(chan Response, 256)
log.Printf("tcphosts: %+v\n", networks)
wg := sync.WaitGroup{}
for _, network := range networks {
ip := net.ParseIP(network.FirstAddress)
for i := 0; i < network.MaxHostsCount; i++ {
for _, port := range ports {
wg.Add(1)
go func(ip net.IP, port uint16) {
defer wg.Done()
target := fmt.Sprintf("%s:%d", ip, port)
var hs proto.HandshakeResponse
err := proto.CommandTimeout(target, proto.Handshake(), &hs, timeoutTCPHosts)
if err == nil {
ips <- Response{hs, target}
}
}(ip, port)
}
ip = nextIP(ip)
}
}
go func() {
wg.Wait()
close(ips)
}()
return ips
}