package main import ( "encoding/json" "errors" "fmt" "log" "net" "sync" "time" ) func readRequest(conn net.Conn) (Request, error) { // 1 мегабайт var maxBodySize uint32 = 1048576 var message Request var err error buf := make([]byte, 4096) _, err = conn.Read(buf) if err != nil { return message, err } size := fromBytes(buf[:4]) if size > maxBodySize { resp, _ := json.Marshal(Response{ Status: FAILURE, Data: "Превышен максимальный размер запроса", }) _ = sendMessage(conn, resp) return message, errors.New("body size exceeded") } rawMessage := buf[4 : size+4] return parseRequest(rawMessage) } func handleAuthentication(conn net.Conn) bool { maxAuthAttempts := 5 authAttempts := 0 authenticated := false for { if authAttempts >= maxAuthAttempts { resp, _ := json.Marshal(Response{ Status: FAILURE, Data: "Превышено количество неавторизованных запросов", }) _ = sendMessage(conn, resp) break } message, err := readRequest(conn) if err != nil { break } authAttempts += 1 if message.Type == AUTHENTICATE { // TODO(andrew): Добавить процесс аутентификации if true { authenticated = true resp, _ := json.Marshal(Response{ Status: SUCCESS, Data: "Аутентификация прошла успешно", }) _ = sendMessage(conn, resp) break } else { resp, _ := json.Marshal(Response{ Status: FAILURE, Data: "При аутентификации произошла ошибка", }) _ = sendMessage(conn, resp) } continue } resp, _ := json.Marshal(Response{ Status: FAILURE, Data: "Для работы с сервером необходима аутентификация", }) err = sendMessage(conn, resp) if err != nil { break } } return authenticated } func handleConnection(conn net.Conn, c chan Request) { defer conn.Close() authenticated := handleAuthentication(conn) if authenticated { for { message, err := readRequest(conn) if err != nil { break } channelLock.Lock() c <- message fmt.Printf("Got new message from %s: %s\n", message.User, message.Data) if message.Type == EXIT { break } resp, _ := json.Marshal(Response{ Status: SUCCESS, Data: "some data", }) err = sendMessage(conn, resp) if err != nil { break } } } fmt.Println("Closed connection (receiver handler)") //received.RLock() //sz := len(received.data) //received.RUnlock() //fmt.Printf("There is %d messages in storage\n", sz) } func reader(c chan Request) { //const maxMessages uint64 = 65536 //var count uint64 = 0 for { //fmt.Println("Waiting for message") data := <-c channelLock.Unlock() //fmt.Println("Received message:", data.Data) received.Lock() //fmt.Println("Locked received") //if count >= maxMessages { // received.data = []Request{data} // received.messageShift += maxMessages //} else { // received.data = append(received.data, data) // count += 1 //} received.data = append(received.data, data) received.Unlock() //fmt.Println("Unlocked received") } } func handleSender(conn net.Conn) { defer conn.Close() authenticated := handleAuthentication(conn) if !authenticated { fmt.Println("Sender is not authenticated") return } fmt.Println("Sender is authenticated") lastIndex := 0 for { var message Request idx := uint64(lastIndex) if idx < uint64(len(received.data)) { received.RLock() message = received.data[idx] received.RUnlock() resp, _ := json.Marshal(Response{ Status: SUCCESS, Data: message.serialize(), }) err := sendMessage(conn, resp) if err != nil { fmt.Printf("Failed to send %d: %s\n", idx, err) break } else { message, err := readRequest(conn) if err != nil { break } if message.Data == SUCCESS { lastIndex += 1 } } } else { resp, _ := json.Marshal(Response{ Status: PING, Data: PING, }) err := sendMessage(conn, resp) if err != nil { fmt.Println("Closed connection (sender handler)") break } time.Sleep(100 * time.Millisecond) continue } } } var received struct { sync.RWMutex data []Request } var channelLock sync.Mutex func main() { var wg sync.WaitGroup c := make(chan Request) go reader(c) sock, err := net.Listen("tcp", "localhost:8080") handleError(err) defer sock.Close() wg.Add(1) go func(sock net.Listener) { for { conn, err := sock.Accept() if err == nil { go handleConnection(conn, c) } else { break } } wg.Done() }(sock) senderSock, err := net.Listen("tcp", "localhost:8081") handleError(err) defer senderSock.Close() wg.Add(1) go func(sock net.Listener) { for { conn, err := sock.Accept() if err == nil { go handleSender(conn) } else { break } } wg.Done() }(senderSock) wg.Wait() } func handleError(err error) { if err != nil { log.Fatal(err) } }