package session import ( "citrons.xyz/talk/proto" "sync/atomic" ) type Session struct { send chan<- proto.Line UserId string subscribedTo map[*Stream]bool disconnected atomic.Bool } func NewSession(send chan<- proto.Line) *Session { var s Session s.send = send s.subscribedTo = make(map[*Stream]bool) return &s } func (s *Session) Event(ev proto.Command) { if !s.disconnected.Load() { s.send <- proto.Line {'*', "", ev} } } func (s *Session) Subscriptions() map[*Stream]bool { return s.subscribedTo } func (s *Session) Subscribe(to *Stream) { s.subscribedTo[to] = true if to.subscribers == nil { to.subscribers = make(map[*Session]bool) } to.subscribers[s] = true } func (s *Session) Unsubscribe(to *Stream) { delete(s.subscribedTo, to) if to.subscribers != nil { delete(to.subscribers, s) } } func (s *Session) Close() { s.disconnected.Store(true) } type Stream struct { subscribers map[*Session]bool } func (s *Stream) Subscribers() map[*Session]bool { return s.subscribers } func (s *Stream) Event(ev proto.Command) { for sub, _ := range s.subscribers { sub.Event(ev) } } func (s *Stream) UnsubscribeAll() { for sub, _ := range s.subscribers { sub.Unsubscribe(s) } } type Request struct { From *Session RequestId string Cmd proto.Command } func (r Request) Reply(reply proto.Command) { if !r.From.disconnected.Load() { r.From.send <- proto.Line {'!', r.RequestId, reply} } } func (r Request) ReplyOk() { r.Reply(proto.NewCmd("ok", "")) } func (r Request) ReplyInvalid() { r.Reply(proto.Fail{"invalid", "", nil}.Cmd()) }