From 6edb15c2e8012e963c6f238c9177bdef06a9d651 Mon Sep 17 00:00:00 2001 From: citrons Date: Thu, 12 Jun 2025 15:15:57 -0500 Subject: properly clean up resources associated with conn --- client/client/client.go | 173 ++++++++++++++++++++++++------------------------ 1 file changed, 88 insertions(+), 85 deletions(-) (limited to 'client') diff --git a/client/client/client.go b/client/client/client.go index 0c563bf..0f0fb03 100644 --- a/client/client/client.go +++ b/client/client/client.go @@ -32,6 +32,7 @@ type Client struct { activeRequests map[string]func(proto.Command) debuggerAttach chan Debugger debugger Debugger + reconnectWait time.Duration } func New(address string) Client { @@ -45,109 +46,111 @@ func New(address string) Client { } func (c *Client) RunClient() { - reconnectWait := time.Second / 4 + c.reconnectWait = time.Second / 4 for { conn, err := net.DialTimeout("tcp", c.Address, 30 * time.Second) if err != nil { c.message <- Message {func(mh MessageHandler) { mh.OnDisconnect(err) }} - time.Sleep(reconnectWait) - reconnectWait = min(reconnectWait * 2, 30 * time.Second) - continue + } else { + c.handleConnection(conn) + conn.Close() } - defer conn.Close() - - recv, recvErr := proto.ReadLines(bufio.NewReader(conn)) - send := make(chan proto.Line, 1) - defer close(send) - sendErr := proto.WriteLines(bufio.NewWriter(conn), send) - - c.send = make(chan proto.Line, 1) - go func() { - buf := make([]proto.Line, 0, 8) - for { - line, ok := <-c.send - if !ok { - return - } - buf = append(buf, line) - for len(buf) > 0 { - select { - case line, ok := <-c.send: - if !ok { - return - } - buf = append(buf, line) - case send <- buf[0]: - buf = buf[1:] + time.Sleep(c.reconnectWait) + c.reconnectWait = min(c.reconnectWait * 2, 30 * time.Second) + } +} + +func (c *Client) handleConnection(conn net.Conn) (err error) { + recv, recvErr := proto.ReadLines(bufio.NewReader(conn)) + send := make(chan proto.Line, 1) + defer close(send) + sendErr := proto.WriteLines(bufio.NewWriter(conn), send) + + c.send = make(chan proto.Line, 1) + go func() { + buf := make([]proto.Line, 0, 8) + for { + line, ok := <-c.send + if !ok { + return + } + buf = append(buf, line) + for len(buf) > 0 { + select { + case line, ok := <-c.send: + if !ok { + return } + buf = append(buf, line) + case send <- buf[0]: + buf = buf[1:] } } - }() - defer close(c.send) - - var ( - debugger Debugger - connected bool - ) - run: for { - select { - case line := <-recv: - if debugger != nil { + } + }() + defer close(c.send) + + var ( + debugger Debugger + connected bool + ) + run: for { + select { + case line := <-recv: + if debugger != nil { + c.message <- Message {func(mh MessageHandler) { + debugger.OnLine(line, false) + }} + } + + if !connected { + if line.Kind == '*' && line.Cmd.Kind == "hi" { c.message <- Message {func(mh MessageHandler) { - debugger.OnLine(line, false) + mh.OnConnect() }} + c.reconnectWait = + time.Second / time.Duration(2 + rand.Int() % 4) + connected = true } + } - if !connected { - if line.Kind == '*' && line.Cmd.Kind == "hi" { - c.message <- Message {func(mh MessageHandler) { - mh.OnConnect() - }} - reconnectWait = - time.Second / time.Duration(2 + rand.Int() % 4) - connected = true + switch line.Kind { + case '*': + c.message <- Message {func(mh MessageHandler) { + mh.OnEvent(line.Cmd) + }} + case '!': + cb := c.activeRequests[line.RequestId] + c.message <- Message {func(mh MessageHandler) { + mh.OnResponse(line.RequestId, line.Cmd) + if cb != nil { + cb(line.Cmd) } - } - - switch line.Kind { - case '*': - c.message <- Message {func(mh MessageHandler) { - mh.OnEvent(line.Cmd) - }} - case '!': - cb := c.activeRequests[line.RequestId] - c.message <- Message {func(mh MessageHandler) { - mh.OnResponse(line.RequestId, line.Cmd) - if cb != nil { - cb(line.Cmd) - } - }} - delete(c.activeRequests, line.RequestId) - } - case debugger = <-c.debuggerAttach: - if debugger != nil { - c.message <- Message {func(mh MessageHandler) { - debugger.SetSendChan(c.send) - }} - } - case err = <-recvErr: - break run - case err = <-sendErr: - break run - case <-c.stop: - return + }} + delete(c.activeRequests, line.RequestId) } + case debugger = <-c.debuggerAttach: + if debugger != nil { + c.message <- Message {func(mh MessageHandler) { + debugger.SetSendChan(c.send) + }} + } + case err = <-recvErr: + break run + case err = <-sendErr: + break run + case <-c.stop: + return } - if connected { - c.message <- Message {func(mh MessageHandler) { - mh.OnDisconnect(err) - }} - } - time.Sleep(reconnectWait) - reconnectWait = min(reconnectWait * 2, 30 * time.Second) } + if connected { + c.message <- Message {func(mh MessageHandler) { + mh.OnDisconnect(err) + }} + } + return err } func (c *Client) Stop() { -- cgit v1.2.3