summary refs log tree commit diff
path: root/server/session/session.go
blob: bb6749af46dc764f4b85d7951dc0bf3b39c93aab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package session

import (
	"citrons.xyz/talk/proto"
)

// Session bundles an outgoing line channel with identity fields and the set
// of streams the session is subscribed to.
//
// send is the write end of the channel created in NewSession: Event and
// Request.Reply send on it, and Close closes it. subscribedTo is kept in sync
// with Stream.subscribers by Subscribe and Unsubscribe; its values are always
// true. UserId, PasswordAuthed and sendError are neither read nor written in
// the visible part of this file.
// NOTE(review): UserId / PasswordAuthed presumably describe the logged-in
// user and how they authenticated — confirm against the callers.
type Session struct {
	send           chan<- proto.Line // outgoing lines; closed by Close
	UserId         string            // not touched in this file
	PasswordAuthed bool              // not touched in this file
	subscribedTo   map[*Stream]bool  // streams this session is subscribed to
	sendError      error             // not referenced in this file
}

// NewSession creates a Session with an empty subscription set and returns it
// together with the channel from which its outgoing lines can be read.
//
// Lines sent through the session go into a channel with a buffer of one and
// are relayed by the goroutine started in bufferLines; the second return
// value is the output side of that relay.
func NewSession() (*Session, <-chan proto.Line) {
	lines := make(chan proto.Line, 1)
	s := &Session{
		send:         lines,
		subscribedTo: make(map[*Stream]bool),
	}
	return s, s.bufferLines(lines)
}

// bufferLines starts a goroutine that relays lines from in to the returned
// channel through an in-memory FIFO with no size limit. The goroutine is
// always willing to receive from in, whether or not the reader of the
// returned channel is keeping up.
//
// The goroutine returns as soon as in is closed. Lines still queued at that
// moment are discarded, and the returned channel is left open: nothing in
// this function ever closes it.
// NOTE(review): a consumer that ranges over the returned channel would block
// forever once the session is closed — confirm callers have another way out.
func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line {
	out := make(chan proto.Line, 1)
	go func() {
		pending := make([]proto.Line, 0, 8)
		for {
			// A nil channel is never ready in a select, so the send case
			// below is only armed while something is queued. head is taken
			// up front because a select evaluates every send operand on
			// entry, even for cases that cannot proceed.
			var (
				ready chan<- proto.Line
				head  proto.Line
			)
			if len(pending) > 0 {
				ready, head = out, pending[0]
			}

			select {
			case line, ok := <-in:
				if !ok {
					return
				}
				pending = append(pending, line)
			case ready <- head:
				pending = pending[1:]
			}
		}
	}()
	return out
}

// Event sends ev to this session wrapped in a proto.Line whose first field is
// '*' and whose second field is the empty string.
//
// The call blocks until the session's send channel accepts the line, and it
// panics if Close has already been called (send on a closed channel).
func (s *Session) Event(ev proto.Command) {
	line := proto.Line{'*', "", ev}
	s.send <- line
}

// Subscriptions returns the set of streams this session is subscribed to.
// The result is the session's own map, not a copy, so changes made through
// Subscribe and Unsubscribe are visible in it.
func (s *Session) Subscriptions() map[*Stream]bool {
	return s.subscribedTo
}

// Subscribe records a subscription of this session to the stream to, updating
// both sides: the session's own set and the stream's subscriber set. The
// stream's set is allocated here if it does not exist yet. Subscribing twice
// to the same stream has no further effect.
func (s *Session) Subscribe(to *Stream) {
	s.subscribedTo[to] = true

	subs := to.subscribers
	if subs == nil {
		subs = make(map[*Session]bool)
		to.subscribers = subs
	}
	subs[s] = true
}

// Unsubscribe removes the subscription of this session to the stream to from
// both the session's set and the stream's subscriber set. It does nothing if
// no such subscription exists.
func (s *Session) Unsubscribe(to *Stream) {
	delete(s.subscribedTo, to)
	// delete on a nil map is a no-op, so a stream that never had any
	// subscribers needs no special handling.
	delete(to.subscribers, s)
}

// Close closes the session's send channel. For a session made by NewSession
// this causes the goroutine started in bufferLines to return. Calling Event
// or Request.Reply for this session afterwards, or calling Close a second
// time, panics, because the channel is already closed.
func (s *Session) Close() {
	close(s.send)
}

// Stream is a set of sessions; Event forwards a command to each of them.
type Stream struct {
	// subscribers may be nil: Session.Subscribe allocates it on first use.
	// Values are always true.
	subscribers map[*Session]bool
}

// Subscribers returns the set of sessions subscribed to this stream. The
// result is the stream's own map, not a copy, and is nil if no session has
// ever subscribed.
func (s *Stream) Subscribers() map[*Session]bool {
	return s.subscribers
}

// Event forwards ev to every session currently subscribed to the stream by
// calling Session.Event on each one in turn. The order of delivery follows
// map iteration and is therefore unspecified.
func (s *Stream) Event(ev proto.Command) {
	for sub := range s.subscribers {
		sub.Event(ev)
	}
}

// UnsubscribeAll unsubscribes every session from this stream by calling
// Session.Unsubscribe for each subscriber. Entries are removed from the
// subscriber map while it is being ranged over, which Go permits.
func (s *Stream) UnsubscribeAll() {
	for sub := range s.subscribers {
		sub.Unsubscribe(s)
	}
}

// Request ties a command to the session it is associated with and to the
// request id used to tag replies. Reply sends on From's channel and copies
// RequestId into the reply line; Cmd is not used in the visible part of this
// file.
type Request struct {
	From      *Session      // session that receives the replies
	RequestId string        // placed in the second field of reply lines
	Cmd       proto.Command // not used in this file
}

// Reply sends reply to the session in r.From, wrapped in a proto.Line whose
// first field is '!' and whose second field is r.RequestId.
//
// The call blocks until the session's send channel accepts the line, and it
// panics if that session has already been closed.
func (r Request) Reply(reply proto.Command) {
	line := proto.Line{'!', r.RequestId, reply}
	r.From.send <- line
}

// ReplyOk answers the request with the command built by
// proto.NewCmd("ok", "").
func (r Request) ReplyOk() {
	ok := proto.NewCmd("ok", "")
	r.Reply(ok)
}

// ReplyInvalid answers the request with the command obtained from
// proto.Fail{"invalid", "", nil}.Cmd().
func (r Request) ReplyInvalid() {
	fail := proto.Fail{"invalid", "", nil}
	r.Reply(fail.Cmd())
}