summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcitrons <citrons@mondecitronne.com>2025-06-07 17:43:51 -0500
committercitrons <citrons@mondecitronne.com>2025-06-07 17:43:51 -0500
commit24b15604403341044fcb5f7ece18ffc9c569a2cf (patch)
tree6789d0c3e4da2fcd26622171f7418033e97143e7
parent4fd84671877754ae3b66acb8a8af8cf0935a53c9 (diff)
more robust connection handling
-rw-r--r--client/client/client.go42
-rw-r--r--server/server/server.go12
-rw-r--r--server/session/session.go46
3 files changed, 67 insertions, 33 deletions
diff --git a/client/client/client.go b/client/client/client.go
index 7c9cf03..0c563bf 100644
--- a/client/client/client.go
+++ b/client/client/client.go
@@ -45,19 +45,18 @@ func New(address string) Client {
}
func (c *Client) RunClient() {
- sleep := time.Second / 4
+ reconnectWait := time.Second / 4
for {
- conn, err := net.Dial("tcp", c.Address)
+ conn, err := net.DialTimeout("tcp", c.Address, 30 * time.Second)
if err != nil {
c.message <- Message {func(mh MessageHandler) {
mh.OnDisconnect(err)
}}
- time.Sleep(sleep)
- sleep = min(sleep * 2, 30 * time.Second)
+ time.Sleep(reconnectWait)
+ reconnectWait = min(reconnectWait * 2, 30 * time.Second)
continue
}
defer conn.Close()
- sleep = time.Second / time.Duration(2 + rand.Int() % 4)
recv, recvErr := proto.ReadLines(bufio.NewReader(conn))
send := make(chan proto.Line, 1)
@@ -88,11 +87,10 @@ func (c *Client) RunClient() {
}()
defer close(c.send)
- c.message <- Message {func(mh MessageHandler) {
- mh.OnConnect()
- }}
-
- var debugger Debugger
+ var (
+ debugger Debugger
+ connected bool
+ )
run: for {
select {
case line := <-recv:
@@ -101,6 +99,18 @@ func (c *Client) RunClient() {
debugger.OnLine(line, false)
}}
}
+
+ if !connected {
+ if line.Kind == '*' && line.Cmd.Kind == "hi" {
+ c.message <- Message {func(mh MessageHandler) {
+ mh.OnConnect()
+ }}
+ reconnectWait =
+ time.Second / time.Duration(2 + rand.Int() % 4)
+ connected = true
+ }
+ }
+
switch line.Kind {
case '*':
c.message <- Message {func(mh MessageHandler) {
@@ -130,11 +140,13 @@ func (c *Client) RunClient() {
return
}
}
- c.message <- Message {func(mh MessageHandler) {
- mh.OnDisconnect(err)
- }}
- time.Sleep(sleep)
- sleep = min(sleep * 2, 30 * time.Second)
+ if connected {
+ c.message <- Message {func(mh MessageHandler) {
+ mh.OnDisconnect(err)
+ }}
+ }
+ time.Sleep(reconnectWait)
+ reconnectWait = min(reconnectWait * 2, 30 * time.Second)
}
}
diff --git a/server/server/server.go b/server/server/server.go
index 07625c4..8ec7201 100644
--- a/server/server/server.go
+++ b/server/server/server.go
@@ -28,6 +28,7 @@ func (s *server) mainLoop() {
s.onConnect(c)
case c := <-s.disconnects:
s.onDisconnect(c)
+ c.Close()
case rq := <-s.requests:
if rq.From.UserId == "" && rq.Cmd.Target != "" {
rq.ReplyInvalid()
@@ -96,9 +97,8 @@ func Serve() {
func handleConn(conn net.Conn, rq chan<- session.Request,
disconnects chan<- *session.Session) *session.Session {
recv, recvErr := proto.ReadLines(bufio.NewReader(conn))
- send := make(chan proto.Line)
- sendErr := proto.WriteLines(bufio.NewWriter(conn), send)
- s := session.NewSession(send)
+ s, out := session.NewSession()
+ sendErr := proto.WriteLines(bufio.NewWriter(conn), out)
go func() {
select {
@@ -107,11 +107,11 @@ func handleConn(conn net.Conn, rq chan<- session.Request,
log.Print("client read error: ", err)
}
case err := <-sendErr:
- log.Print("client write error: ", err)
+ if err != nil {
+ log.Print("client write error: ", err)
+ }
}
conn.Close()
- s.Close()
- close(send)
disconnects <- s
}()
diff --git a/server/session/session.go b/server/session/session.go
index b6f9e5d..df7ab65 100644
--- a/server/session/session.go
+++ b/server/session/session.go
@@ -2,27 +2,51 @@ package session
import (
"citrons.xyz/talk/proto"
- "sync/atomic"
)
type Session struct {
send chan<- proto.Line
UserId string
subscribedTo map[*Stream]bool
- disconnected atomic.Bool
+ sendError error
}
-func NewSession(send chan<- proto.Line) *Session {
+func NewSession() (*Session, <-chan proto.Line) {
var s Session
- s.send = send
s.subscribedTo = make(map[*Stream]bool)
- return &s
+ sendChan := make(chan proto.Line, 1)
+ s.send = sendChan
+ return &s, s.bufferLines(sendChan)
+}
+
+func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line {
+ out := make(chan proto.Line, 1)
+ go func() {
+ buf := make([]proto.Line, 0, 8)
+ for {
+ line, ok := <-in
+ if !ok {
+ return
+ }
+ buf = append(buf, line)
+ for len(buf) > 0 {
+ select {
+ case line, ok := <-in:
+ if !ok {
+ return
+ }
+ buf = append(buf, line)
+ case out <- buf[0]:
+ buf = buf[1:]
+ }
+ }
+ }
+ }()
+ return out
}
func (s *Session) Event(ev proto.Command) {
- if !s.disconnected.Load() {
- s.send <- proto.Line {'*', "", ev}
- }
+ s.send <- proto.Line {'*', "", ev}
}
func (s *Session) Subscriptions() map[*Stream]bool {
@@ -45,7 +69,7 @@ func (s *Session) Unsubscribe(to *Stream) {
}
func (s *Session) Close() {
- s.disconnected.Store(true)
+ close(s.send)
}
type Stream struct {
@@ -75,9 +99,7 @@ type Request struct {
}
func (r Request) Reply(reply proto.Command) {
- if !r.From.disconnected.Load() {
- r.From.send <- proto.Line {'!', r.RequestId, reply}
- }
+ r.From.send <- proto.Line {'!', r.RequestId, reply}
}
func (r Request) ReplyOk() {