diff options
| -rw-r--r-- | client/go.mod | 3 | ||||
| -rw-r--r-- | client/main.go | 115 | ||||
| -rw-r--r-- | client/utils.go | 13 | ||||
| -rw-r--r-- | server/main.go | 187 | ||||
| -rw-r--r-- | server/utils.go | 6 |
5 files changed, 240 insertions, 84 deletions
diff --git a/client/go.mod b/client/go.mod new file mode 100644 index 0000000..ffafd1f --- /dev/null +++ b/client/go.mod @@ -0,0 +1,3 @@ +module vasthecat.ru/coursework-client + +go 1.16 diff --git a/client/main.go b/client/main.go index 18796ae..f90a018 100644 --- a/client/main.go +++ b/client/main.go @@ -1,13 +1,61 @@ package main import ( + "bufio" "encoding/json" "fmt" "log" "net" + "os" "sync" ) +var storage struct { + sync.Mutex + messages []Message +} + +func parseBuffer(previous []byte, new []byte) ([]Response, []byte) { + buf := append(previous, new...) + bufSize := uint32(len(buf)) + var responses []Response + var idx uint32 = 0 + for { + if idx+4 >= bufSize { + break + } + size := fromBytes(buf[idx : idx+4]) + if size == 0 { + return responses, make([]byte, 0) + } + if idx+4+size >= bufSize { + break + } else { + resp, err := parseResponse(buf[idx+4 : idx+4+size]) + if err == nil { + responses = append(responses, resp) + } + idx += 4 + size + } + } + return responses, buf[idx:] +} + +func readResponse(conn net.Conn) (Response, error) { + var response Response + var err error + + buf := make([]byte, 4096) + _, err = conn.Read(buf) + if err != nil { + return response, err + } + + size := fromBytes(buf[:4]) + rawResponse := buf[4 : size+4] + return parseResponse(rawResponse) +} + func main() { var wg sync.WaitGroup conn, err := net.Dial("tcp", "localhost:8080") @@ -15,20 +63,21 @@ func main() { defer conn.Close() wg.Add(1) go func(conn net.Conn) { - buf := make([]byte, 4096) - authMsg, _ := json.Marshal(Message{ Type: AUTHENTICATE, Data: "auth", User: "andrew", }) _ = sendMessage(conn, authMsg) + reader := bufio.NewReader(os.Stdin) - for i := 0; i < 20; i += 1 { - fmt.Printf("Sending %d request\n", i) + for i := 0; i < 1000; i += 1 { + //fmt.Printf("Sending %d request\n", i) + line, _, _ := reader.ReadLine() request, _ := json.Marshal(Message{ Type: MESSAGE, - Data: fmt.Sprintf("Hello, %d", i), + //Data: fmt.Sprintf("Hello, %d", i), + Data: string(line), User: "andrew", }) err = sendMessage(conn, request) @@ -36,12 +85,8 @@ func main() { break } - fmt.Println("Waiting for response") - _, _ = conn.Read(buf) - - size := fromBytes(buf[:4]) - response := parseResponse(buf[4 : size+4]) - fmt.Printf("Response: %s\n", response.Data) + response, _ := readResponse(conn) + _ = fmt.Sprintf("%s", response) } exitMsg, _ := json.Marshal(Message{ @@ -50,9 +95,57 @@ func main() { User: "andrew", }) _ = sendMessage(conn, exitMsg) + fmt.Println("Closed connection") wg.Done() }(conn) + receiver, err := net.Dial("tcp", "localhost:8081") + handleError(err) + defer receiver.Close() + wg.Add(1) + go func(conn net.Conn) { + req := Message{ + Type: AUTHENTICATE, + Data: AUTHENTICATE, + User: "andrew", + } + err = sendMessage(conn, []byte(req.serialize())) + if err != nil { + fmt.Println(err) + return + } + + _, err := readResponse(conn) + if err != nil { + return + } + + for { + response, err := readResponse(conn) + if err != nil { + break + } + + message, err := parseMessage([]byte(response.Data)) + if err == nil { + storage.Lock() + storage.messages = append(storage.messages, message) + fmt.Printf("Got message: \"%s\"\n", message.Data) + storage.Unlock() + } else { + fmt.Println(err) + } + + req := Message{ + Type: MESSAGE, + Data: SUCCESS, + User: "andrew", + } + _ = sendMessage(conn, []byte(req.serialize())) + } + wg.Done() + }(receiver) + wg.Wait() } diff --git a/client/utils.go b/client/utils.go index c11bf17..4dc64b7 100644 --- a/client/utils.go +++ b/client/utils.go @@ -62,15 +62,14 @@ func sendMessage(conn net.Conn, message []byte) error { return nil } -func parseMessage(data []byte) Message { +func parseMessage(data []byte) (Message, error) { var m Message - // TODO(andrew): Добавить обработку ошибок - json.Unmarshal(data, &m) - return m + err := json.Unmarshal(data, &m) + return m, err } -func parseResponse(data []byte) Response { +func parseResponse(data []byte) (Response, error) { var r Response - json.Unmarshal(data, &r) - return r + err := json.Unmarshal(data, &r) + return r, err } diff --git a/server/main.go b/server/main.go index e7f47fc..0a785bb 100644 --- a/server/main.go +++ b/server/main.go @@ -2,20 +2,46 @@ package main import ( "encoding/json" + "errors" + "fmt" "log" "net" "sync" + "time" ) -func handleConnection(conn net.Conn) { - defer conn.Close() - +func readRequest(conn net.Conn) (Message, error) { // 1 мегабайт var maxBodySize uint32 = 1048576 + + var message Message + var err error + + buf := make([]byte, 4096) + _, err = conn.Read(buf) + if err != nil { + return message, err + } + + size := fromBytes(buf[:4]) + if size > maxBodySize { + resp, _ := json.Marshal(Response{ + Status: FAILURE, + Data: "Превышен максимальный размер запроса", + }) + + _ = sendMessage(conn, resp) + return message, errors.New("body size exceeded") + } + + rawMessage := buf[4 : size+4] + return parseMessage(rawMessage) +} + +func handleAuthentication(conn net.Conn) bool { maxAuthAttempts := 5 authAttempts := 0 authenticated := false - buf := make([]byte, 4096) for { if authAttempts >= maxAuthAttempts { @@ -28,24 +54,12 @@ func handleConnection(conn net.Conn) { break } - _, err := conn.Read(buf) + message, err := readRequest(conn) if err != nil { break } + authAttempts += 1 - size := fromBytes(buf[:4]) - if size > maxBodySize { - resp, _ := json.Marshal(Response{ - Status: FAILURE, - Data: "Превышен максимальный размер запроса", - }) - - _ = sendMessage(conn, resp) - break - } - - rawMessage := buf[4 : size+4] - message := parseMessage(rawMessage) if message.Type == AUTHENTICATE { // TODO(andrew): Добавить процесс аутентификации if true { @@ -55,6 +69,7 @@ func handleConnection(conn net.Conn) { Data: "Аутентификация прошла успешно", }) _ = sendMessage(conn, resp) + break } else { resp, _ := json.Marshal(Response{ Status: FAILURE, @@ -65,82 +80,128 @@ func handleConnection(conn net.Conn) { continue } - if !authenticated { - resp, _ := json.Marshal(Response{ - Status: FAILURE, - Data: "Для работы с сервером необходима аутентификация", - }) + resp, _ := json.Marshal(Response{ + Status: FAILURE, + Data: "Для работы с сервером необходима аутентификация", + }) - err = sendMessage(conn, resp) + err = sendMessage(conn, resp) + if err != nil { + break + } + } + return authenticated +} + +func handleConnection(conn net.Conn, c chan Message) { + defer conn.Close() + + authenticated := handleAuthentication(conn) + if authenticated { + for { + message, err := readRequest(conn) if err != nil { break } - authAttempts += 1 - continue - } + channelLock.Lock() + c <- message - if message.Data == "exit" { - break - } + if message.Data == "exit" { + break + } - resp, _ := json.Marshal(Response{ - Status: SUCCESS, - Data: "some data", - }) - err = sendMessage(conn, resp) - if err != nil { - break + resp, _ := json.Marshal(Response{ + Status: SUCCESS, + Data: "some data", + }) + err = sendMessage(conn, resp) + if err != nil { + break + } } } + fmt.Println("Closed connection (receiver handler)") + received.RLock() + sz := len(received.data) + received.RUnlock() + fmt.Printf("There is %d messages in storage\n", sz) } func reader(c chan Message) { - const maxMessages uint64 = 1024 - var count uint64 = 0 + //const maxMessages uint64 = 65536 + //var count uint64 = 0 for { + //fmt.Println("Waiting for message") data := <-c + channelLock.Unlock() + //fmt.Println("Received message:", data.Data) received.Lock() - if count >= maxMessages { - received.data = []Message{data} - received.messageShift += maxMessages - } else { - received.data = append(received.data, data) - count += 1 - } + //fmt.Println("Locked received") + //if count >= maxMessages { + // received.data = []Message{data} + // received.messageShift += maxMessages + //} else { + // received.data = append(received.data, data) + // count += 1 + //} + received.data = append(received.data, data) received.Unlock() + //fmt.Println("Unlocked received") } } func handleSender(conn net.Conn) { - received.Lock() - lastIndex := received.messageShift - received.Unlock() + defer conn.Close() + + authenticated := handleAuthentication(conn) + if !authenticated { + fmt.Println("Sender is not authenticated") + return + } + fmt.Println("Sender is authenticated") + lastIndex := 0 for { var message Message - received.Lock() - idx := lastIndex - received.messageShift - if idx <= uint64(len(received.data)) { + idx := uint64(lastIndex) + if idx < uint64(len(received.data)) { + received.RLock() message = received.data[idx] - } - received.Lock() + received.RUnlock() - resp, _ := json.Marshal(Response{ - Status: SUCCESS, - Data: message.serialize(), - }) - _ = sendMessage(conn, resp) + resp, _ := json.Marshal(Response{ + Status: SUCCESS, + Data: message.serialize(), + }) + err := sendMessage(conn, resp) + if err != nil { + fmt.Printf("Failed to send %d: %s\n", idx, err) + break + } else { + message, err := readRequest(conn) + if err != nil { + break + } + if message.Data == SUCCESS { + lastIndex += 1 + } + } + } else { + time.Sleep(100 * time.Millisecond) + continue + } } } var received struct { - sync.Mutex - data []Message - messageShift uint64 + sync.RWMutex + data []Message } +var channelLock sync.Mutex + func main() { var wg sync.WaitGroup @@ -155,7 +216,7 @@ func main() { for { conn, err := sock.Accept() if err == nil { - go handleConnection(conn) + go handleConnection(conn, c) } else { break } diff --git a/server/utils.go b/server/utils.go index cd2069f..f566fd0 100644 --- a/server/utils.go +++ b/server/utils.go @@ -62,9 +62,9 @@ func sendMessage(conn net.Conn, message []byte) error { return nil } -func parseMessage(data []byte) Message { +func parseMessage(data []byte) (Message, error) { var m Message // TODO(andrew): Добавить обработку ошибок - json.Unmarshal(data, &m) - return m + err := json.Unmarshal(data, &m) + return m, err } |