diff options
| author | citrons <citrons@mondecitronne.com> | 2025-06-07 17:43:51 -0500 |
|---|---|---|
| committer | citrons <citrons@mondecitronne.com> | 2025-06-07 17:43:51 -0500 |
| commit | 24b15604403341044fcb5f7ece18ffc9c569a2cf (patch) | |
| tree | 6789d0c3e4da2fcd26622171f7418033e97143e7 | |
| parent | 4fd84671877754ae3b66acb8a8af8cf0935a53c9 (diff) | |
more robust connection handling
| -rw-r--r-- | client/client/client.go | 42 | ||||
| -rw-r--r-- | server/server/server.go | 12 | ||||
| -rw-r--r-- | server/session/session.go | 46 |
3 files changed, 67 insertions, 33 deletions
diff --git a/client/client/client.go b/client/client/client.go index 7c9cf03..0c563bf 100644 --- a/client/client/client.go +++ b/client/client/client.go @@ -45,19 +45,18 @@ func New(address string) Client { } func (c *Client) RunClient() { - sleep := time.Second / 4 + reconnectWait := time.Second / 4 for { - conn, err := net.Dial("tcp", c.Address) + conn, err := net.DialTimeout("tcp", c.Address, 30 * time.Second) if err != nil { c.message <- Message {func(mh MessageHandler) { mh.OnDisconnect(err) }} - time.Sleep(sleep) - sleep = min(sleep * 2, 30 * time.Second) + time.Sleep(reconnectWait) + reconnectWait = min(reconnectWait * 2, 30 * time.Second) continue } defer conn.Close() - sleep = time.Second / time.Duration(2 + rand.Int() % 4) recv, recvErr := proto.ReadLines(bufio.NewReader(conn)) send := make(chan proto.Line, 1) @@ -88,11 +87,10 @@ func (c *Client) RunClient() { }() defer close(c.send) - c.message <- Message {func(mh MessageHandler) { - mh.OnConnect() - }} - - var debugger Debugger + var ( + debugger Debugger + connected bool + ) run: for { select { case line := <-recv: @@ -101,6 +99,18 @@ func (c *Client) RunClient() { debugger.OnLine(line, false) }} } + + if !connected { + if line.Kind == '*' && line.Cmd.Kind == "hi" { + c.message <- Message {func(mh MessageHandler) { + mh.OnConnect() + }} + reconnectWait = + time.Second / time.Duration(2 + rand.Int() % 4) + connected = true + } + } + switch line.Kind { case '*': c.message <- Message {func(mh MessageHandler) { @@ -130,11 +140,13 @@ func (c *Client) RunClient() { return } } - c.message <- Message {func(mh MessageHandler) { - mh.OnDisconnect(err) - }} - time.Sleep(sleep) - sleep = min(sleep * 2, 30 * time.Second) + if connected { + c.message <- Message {func(mh MessageHandler) { + mh.OnDisconnect(err) + }} + } + time.Sleep(reconnectWait) + reconnectWait = min(reconnectWait * 2, 30 * time.Second) } } diff --git a/server/server/server.go b/server/server/server.go index 07625c4..8ec7201 100644 --- a/server/server/server.go +++ b/server/server/server.go @@ -28,6 +28,7 @@ func (s *server) mainLoop() { s.onConnect(c) case c := <-s.disconnects: s.onDisconnect(c) + c.Close() case rq := <-s.requests: if rq.From.UserId == "" && rq.Cmd.Target != "" { rq.ReplyInvalid() @@ -96,9 +97,8 @@ func Serve() { func handleConn(conn net.Conn, rq chan<- session.Request, disconnects chan<- *session.Session) *session.Session { recv, recvErr := proto.ReadLines(bufio.NewReader(conn)) - send := make(chan proto.Line) - sendErr := proto.WriteLines(bufio.NewWriter(conn), send) - s := session.NewSession(send) + s, out := session.NewSession() + sendErr := proto.WriteLines(bufio.NewWriter(conn), out) go func() { select { @@ -107,11 +107,11 @@ func handleConn(conn net.Conn, rq chan<- session.Request, log.Print("client read error: ", err) } case err := <-sendErr: - log.Print("client write error: ", err) + if err != nil { + log.Print("client write error: ", err) + } } conn.Close() - s.Close() - close(send) disconnects <- s }() diff --git a/server/session/session.go b/server/session/session.go index b6f9e5d..df7ab65 100644 --- a/server/session/session.go +++ b/server/session/session.go @@ -2,27 +2,51 @@ package session import ( "citrons.xyz/talk/proto" - "sync/atomic" ) type Session struct { send chan<- proto.Line UserId string subscribedTo map[*Stream]bool - disconnected atomic.Bool + sendError error } -func NewSession(send chan<- proto.Line) *Session { +func NewSession() (*Session, <-chan proto.Line) { var s Session - s.send = send s.subscribedTo = make(map[*Stream]bool) - return &s + sendChan := make(chan proto.Line, 1) + s.send = sendChan + return &s, s.bufferLines(sendChan) +} + +func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line { + out := make(chan proto.Line, 1) + go func() { + buf := make([]proto.Line, 0, 8) + for { + line, ok := <-in + if !ok { + return + } + buf = append(buf, line) + for len(buf) > 0 { + select { + case line, ok := <-in: + if !ok { + return + } + buf = append(buf, line) + case out <- buf[0]: + buf = buf[1:] + } + } + } + }() + return out } func (s *Session) Event(ev proto.Command) { - if !s.disconnected.Load() { - s.send <- proto.Line {'*', "", ev} - } + s.send <- proto.Line {'*', "", ev} } func (s *Session) Subscriptions() map[*Stream]bool { @@ -45,7 +69,7 @@ func (s *Session) Unsubscribe(to *Stream) { } func (s *Session) Close() { - s.disconnected.Store(true) + close(s.send) } type Stream struct { @@ -75,9 +99,7 @@ type Request struct { } func (r Request) Reply(reply proto.Command) { - if !r.From.disconnected.Load() { - r.From.send <- proto.Line {'!', r.RequestId, reply} - } + r.From.send <- proto.Line {'!', r.RequestId, reply} } func (r Request) ReplyOk() { |
