summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew <saintruler@gmail.com>2021-04-27 13:41:24 +0400
committerAndrew <saintruler@gmail.com>2021-04-27 13:41:24 +0400
commit1fb7ca2bd298cbe340f7790cc40a808b1a4ab245 (patch)
tree57214f0aa8e676fa2a470f8955fa190099bff3ae
parent82e4202ba125d4ebc1e7ab75e52938c0b7a97071 (diff)
Finished main application loop between server and client.
-rw-r--r--client/go.mod3
-rw-r--r--client/main.go115
-rw-r--r--client/utils.go13
-rw-r--r--server/main.go187
-rw-r--r--server/utils.go6
5 files changed, 240 insertions, 84 deletions
diff --git a/client/go.mod b/client/go.mod
new file mode 100644
index 0000000..ffafd1f
--- /dev/null
+++ b/client/go.mod
@@ -0,0 +1,3 @@
+module vasthecat.ru/coursework-client
+
+go 1.16
diff --git a/client/main.go b/client/main.go
index 18796ae..f90a018 100644
--- a/client/main.go
+++ b/client/main.go
@@ -1,13 +1,61 @@
package main
import (
+ "bufio"
"encoding/json"
"fmt"
"log"
"net"
+ "os"
"sync"
)
+var storage struct {
+ sync.Mutex
+ messages []Message
+}
+
+func parseBuffer(previous []byte, new []byte) ([]Response, []byte) {
+ buf := append(previous, new...)
+ bufSize := uint32(len(buf))
+ var responses []Response
+ var idx uint32 = 0
+ for {
+ if idx+4 >= bufSize {
+ break
+ }
+ size := fromBytes(buf[idx : idx+4])
+ if size == 0 {
+ return responses, make([]byte, 0)
+ }
+ if idx+4+size >= bufSize {
+ break
+ } else {
+ resp, err := parseResponse(buf[idx+4 : idx+4+size])
+ if err == nil {
+ responses = append(responses, resp)
+ }
+ idx += 4 + size
+ }
+ }
+ return responses, buf[idx:]
+}
+
+func readResponse(conn net.Conn) (Response, error) {
+ var response Response
+ var err error
+
+ buf := make([]byte, 4096)
+ _, err = conn.Read(buf)
+ if err != nil {
+ return response, err
+ }
+
+ size := fromBytes(buf[:4])
+ rawResponse := buf[4 : size+4]
+ return parseResponse(rawResponse)
+}
+
func main() {
var wg sync.WaitGroup
conn, err := net.Dial("tcp", "localhost:8080")
@@ -15,20 +63,21 @@ func main() {
defer conn.Close()
wg.Add(1)
go func(conn net.Conn) {
- buf := make([]byte, 4096)
-
authMsg, _ := json.Marshal(Message{
Type: AUTHENTICATE,
Data: "auth",
User: "andrew",
})
_ = sendMessage(conn, authMsg)
+ reader := bufio.NewReader(os.Stdin)
- for i := 0; i < 20; i += 1 {
- fmt.Printf("Sending %d request\n", i)
+ for i := 0; i < 1000; i += 1 {
+ //fmt.Printf("Sending %d request\n", i)
+ line, _, _ := reader.ReadLine()
request, _ := json.Marshal(Message{
Type: MESSAGE,
- Data: fmt.Sprintf("Hello, %d", i),
+ //Data: fmt.Sprintf("Hello, %d", i),
+ Data: string(line),
User: "andrew",
})
err = sendMessage(conn, request)
@@ -36,12 +85,8 @@ func main() {
break
}
- fmt.Println("Waiting for response")
- _, _ = conn.Read(buf)
-
- size := fromBytes(buf[:4])
- response := parseResponse(buf[4 : size+4])
- fmt.Printf("Response: %s\n", response.Data)
+ response, _ := readResponse(conn)
+ _ = fmt.Sprintf("%s", response)
}
exitMsg, _ := json.Marshal(Message{
@@ -50,9 +95,57 @@ func main() {
User: "andrew",
})
_ = sendMessage(conn, exitMsg)
+ fmt.Println("Closed connection")
wg.Done()
}(conn)
+ receiver, err := net.Dial("tcp", "localhost:8081")
+ handleError(err)
+ defer receiver.Close()
+ wg.Add(1)
+ go func(conn net.Conn) {
+ req := Message{
+ Type: AUTHENTICATE,
+ Data: AUTHENTICATE,
+ User: "andrew",
+ }
+ err = sendMessage(conn, []byte(req.serialize()))
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ _, err := readResponse(conn)
+ if err != nil {
+ return
+ }
+
+ for {
+ response, err := readResponse(conn)
+ if err != nil {
+ break
+ }
+
+ message, err := parseMessage([]byte(response.Data))
+ if err == nil {
+ storage.Lock()
+ storage.messages = append(storage.messages, message)
+ fmt.Printf("Got message: \"%s\"\n", message.Data)
+ storage.Unlock()
+ } else {
+ fmt.Println(err)
+ }
+
+ req := Message{
+ Type: MESSAGE,
+ Data: SUCCESS,
+ User: "andrew",
+ }
+ _ = sendMessage(conn, []byte(req.serialize()))
+ }
+ wg.Done()
+ }(receiver)
+
wg.Wait()
}
diff --git a/client/utils.go b/client/utils.go
index c11bf17..4dc64b7 100644
--- a/client/utils.go
+++ b/client/utils.go
@@ -62,15 +62,14 @@ func sendMessage(conn net.Conn, message []byte) error {
return nil
}
-func parseMessage(data []byte) Message {
+func parseMessage(data []byte) (Message, error) {
var m Message
- // TODO(andrew): Добавить обработку ошибок
- json.Unmarshal(data, &m)
- return m
+ err := json.Unmarshal(data, &m)
+ return m, err
}
-func parseResponse(data []byte) Response {
+func parseResponse(data []byte) (Response, error) {
var r Response
- json.Unmarshal(data, &r)
- return r
+ err := json.Unmarshal(data, &r)
+ return r, err
}
diff --git a/server/main.go b/server/main.go
index e7f47fc..0a785bb 100644
--- a/server/main.go
+++ b/server/main.go
@@ -2,20 +2,46 @@ package main
import (
"encoding/json"
+ "errors"
+ "fmt"
"log"
"net"
"sync"
+ "time"
)
-func handleConnection(conn net.Conn) {
- defer conn.Close()
-
+func readRequest(conn net.Conn) (Message, error) {
// 1 мегабайт
var maxBodySize uint32 = 1048576
+
+ var message Message
+ var err error
+
+ buf := make([]byte, 4096)
+ _, err = conn.Read(buf)
+ if err != nil {
+ return message, err
+ }
+
+ size := fromBytes(buf[:4])
+ if size > maxBodySize {
+ resp, _ := json.Marshal(Response{
+ Status: FAILURE,
+ Data: "Превышен максимальный размер запроса",
+ })
+
+ _ = sendMessage(conn, resp)
+ return message, errors.New("body size exceeded")
+ }
+
+ rawMessage := buf[4 : size+4]
+ return parseMessage(rawMessage)
+}
+
+func handleAuthentication(conn net.Conn) bool {
maxAuthAttempts := 5
authAttempts := 0
authenticated := false
- buf := make([]byte, 4096)
for {
if authAttempts >= maxAuthAttempts {
@@ -28,24 +54,12 @@ func handleConnection(conn net.Conn) {
break
}
- _, err := conn.Read(buf)
+ message, err := readRequest(conn)
if err != nil {
break
}
+ authAttempts += 1
- size := fromBytes(buf[:4])
- if size > maxBodySize {
- resp, _ := json.Marshal(Response{
- Status: FAILURE,
- Data: "Превышен максимальный размер запроса",
- })
-
- _ = sendMessage(conn, resp)
- break
- }
-
- rawMessage := buf[4 : size+4]
- message := parseMessage(rawMessage)
if message.Type == AUTHENTICATE {
// TODO(andrew): Добавить процесс аутентификации
if true {
@@ -55,6 +69,7 @@ func handleConnection(conn net.Conn) {
Data: "Аутентификация прошла успешно",
})
_ = sendMessage(conn, resp)
+ break
} else {
resp, _ := json.Marshal(Response{
Status: FAILURE,
@@ -65,82 +80,128 @@ func handleConnection(conn net.Conn) {
continue
}
- if !authenticated {
- resp, _ := json.Marshal(Response{
- Status: FAILURE,
- Data: "Для работы с сервером необходима аутентификация",
- })
+ resp, _ := json.Marshal(Response{
+ Status: FAILURE,
+ Data: "Для работы с сервером необходима аутентификация",
+ })
- err = sendMessage(conn, resp)
+ err = sendMessage(conn, resp)
+ if err != nil {
+ break
+ }
+ }
+ return authenticated
+}
+
+func handleConnection(conn net.Conn, c chan Message) {
+ defer conn.Close()
+
+ authenticated := handleAuthentication(conn)
+ if authenticated {
+ for {
+ message, err := readRequest(conn)
if err != nil {
break
}
- authAttempts += 1
- continue
- }
+ channelLock.Lock()
+ c <- message
- if message.Data == "exit" {
- break
- }
+ if message.Data == "exit" {
+ break
+ }
- resp, _ := json.Marshal(Response{
- Status: SUCCESS,
- Data: "some data",
- })
- err = sendMessage(conn, resp)
- if err != nil {
- break
+ resp, _ := json.Marshal(Response{
+ Status: SUCCESS,
+ Data: "some data",
+ })
+ err = sendMessage(conn, resp)
+ if err != nil {
+ break
+ }
}
}
+ fmt.Println("Closed connection (receiver handler)")
+ received.RLock()
+ sz := len(received.data)
+ received.RUnlock()
+ fmt.Printf("There is %d messages in storage\n", sz)
}
func reader(c chan Message) {
- const maxMessages uint64 = 1024
- var count uint64 = 0
+ //const maxMessages uint64 = 65536
+ //var count uint64 = 0
for {
+ //fmt.Println("Waiting for message")
data := <-c
+ channelLock.Unlock()
+ //fmt.Println("Received message:", data.Data)
received.Lock()
- if count >= maxMessages {
- received.data = []Message{data}
- received.messageShift += maxMessages
- } else {
- received.data = append(received.data, data)
- count += 1
- }
+ //fmt.Println("Locked received")
+ //if count >= maxMessages {
+ // received.data = []Message{data}
+ // received.messageShift += maxMessages
+ //} else {
+ // received.data = append(received.data, data)
+ // count += 1
+ //}
+ received.data = append(received.data, data)
received.Unlock()
+ //fmt.Println("Unlocked received")
}
}
func handleSender(conn net.Conn) {
- received.Lock()
- lastIndex := received.messageShift
- received.Unlock()
+ defer conn.Close()
+
+ authenticated := handleAuthentication(conn)
+ if !authenticated {
+ fmt.Println("Sender is not authenticated")
+ return
+ }
+ fmt.Println("Sender is authenticated")
+ lastIndex := 0
for {
var message Message
- received.Lock()
- idx := lastIndex - received.messageShift
- if idx <= uint64(len(received.data)) {
+ idx := uint64(lastIndex)
+ if idx < uint64(len(received.data)) {
+ received.RLock()
message = received.data[idx]
- }
- received.Lock()
+ received.RUnlock()
- resp, _ := json.Marshal(Response{
- Status: SUCCESS,
- Data: message.serialize(),
- })
- _ = sendMessage(conn, resp)
+ resp, _ := json.Marshal(Response{
+ Status: SUCCESS,
+ Data: message.serialize(),
+ })
+ err := sendMessage(conn, resp)
+ if err != nil {
+ fmt.Printf("Failed to send %d: %s\n", idx, err)
+ break
+ } else {
+ message, err := readRequest(conn)
+ if err != nil {
+ break
+ }
+ if message.Data == SUCCESS {
+ lastIndex += 1
+ }
+ }
+ } else {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
}
}
var received struct {
- sync.Mutex
- data []Message
- messageShift uint64
+ sync.RWMutex
+ data []Message
}
+var channelLock sync.Mutex
+
func main() {
var wg sync.WaitGroup
@@ -155,7 +216,7 @@ func main() {
for {
conn, err := sock.Accept()
if err == nil {
- go handleConnection(conn)
+ go handleConnection(conn, c)
} else {
break
}
diff --git a/server/utils.go b/server/utils.go
index cd2069f..f566fd0 100644
--- a/server/utils.go
+++ b/server/utils.go
@@ -62,9 +62,9 @@ func sendMessage(conn net.Conn, message []byte) error {
return nil
}
-func parseMessage(data []byte) Message {
+func parseMessage(data []byte) (Message, error) {
var m Message
// TODO(andrew): Добавить обработку ошибок
- json.Unmarshal(data, &m)
- return m
+ err := json.Unmarshal(data, &m)
+ return m, err
}