package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"sync"
	"time"
)

// respond marshals resp to JSON and writes it to conn with sendMessage.
// It returns the marshalling error or the error reported by sendMessage.
func respond(conn net.Conn, resp Response) error {
	raw, err := json.Marshal(resp)
	if err != nil {
		return err
	}
	return sendMessage(conn, raw)
}

// readRequest reads one length-prefixed request from conn and parses it.
//
// A frame is a 4-byte size header (decoded by fromBytes) followed by that
// many bytes of body, which are handed to parseMessage. Bodies larger than
// 1 MiB are rejected: a FAILURE response is sent to the peer and an error is
// returned. On any error the returned Message is the zero value.
func readRequest(conn net.Conn) (Message, error) {
	const maxBodySize uint32 = 1 << 20 // 1 MiB

	// A single conn.Read may return fewer bytes than requested, so both the
	// header and the body are read with io.ReadFull.
	var header [4]byte
	if _, err := io.ReadFull(conn, header[:]); err != nil {
		return Message{}, err
	}

	size := fromBytes(header[:])
	if size > maxBodySize {
		// Best effort: the request is rejected whether or not the peer
		// receives the explanation.
		_ = respond(conn, Response{
			Status: FAILURE,
			Data:   "Превышен максимальный размер запроса",
		})
		return Message{}, errors.New("body size exceeded")
	}

	// Allocate exactly the announced size; a fixed-size buffer would be
	// overrun by any body larger than the buffer.
	body := make([]byte, size)
	if _, err := io.ReadFull(conn, body); err != nil {
		return Message{}, err
	}
	return parseMessage(body)
}

// handleAuthentication runs the authentication handshake on conn.
//
// The peer gets up to five requests. A request of type AUTHENTICATE is
// currently always accepted (see the TODO below); any other request is
// answered with a FAILURE response. It returns true once the peer is
// authenticated, and false if reading or replying fails or the attempts run
// out.
func handleAuthentication(conn net.Conn) bool {
	const maxAuthAttempts = 5

	for attempts := 0; attempts < maxAuthAttempts; attempts++ {
		message, err := readRequest(conn)
		if err != nil {
			return false
		}

		if message.Type != AUTHENTICATE {
			err = respond(conn, Response{
				Status: FAILURE,
				Data:   "Для работы с сервером необходима аутентификация",
			})
			if err != nil {
				return false
			}
			continue
		}

		// TODO(andrew): add the real authentication process; until then every
		// AUTHENTICATE request is accepted.
		credentialsOK := true
		if credentialsOK {
			// Best effort: the peer counts as authenticated even if the
			// confirmation cannot be delivered.
			_ = respond(conn, Response{
				Status: SUCCESS,
				Data:   "Аутентификация прошла успешно",
			})
			return true
		}

		// Best effort: a failed attempt is counted either way.
		_ = respond(conn, Response{
			Status: FAILURE,
			Data:   "При аутентификации произошла ошибка",
		})
	}

	// Best effort: the connection is being given up on regardless.
	_ = respond(conn, Response{
		Status: FAILURE,
		Data:   "Превышено количество неавторизованных запросов",
	})
	return false
}

// handleConnection serves one receiver-side client.
//
// After a successful authentication every request read from conn is
// forwarded to c and acknowledged with a SUCCESS response. The loop ends on
// the first read or write error, or after forwarding a message whose Data is
// "exit". The connection is closed on return, and the number of stored
// messages is printed.
func handleConnection(conn net.Conn, c chan Message) {
	defer conn.Close()

	if handleAuthentication(conn) {
		for {
			message, err := readRequest(conn)
			if err != nil {
				break
			}

			// c is unbuffered, so the send itself blocks until reader has
			// taken the message; no additional lock is needed.
			c <- message
			if message.Data == "exit" {
				break
			}

			err = respond(conn, Response{
				Status: SUCCESS,
				Data:   "some data",
			})
			if err != nil {
				break
			}
		}
	}

	fmt.Println("Closed connection (receiver handler)")
	received.RLock()
	stored := len(received.data)
	received.RUnlock()
	fmt.Printf("There is %d messages in storage\n", stored)
}

// reader appends every message arriving on c to the shared received storage.
// It returns only if c is closed.
//
// NOTE: received.data grows without bound; nothing in this file trims it.
func reader(c chan Message) {
	for message := range c {
		received.Lock()
		received.data = append(received.data, message)
		received.Unlock()
	}
}

// handleSender serves one sender-side client.
//
// After a successful authentication it delivers the stored messages in
// order: each message is sent as a SUCCESS response, then one request is
// read back, and the cursor advances only when that request's Data equals
// SUCCESS (otherwise the same message is sent again). While no new message
// is available it polls the storage every 100 ms. It returns on the first
// send or read error.
func handleSender(conn net.Conn) {
	defer conn.Close()

	if !handleAuthentication(conn) {
		fmt.Println("Sender is not authenticated")
		return
	}
	fmt.Println("Sender is authenticated")

	next := 0
	for {
		// The length check and the element copy happen under the same read
		// lock so they cannot race with reader's append.
		var message Message
		received.RLock()
		available := next < len(received.data)
		if available {
			message = received.data[next]
		}
		received.RUnlock()

		if !available {
			time.Sleep(100 * time.Millisecond)
			continue
		}

		err := respond(conn, Response{
			Status: SUCCESS,
			Data:   message.serialize(),
		})
		if err != nil {
			fmt.Printf("Failed to send %d: %s\n", next, err)
			return
		}

		ack, err := readRequest(conn)
		if err != nil {
			return
		}
		if ack.Data == SUCCESS {
			next++
		}
	}
}

// received is the in-memory message storage shared by reader (writer) and by
// the connection handlers (readers); the embedded RWMutex guards data.
var received struct {
	sync.RWMutex
	data []Message
}

// channelLock is no longer used in this file: the unbuffered channel already
// serialises the hand-off between handleConnection and reader. The
// declaration is kept in case another file of the package references it.
var channelLock sync.Mutex

// main starts the storage reader and two TCP listeners: localhost:8080 for
// receiver clients (handleConnection) and localhost:8081 for sender clients
// (handleSender). It blocks until both accept loops have stopped.
func main() {
	var wg sync.WaitGroup

	c := make(chan Message)
	go reader(c)

	// serve accepts connections on l until Accept fails and runs handle for
	// each connection in its own goroutine.
	serve := func(l net.Listener, handle func(net.Conn)) {
		defer wg.Done()
		for {
			conn, err := l.Accept()
			if err != nil {
				log.Printf("accept on %s: %v", l.Addr(), err)
				return
			}
			go handle(conn)
		}
	}

	sock, err := net.Listen("tcp", "localhost:8080")
	handleError(err)
	defer sock.Close()

	wg.Add(1)
	go serve(sock, func(conn net.Conn) { handleConnection(conn, c) })

	senderSock, err := net.Listen("tcp", "localhost:8081")
	handleError(err)
	defer senderSock.Close()

	wg.Add(1)
	go serve(senderSock, handleSender)

	wg.Wait()
}

// handleError terminates the program via log.Fatal when err is non-nil.
func handleError(err error) {
	if err != nil {
		log.Fatal(err)
	}
}