summaryrefslogtreecommitdiff
path: root/server/session/session.go
diff options
context:
space:
mode:
authorcitrons <citrons@mondecitronne.com>2025-06-07 17:43:51 -0500
committercitrons <citrons@mondecitronne.com>2025-06-07 17:43:51 -0500
commit24b15604403341044fcb5f7ece18ffc9c569a2cf (patch)
tree6789d0c3e4da2fcd26622171f7418033e97143e7 /server/session/session.go
parent4fd84671877754ae3b66acb8a8af8cf0935a53c9 (diff)
more robust connection handling
Diffstat (limited to 'server/session/session.go')
-rw-r--r--server/session/session.go46
1 files changed, 34 insertions, 12 deletions
diff --git a/server/session/session.go b/server/session/session.go
index b6f9e5d..df7ab65 100644
--- a/server/session/session.go
+++ b/server/session/session.go
@@ -2,27 +2,51 @@ package session
import (
"citrons.xyz/talk/proto"
- "sync/atomic"
)
type Session struct {
send chan<- proto.Line
UserId string
subscribedTo map[*Stream]bool
- disconnected atomic.Bool
+ sendError error
}
-func NewSession(send chan<- proto.Line) *Session {
+func NewSession() (*Session, <-chan proto.Line) {
var s Session
- s.send = send
s.subscribedTo = make(map[*Stream]bool)
- return &s
+ sendChan := make(chan proto.Line, 1)
+ s.send = sendChan
+ return &s, s.bufferLines(sendChan)
+}
+
+func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line {
+ out := make(chan proto.Line, 1)
+ go func() {
+ buf := make([]proto.Line, 0, 8)
+ for {
+ line, ok := <-in
+ if !ok {
+ return
+ }
+ buf = append(buf, line)
+ for len(buf) > 0 {
+ select {
+ case line, ok := <-in:
+ if !ok {
+ return
+ }
+ buf = append(buf, line)
+ case out <- buf[0]:
+ buf = buf[1:]
+ }
+ }
+ }
+ }()
+ return out
}
func (s *Session) Event(ev proto.Command) {
- if !s.disconnected.Load() {
- s.send <- proto.Line {'*', "", ev}
- }
+ s.send <- proto.Line {'*', "", ev}
}
func (s *Session) Subscriptions() map[*Stream]bool {
@@ -45,7 +69,7 @@ func (s *Session) Unsubscribe(to *Stream) {
}
func (s *Session) Close() {
- s.disconnected.Store(true)
+ close(s.send)
}
type Stream struct {
@@ -75,9 +99,7 @@ type Request struct {
}
func (r Request) Reply(reply proto.Command) {
- if !r.From.disconnected.Load() {
- r.From.send <- proto.Line {'!', r.RequestId, reply}
- }
+ r.From.send <- proto.Line {'!', r.RequestId, reply}
}
func (r Request) ReplyOk() {