diff options
Diffstat (limited to 'server')
| -rw-r--r-- | server/server/server.go | 12 | ||||
| -rw-r--r-- | server/session/session.go | 46 |
2 files changed, 40 insertions, 18 deletions
diff --git a/server/server/server.go b/server/server/server.go index 07625c4..8ec7201 100644 --- a/server/server/server.go +++ b/server/server/server.go @@ -28,6 +28,7 @@ func (s *server) mainLoop() { s.onConnect(c) case c := <-s.disconnects: s.onDisconnect(c) + c.Close() case rq := <-s.requests: if rq.From.UserId == "" && rq.Cmd.Target != "" { rq.ReplyInvalid() @@ -96,9 +97,8 @@ func Serve() { func handleConn(conn net.Conn, rq chan<- session.Request, disconnects chan<- *session.Session) *session.Session { recv, recvErr := proto.ReadLines(bufio.NewReader(conn)) - send := make(chan proto.Line) - sendErr := proto.WriteLines(bufio.NewWriter(conn), send) - s := session.NewSession(send) + s, out := session.NewSession() + sendErr := proto.WriteLines(bufio.NewWriter(conn), out) go func() { select { @@ -107,11 +107,11 @@ func handleConn(conn net.Conn, rq chan<- session.Request, log.Print("client read error: ", err) } case err := <-sendErr: - log.Print("client write error: ", err) + if err != nil { + log.Print("client write error: ", err) + } } conn.Close() - s.Close() - close(send) disconnects <- s }() diff --git a/server/session/session.go b/server/session/session.go index b6f9e5d..df7ab65 100644 --- a/server/session/session.go +++ b/server/session/session.go @@ -2,27 +2,51 @@ package session import ( "citrons.xyz/talk/proto" - "sync/atomic" ) type Session struct { send chan<- proto.Line UserId string subscribedTo map[*Stream]bool - disconnected atomic.Bool + sendError error } -func NewSession(send chan<- proto.Line) *Session { +func NewSession() (*Session, <-chan proto.Line) { var s Session - s.send = send s.subscribedTo = make(map[*Stream]bool) - return &s + sendChan := make(chan proto.Line, 1) + s.send = sendChan + return &s, s.bufferLines(sendChan) +} + +func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line { + out := make(chan proto.Line, 1) + go func() { + buf := make([]proto.Line, 0, 8) + for { + line, ok := <-in + if !ok { + return + } + buf = append(buf, line) + for len(buf) > 0 { + select { + case line, ok := <-in: + if !ok { + return + } + buf = append(buf, line) + case out <- buf[0]: + buf = buf[1:] + } + } + } + }() + return out } func (s *Session) Event(ev proto.Command) { - if !s.disconnected.Load() { - s.send <- proto.Line {'*', "", ev} - } + s.send <- proto.Line {'*', "", ev} } func (s *Session) Subscriptions() map[*Stream]bool { @@ -45,7 +69,7 @@ func (s *Session) Unsubscribe(to *Stream) { } func (s *Session) Close() { - s.disconnected.Store(true) + close(s.send) } type Stream struct { @@ -75,9 +99,7 @@ type Request struct { } func (r Request) Reply(reply proto.Command) { - if !r.From.disconnected.Load() { - r.From.send <- proto.Line {'!', r.RequestId, reply} - } + r.From.send <- proto.Line {'!', r.RequestId, reply} } func (r Request) ReplyOk() { |
