summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/server/server.go12
-rw-r--r--server/session/session.go46
2 files changed, 40 insertions, 18 deletions
diff --git a/server/server/server.go b/server/server/server.go
index 07625c4..8ec7201 100644
--- a/server/server/server.go
+++ b/server/server/server.go
@@ -28,6 +28,7 @@ func (s *server) mainLoop() {
s.onConnect(c)
case c := <-s.disconnects:
s.onDisconnect(c)
+ c.Close()
case rq := <-s.requests:
if rq.From.UserId == "" && rq.Cmd.Target != "" {
rq.ReplyInvalid()
@@ -96,9 +97,8 @@ func Serve() {
func handleConn(conn net.Conn, rq chan<- session.Request,
disconnects chan<- *session.Session) *session.Session {
recv, recvErr := proto.ReadLines(bufio.NewReader(conn))
- send := make(chan proto.Line)
- sendErr := proto.WriteLines(bufio.NewWriter(conn), send)
- s := session.NewSession(send)
+ s, out := session.NewSession()
+ sendErr := proto.WriteLines(bufio.NewWriter(conn), out)
go func() {
select {
@@ -107,11 +107,11 @@ func handleConn(conn net.Conn, rq chan<- session.Request,
log.Print("client read error: ", err)
}
case err := <-sendErr:
- log.Print("client write error: ", err)
+ if err != nil {
+ log.Print("client write error: ", err)
+ }
}
conn.Close()
- s.Close()
- close(send)
disconnects <- s
}()
diff --git a/server/session/session.go b/server/session/session.go
index b6f9e5d..df7ab65 100644
--- a/server/session/session.go
+++ b/server/session/session.go
@@ -2,27 +2,51 @@ package session
import (
"citrons.xyz/talk/proto"
- "sync/atomic"
)
type Session struct {
send chan<- proto.Line
UserId string
subscribedTo map[*Stream]bool
- disconnected atomic.Bool
+ sendError error
}
-func NewSession(send chan<- proto.Line) *Session {
+func NewSession() (*Session, <-chan proto.Line) {
var s Session
- s.send = send
s.subscribedTo = make(map[*Stream]bool)
- return &s
+ sendChan := make(chan proto.Line, 1)
+ s.send = sendChan
+ return &s, s.bufferLines(sendChan)
+}
+
+func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line {
+ out := make(chan proto.Line, 1)
+ go func() {
+ buf := make([]proto.Line, 0, 8)
+ for {
+ line, ok := <-in
+ if !ok {
+ return
+ }
+ buf = append(buf, line)
+ for len(buf) > 0 {
+ select {
+ case line, ok := <-in:
+ if !ok {
+ return
+ }
+ buf = append(buf, line)
+ case out <- buf[0]:
+ buf = buf[1:]
+ }
+ }
+ }
+ }()
+ return out
}
func (s *Session) Event(ev proto.Command) {
- if !s.disconnected.Load() {
- s.send <- proto.Line {'*', "", ev}
- }
+ s.send <- proto.Line {'*', "", ev}
}
func (s *Session) Subscriptions() map[*Stream]bool {
@@ -45,7 +69,7 @@ func (s *Session) Unsubscribe(to *Stream) {
}
func (s *Session) Close() {
- s.disconnected.Store(true)
+ close(s.send)
}
type Stream struct {
@@ -75,9 +99,7 @@ type Request struct {
}
func (r Request) Reply(reply proto.Command) {
- if !r.From.disconnected.Load() {
- r.From.send <- proto.Line {'!', r.RequestId, reply}
- }
+ r.From.send <- proto.Line {'!', r.RequestId, reply}
}
func (r Request) ReplyOk() {