package session import ( "citrons.xyz/talk/proto" ) type Session struct { send chan<- proto.Line UserId string PasswordAuthed bool subscribedTo map[*Stream]bool sendError error } func NewSession() (*Session, <-chan proto.Line) { var s Session s.subscribedTo = make(map[*Stream]bool) sendChan := make(chan proto.Line, 1) s.send = sendChan return &s, s.bufferLines(sendChan) } func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line { out := make(chan proto.Line, 1) go func() { buf := make([]proto.Line, 0, 8) for { line, ok := <-in if !ok { return } buf = append(buf, line) for len(buf) > 0 { select { case line, ok := <-in: if !ok { return } buf = append(buf, line) case out <- buf[0]: buf = buf[1:] } } } }() return out } func (s *Session) Event(ev proto.Command) { s.send <- proto.Line {'*', "", ev} } func (s *Session) Subscriptions() map[*Stream]bool { return s.subscribedTo } func (s *Session) Subscribe(to *Stream) { s.subscribedTo[to] = true if to.subscribers == nil { to.subscribers = make(map[*Session]bool) } to.subscribers[s] = true } func (s *Session) Unsubscribe(to *Stream) { delete(s.subscribedTo, to) if to.subscribers != nil { delete(to.subscribers, s) } } func (s *Session) Close() { close(s.send) } type Stream struct { subscribers map[*Session]bool } func (s *Stream) Subscribers() map[*Session]bool { return s.subscribers } func (s *Stream) Event(ev proto.Command) { for sub, _ := range s.subscribers { sub.Event(ev) } } func (s *Stream) UnsubscribeAll() { for sub, _ := range s.subscribers { sub.Unsubscribe(s) } } type Request struct { From *Session RequestId string Cmd proto.Command } func (r Request) Reply(reply proto.Command) { r.From.send <- proto.Line {'!', r.RequestId, reply} } func (r Request) ReplyOk() { r.Reply(proto.NewCmd("ok", "")) } func (r Request) ReplyInvalid() { r.Reply(proto.Fail{"invalid", "", nil}.Cmd()) }