diff options
| author | citrons <citrons@mondecitronne.com> | 2025-06-07 17:43:51 -0500 |
|---|---|---|
| committer | citrons <citrons@mondecitronne.com> | 2025-06-07 17:43:51 -0500 |
| commit | 24b15604403341044fcb5f7ece18ffc9c569a2cf (patch) | |
| tree | 6789d0c3e4da2fcd26622171f7418033e97143e7 /server/session/session.go | |
| parent | 4fd84671877754ae3b66acb8a8af8cf0935a53c9 (diff) | |
more robust connection handling
Diffstat (limited to 'server/session/session.go')
| -rw-r--r-- | server/session/session.go | 46 |
1 files changed, 34 insertions, 12 deletions
diff --git a/server/session/session.go b/server/session/session.go index b6f9e5d..df7ab65 100644 --- a/server/session/session.go +++ b/server/session/session.go @@ -2,27 +2,51 @@ package session import ( "citrons.xyz/talk/proto" - "sync/atomic" ) type Session struct { send chan<- proto.Line UserId string subscribedTo map[*Stream]bool - disconnected atomic.Bool + sendError error } -func NewSession(send chan<- proto.Line) *Session { +func NewSession() (*Session, <-chan proto.Line) { var s Session - s.send = send s.subscribedTo = make(map[*Stream]bool) - return &s + sendChan := make(chan proto.Line, 1) + s.send = sendChan + return &s, s.bufferLines(sendChan) +} + +func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line { + out := make(chan proto.Line, 1) + go func() { + buf := make([]proto.Line, 0, 8) + for { + line, ok := <-in + if !ok { + return + } + buf = append(buf, line) + for len(buf) > 0 { + select { + case line, ok := <-in: + if !ok { + return + } + buf = append(buf, line) + case out <- buf[0]: + buf = buf[1:] + } + } + } + }() + return out } func (s *Session) Event(ev proto.Command) { - if !s.disconnected.Load() { - s.send <- proto.Line {'*', "", ev} - } + s.send <- proto.Line {'*', "", ev} } func (s *Session) Subscriptions() map[*Stream]bool { @@ -45,7 +69,7 @@ func (s *Session) Unsubscribe(to *Stream) { } func (s *Session) Close() { - s.disconnected.Store(true) + close(s.send) } type Stream struct { @@ -75,9 +99,7 @@ type Request struct { } func (r Request) Reply(reply proto.Command) { - if !r.From.disconnected.Load() { - r.From.send <- proto.Line {'!', r.RequestId, reply} - } + r.From.send <- proto.Line {'!', r.RequestId, reply} } func (r Request) ReplyOk() { |
