1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
package session
import (
"citrons.xyz/talk/proto"
)
// Session holds the state of one session: the write end of its outgoing line
// queue and the set of streams it is subscribed to.
type Session struct {
// send is the write end of the outgoing line queue. Event and Request.Reply
// push lines onto it and Close closes it.
send chan<- proto.Line
// UserId is neither read nor written by the code in this file; presumably
// the id of the user that owns the session — TODO confirm against callers.
UserId string
// subscribedTo is the set of streams this session is subscribed to. Subscribe
// and Unsubscribe keep it in step with Stream.subscribers.
subscribedTo map[*Stream]bool
// sendError is not referenced anywhere in the visible code.
sendError error
}
// NewSession builds a session with an empty subscription set and returns it
// together with the receive end of its outgoing line queue. Lines pushed onto
// the session pass through bufferLines before they show up on the returned
// channel.
func NewSession() (*Session, <-chan proto.Line) {
	queue := make(chan proto.Line, 1)
	sess := &Session{
		send:         queue,
		subscribedTo: make(map[*Stream]bool),
	}
	return sess, sess.bufferLines(queue)
}
// bufferLines starts a goroutine that forwards every line received on in to
// the returned channel, holding lines in a growable in-memory queue so that
// the goroutine keeps receiving from in while the reader of the returned
// channel is not keeping up. Lines are forwarded in the order they were received.
//
// The goroutine exits as soon as in is closed. Lines still queued at that
// moment are not forwarded, and the returned channel is not closed.
func (s *Session) bufferLines(in <-chan proto.Line) <-chan proto.Line {
	out := make(chan proto.Line, 1)
	go func() {
		pending := make([]proto.Line, 0, 8)
		for {
			// A send on a nil channel is never selected, so the send case
			// below is only armed while at least one line is queued. With an
			// empty queue the select simply blocks on in.
			var (
				ready chan<- proto.Line
				head  proto.Line
			)
			if len(pending) > 0 {
				ready, head = out, pending[0]
			}
			select {
			case line, ok := <-in:
				if !ok {
					return
				}
				pending = append(pending, line)
			case ready <- head:
				pending = pending[1:]
			}
		}
	}()
	return out
}
// Event pushes ev onto the session's outgoing queue as a line tagged '*',
// with an empty string in the position where Request.Reply puts the request
// id. Sending on a session that has been closed panics.
func (s *Session) Event(ev proto.Command) {
	line := proto.Line{'*', "", ev}
	s.send <- line
}
// Subscriptions returns the set of streams the session is subscribed to.
// The result is the session's own map, not a copy, so changes made through
// it are visible to the session.
func (s *Session) Subscriptions() map[*Stream]bool {
	streams := s.subscribedTo
	return streams
}
// Subscribe records the subscription on both sides: to is added to the
// session's set of streams and the session is added to the stream's set of
// subscribers, which is allocated on first use. Subscribing twice to the
// same stream has no further effect.
func (s *Session) Subscribe(to *Stream) {
	s.subscribedTo[to] = true
	members := to.subscribers
	if members == nil {
		members = make(map[*Session]bool)
		to.subscribers = members
	}
	members[s] = true
}
// Unsubscribe removes the subscription on both sides: to is dropped from the
// session's set of streams and the session is dropped from the stream's set
// of subscribers. Unsubscribing from a stream the session is not subscribed
// to does nothing.
func (s *Session) Unsubscribe(to *Stream) {
	delete(s.subscribedTo, to)
	// delete is a no-op on a nil map, so a stream that never had a
	// subscriber needs no special handling.
	delete(to.subscribers, s)
}
// Close closes the session's outgoing queue. It does not touch the
// session's subscriptions. Because the queue is a closed channel afterwards,
// a second Close, or any later send through Event or Request.Reply, panics.
func (s *Session) Close() {
	queue := s.send
	close(queue)
}
// Stream is a set of sessions; Stream.Event delivers an event to every
// session in the set.
type Stream struct {
// subscribers is the set of subscribed sessions. It stays nil until the
// first Session.Subscribe call on this stream allocates it.
subscribers map[*Session]bool
}
// Subscribers returns the set of sessions subscribed to the stream. The
// result is the stream's own map, not a copy, and is nil when no session has
// ever subscribed.
func (s *Stream) Subscribers() map[*Session]bool {
	members := s.subscribers
	return members
}
// Event delivers ev to every session currently subscribed to the stream by
// calling Session.Event on each of them. Sessions are visited in map
// iteration order, which is unspecified.
func (s *Stream) Event(ev proto.Command) {
	for sess := range s.subscribers {
		sess.Event(ev)
	}
}
// UnsubscribeAll unsubscribes every session from the stream, updating both
// the stream's subscriber set and each session's subscription set.
func (s *Stream) UnsubscribeAll() {
	// Session.Unsubscribe deletes from the map being ranged over; Go permits
	// removing entries during iteration.
	for sess := range s.subscribers {
		sess.Unsubscribe(s)
	}
}
// Request bundles a session, a request id and a command. Reply sends a line
// carrying RequestId onto the outgoing queue of From.
type Request struct {
// From is the session whose outgoing queue receives the replies.
From *Session
// RequestId is copied into every line sent by Reply.
RequestId string
// Cmd is not read by the code in this file; presumably the command carried
// by the request — TODO confirm against callers.
Cmd proto.Command
}
// Reply pushes reply onto the outgoing queue of the session the request
// belongs to, as a line tagged '!' that carries the request's RequestId.
// Replying on a session that has been closed panics.
func (r Request) Reply(reply proto.Command) {
	line := proto.Line{'!', r.RequestId, reply}
	r.From.send <- line
}
// ReplyOk replies to the request with the command built by
// proto.NewCmd("ok", "").
func (r Request) ReplyOk() {
	okCmd := proto.NewCmd("ok", "")
	r.Reply(okCmd)
}
// ReplyInvalid replies to the request with the command form of a proto.Fail
// whose first field is "invalid" and whose remaining fields are empty.
func (r Request) ReplyInvalid() {
	failure := proto.Fail{"invalid", "", nil}
	r.Reply(failure.Cmd())
}
|