summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/main.go187
-rw-r--r--server/utils.go6
2 files changed, 127 insertions, 66 deletions
diff --git a/server/main.go b/server/main.go
index e7f47fc..0a785bb 100644
--- a/server/main.go
+++ b/server/main.go
@@ -2,20 +2,46 @@ package main
import (
"encoding/json"
+ "errors"
+ "fmt"
"log"
"net"
"sync"
+ "time"
)
-func handleConnection(conn net.Conn) {
- defer conn.Close()
-
+func readRequest(conn net.Conn) (Message, error) {
// 1 мегабайт
var maxBodySize uint32 = 1048576
+
+ var message Message
+ var err error
+
+ buf := make([]byte, 4096)
+ _, err = conn.Read(buf)
+ if err != nil {
+ return message, err
+ }
+
+ size := fromBytes(buf[:4])
+ if size > maxBodySize {
+ resp, _ := json.Marshal(Response{
+ Status: FAILURE,
+ Data: "Превышен максимальный размер запроса",
+ })
+
+ _ = sendMessage(conn, resp)
+ return message, errors.New("body size exceeded")
+ }
+
+ rawMessage := buf[4 : size+4]
+ return parseMessage(rawMessage)
+}
+
+func handleAuthentication(conn net.Conn) bool {
maxAuthAttempts := 5
authAttempts := 0
authenticated := false
- buf := make([]byte, 4096)
for {
if authAttempts >= maxAuthAttempts {
@@ -28,24 +54,12 @@ func handleConnection(conn net.Conn) {
break
}
- _, err := conn.Read(buf)
+ message, err := readRequest(conn)
if err != nil {
break
}
+ authAttempts += 1
- size := fromBytes(buf[:4])
- if size > maxBodySize {
- resp, _ := json.Marshal(Response{
- Status: FAILURE,
- Data: "Превышен максимальный размер запроса",
- })
-
- _ = sendMessage(conn, resp)
- break
- }
-
- rawMessage := buf[4 : size+4]
- message := parseMessage(rawMessage)
if message.Type == AUTHENTICATE {
// TODO(andrew): Добавить процесс аутентификации
if true {
@@ -55,6 +69,7 @@ func handleConnection(conn net.Conn) {
Data: "Аутентификация прошла успешно",
})
_ = sendMessage(conn, resp)
+ break
} else {
resp, _ := json.Marshal(Response{
Status: FAILURE,
@@ -65,82 +80,128 @@ func handleConnection(conn net.Conn) {
continue
}
- if !authenticated {
- resp, _ := json.Marshal(Response{
- Status: FAILURE,
- Data: "Для работы с сервером необходима аутентификация",
- })
+ resp, _ := json.Marshal(Response{
+ Status: FAILURE,
+ Data: "Для работы с сервером необходима аутентификация",
+ })
- err = sendMessage(conn, resp)
+ err = sendMessage(conn, resp)
+ if err != nil {
+ break
+ }
+ }
+ return authenticated
+}
+
+func handleConnection(conn net.Conn, c chan Message) {
+ defer conn.Close()
+
+ authenticated := handleAuthentication(conn)
+ if authenticated {
+ for {
+ message, err := readRequest(conn)
if err != nil {
break
}
- authAttempts += 1
- continue
- }
+ channelLock.Lock()
+ c <- message
- if message.Data == "exit" {
- break
- }
+ if message.Data == "exit" {
+ break
+ }
- resp, _ := json.Marshal(Response{
- Status: SUCCESS,
- Data: "some data",
- })
- err = sendMessage(conn, resp)
- if err != nil {
- break
+ resp, _ := json.Marshal(Response{
+ Status: SUCCESS,
+ Data: "some data",
+ })
+ err = sendMessage(conn, resp)
+ if err != nil {
+ break
+ }
}
}
+ fmt.Println("Closed connection (receiver handler)")
+ received.RLock()
+ sz := len(received.data)
+ received.RUnlock()
+ fmt.Printf("There is %d messages in storage\n", sz)
}
func reader(c chan Message) {
- const maxMessages uint64 = 1024
- var count uint64 = 0
+ //const maxMessages uint64 = 65536
+ //var count uint64 = 0
for {
+ //fmt.Println("Waiting for message")
data := <-c
+ channelLock.Unlock()
+ //fmt.Println("Received message:", data.Data)
received.Lock()
- if count >= maxMessages {
- received.data = []Message{data}
- received.messageShift += maxMessages
- } else {
- received.data = append(received.data, data)
- count += 1
- }
+ //fmt.Println("Locked received")
+ //if count >= maxMessages {
+ // received.data = []Message{data}
+ // received.messageShift += maxMessages
+ //} else {
+ // received.data = append(received.data, data)
+ // count += 1
+ //}
+ received.data = append(received.data, data)
received.Unlock()
+ //fmt.Println("Unlocked received")
}
}
func handleSender(conn net.Conn) {
- received.Lock()
- lastIndex := received.messageShift
- received.Unlock()
+ defer conn.Close()
+
+ authenticated := handleAuthentication(conn)
+ if !authenticated {
+ fmt.Println("Sender is not authenticated")
+ return
+ }
+ fmt.Println("Sender is authenticated")
+ lastIndex := 0
for {
var message Message
- received.Lock()
- idx := lastIndex - received.messageShift
- if idx <= uint64(len(received.data)) {
+ idx := uint64(lastIndex)
+ if idx < uint64(len(received.data)) {
+ received.RLock()
message = received.data[idx]
- }
- received.Lock()
+ received.RUnlock()
- resp, _ := json.Marshal(Response{
- Status: SUCCESS,
- Data: message.serialize(),
- })
- _ = sendMessage(conn, resp)
+ resp, _ := json.Marshal(Response{
+ Status: SUCCESS,
+ Data: message.serialize(),
+ })
+ err := sendMessage(conn, resp)
+ if err != nil {
+ fmt.Printf("Failed to send %d: %s\n", idx, err)
+ break
+ } else {
+ message, err := readRequest(conn)
+ if err != nil {
+ break
+ }
+ if message.Data == SUCCESS {
+ lastIndex += 1
+ }
+ }
+ } else {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
}
}
var received struct {
- sync.Mutex
- data []Message
- messageShift uint64
+ sync.RWMutex
+ data []Message
}
+var channelLock sync.Mutex
+
func main() {
var wg sync.WaitGroup
@@ -155,7 +216,7 @@ func main() {
for {
conn, err := sock.Accept()
if err == nil {
- go handleConnection(conn)
+ go handleConnection(conn, c)
} else {
break
}
diff --git a/server/utils.go b/server/utils.go
index cd2069f..f566fd0 100644
--- a/server/utils.go
+++ b/server/utils.go
@@ -62,9 +62,9 @@ func sendMessage(conn net.Conn, message []byte) error {
return nil
}
-func parseMessage(data []byte) Message {
+func parseMessage(data []byte) (Message, error) {
var m Message
// TODO(andrew): Добавить обработку ошибок
- json.Unmarshal(data, &m)
- return m
+ err := json.Unmarshal(data, &m)
+ return m, err
}