From 1fb7ca2bd298cbe340f7790cc40a808b1a4ab245 Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 27 Apr 2021 13:41:24 +0400 Subject: Finished main application loop between server and client. --- server/main.go | 187 +++++++++++++++++++++++++++++++++++++------------------- server/utils.go | 6 +- 2 files changed, 127 insertions(+), 66 deletions(-) (limited to 'server') diff --git a/server/main.go b/server/main.go index e7f47fc..0a785bb 100644 --- a/server/main.go +++ b/server/main.go @@ -2,20 +2,46 @@ package main import ( "encoding/json" + "errors" + "fmt" "log" "net" "sync" + "time" ) -func handleConnection(conn net.Conn) { - defer conn.Close() - +func readRequest(conn net.Conn) (Message, error) { // 1 мегабайт var maxBodySize uint32 = 1048576 + + var message Message + var err error + + buf := make([]byte, 4096) + _, err = conn.Read(buf) + if err != nil { + return message, err + } + + size := fromBytes(buf[:4]) + if size > maxBodySize { + resp, _ := json.Marshal(Response{ + Status: FAILURE, + Data: "Превышен максимальный размер запроса", + }) + + _ = sendMessage(conn, resp) + return message, errors.New("body size exceeded") + } + + rawMessage := buf[4 : size+4] + return parseMessage(rawMessage) +} + +func handleAuthentication(conn net.Conn) bool { maxAuthAttempts := 5 authAttempts := 0 authenticated := false - buf := make([]byte, 4096) for { if authAttempts >= maxAuthAttempts { @@ -28,24 +54,12 @@ func handleConnection(conn net.Conn) { break } - _, err := conn.Read(buf) + message, err := readRequest(conn) if err != nil { break } + authAttempts += 1 - size := fromBytes(buf[:4]) - if size > maxBodySize { - resp, _ := json.Marshal(Response{ - Status: FAILURE, - Data: "Превышен максимальный размер запроса", - }) - - _ = sendMessage(conn, resp) - break - } - - rawMessage := buf[4 : size+4] - message := parseMessage(rawMessage) if message.Type == AUTHENTICATE { // TODO(andrew): Добавить процесс аутентификации if true { @@ -55,6 +69,7 @@ func handleConnection(conn net.Conn) { Data: "Аутентификация прошла успешно", }) _ = sendMessage(conn, resp) + break } else { resp, _ := json.Marshal(Response{ Status: FAILURE, @@ -65,82 +80,128 @@ func handleConnection(conn net.Conn) { continue } - if !authenticated { - resp, _ := json.Marshal(Response{ - Status: FAILURE, - Data: "Для работы с сервером необходима аутентификация", - }) + resp, _ := json.Marshal(Response{ + Status: FAILURE, + Data: "Для работы с сервером необходима аутентификация", + }) - err = sendMessage(conn, resp) + err = sendMessage(conn, resp) + if err != nil { + break + } + } + return authenticated +} + +func handleConnection(conn net.Conn, c chan Message) { + defer conn.Close() + + authenticated := handleAuthentication(conn) + if authenticated { + for { + message, err := readRequest(conn) if err != nil { break } - authAttempts += 1 - continue - } + channelLock.Lock() + c <- message - if message.Data == "exit" { - break - } + if message.Data == "exit" { + break + } - resp, _ := json.Marshal(Response{ - Status: SUCCESS, - Data: "some data", - }) - err = sendMessage(conn, resp) - if err != nil { - break + resp, _ := json.Marshal(Response{ + Status: SUCCESS, + Data: "some data", + }) + err = sendMessage(conn, resp) + if err != nil { + break + } } } + fmt.Println("Closed connection (receiver handler)") + received.RLock() + sz := len(received.data) + received.RUnlock() + fmt.Printf("There is %d messages in storage\n", sz) } func reader(c chan Message) { - const maxMessages uint64 = 1024 - var count uint64 = 0 + //const maxMessages uint64 = 65536 + //var count uint64 = 0 for { + //fmt.Println("Waiting for message") data := <-c + channelLock.Unlock() + //fmt.Println("Received message:", data.Data) received.Lock() - if count >= maxMessages { - received.data = []Message{data} - received.messageShift += maxMessages - } else { - received.data = append(received.data, data) - count += 1 - } + //fmt.Println("Locked received") + //if count >= maxMessages { + // received.data = []Message{data} + // received.messageShift += maxMessages + //} else { + // received.data = append(received.data, data) + // count += 1 + //} + received.data = append(received.data, data) received.Unlock() + //fmt.Println("Unlocked received") } } func handleSender(conn net.Conn) { - received.Lock() - lastIndex := received.messageShift - received.Unlock() + defer conn.Close() + + authenticated := handleAuthentication(conn) + if !authenticated { + fmt.Println("Sender is not authenticated") + return + } + fmt.Println("Sender is authenticated") + lastIndex := 0 for { var message Message - received.Lock() - idx := lastIndex - received.messageShift - if idx <= uint64(len(received.data)) { + idx := uint64(lastIndex) + if idx < uint64(len(received.data)) { + received.RLock() message = received.data[idx] - } - received.Lock() + received.RUnlock() - resp, _ := json.Marshal(Response{ - Status: SUCCESS, - Data: message.serialize(), - }) - _ = sendMessage(conn, resp) + resp, _ := json.Marshal(Response{ + Status: SUCCESS, + Data: message.serialize(), + }) + err := sendMessage(conn, resp) + if err != nil { + fmt.Printf("Failed to send %d: %s\n", idx, err) + break + } else { + message, err := readRequest(conn) + if err != nil { + break + } + if message.Data == SUCCESS { + lastIndex += 1 + } + } + } else { + time.Sleep(100 * time.Millisecond) + continue + } } } var received struct { - sync.Mutex - data []Message - messageShift uint64 + sync.RWMutex + data []Message } +var channelLock sync.Mutex + func main() { var wg sync.WaitGroup @@ -155,7 +216,7 @@ func main() { for { conn, err := sock.Accept() if err == nil { - go handleConnection(conn) + go handleConnection(conn, c) } else { break } diff --git a/server/utils.go b/server/utils.go index cd2069f..f566fd0 100644 --- a/server/utils.go +++ b/server/utils.go @@ -62,9 +62,9 @@ func sendMessage(conn net.Conn, message []byte) error { return nil } -func parseMessage(data []byte) Message { +func parseMessage(data []byte) (Message, error) { var m Message // TODO(andrew): Добавить обработку ошибок - json.Unmarshal(data, &m) - return m + err := json.Unmarshal(data, &m) + return m, err } -- cgit v1.2.3