// Package channel implements named and direct channels: their stored
// identity, their membership records and their message history.
package channel

import (
	"bufio"
	"bytes"
	"log"
	"maps"
	"slices"
	"sort"
	"strconv"
	"strings"

	"citrons.xyz/talk/proto"
	"citrons.xyz/talk/server/object"
	"citrons.xyz/talk/server/session"
	"citrons.xyz/talk/server/user"
	"citrons.xyz/talk/server/validate"
	bolt "go.etcd.io/bbolt"
)

// ChannelKind ties channels to the object world they live in and to the
// database that world exposes.
type ChannelKind struct {
	world *object.World
	db    *bolt.DB
}

// Channel is a single channel. A direct channel (isDirect) has no stored name;
// its name and handle are derived from its members.
type Channel struct {
	kind     *ChannelKind
	id       string
	name     string
	isDirect bool
	Stream   session.Stream
}

// Kind returns a ChannelKind bound to world and to world.DB().
func Kind(world *object.World) *ChannelKind {
	return &ChannelKind{world: world, db: world.DB()}
}

// CreateChannel creates and saves a new named channel.
//
// It fails with "invalid-name" when validate.Name rejects name and with
// "name-taken" when a channel with that name already exists. The name is
// validated before the lookup, in the same order Rename uses.
func (cs *ChannelKind) CreateChannel(name string) (*Channel, *proto.Fail) {
	if !validate.Name(name) {
		return nil, &proto.Fail{
			"invalid-name", "", map[string]string{"": name},
		}
	}
	if cs.ByName(name) != nil {
		return nil, &proto.Fail{
			"name-taken", "", map[string]string{"": name},
		}
	}
	c := &Channel{kind: cs, name: name, id: proto.GenId()}
	c.SetDefaultMembership(DefaultMembership)
	c.Save()
	return c, nil
}

// DirectHandle builds the lookup handle of a direct channel: the keys of
// among, sorted and joined with NUL bytes.
func DirectHandle(among map[string]bool) string {
	ids := slices.Sorted(maps.Keys(among))
	return strings.Join(ids, "\x00")
}

// GetDirect returns the direct channel among the given user ids. When the
// world has none registered under their handle, a new one is created, every
// id in among is given DefaultMembership, and the channel is saved.
func (cs *ChannelKind) GetDirect(among map[string]bool) *Channel {
	found := cs.world.Lookup("direct-channel", DirectHandle(among))
	if ch, ok := found.(*Channel); ok {
		return ch
	}
	c := &Channel{kind: cs, isDirect: true, id: proto.GenId()}
	for member := range among {
		c.SetMembership(member, DefaultMembership)
	}
	c.Save()
	return c
}

// UserChannels returns the set of channel ids recorded for uid in the
// "user channels" bucket. The result is empty when nothing is recorded.
func (cs *ChannelKind) UserChannels(uid string) map[string]bool {
	result := make(map[string]bool)
	err := cs.db.View(func(tx *bolt.Tx) error {
		index := tx.Bucket([]byte("user channels"))
		if index == nil {
			return nil
		}
		joined := index.Bucket([]byte(uid))
		if joined == nil {
			return nil
		}
		return joined.ForEach(func(k, v []byte) error {
			result[string(k)] = true
			return nil
		})
	})
	if err != nil {
		log.Fatal("error reading database: ", err)
	}
	return result
}

// ByName looks up a named channel in the world. It returns nil when the
// lookup does not yield a *Channel.
func (cs *ChannelKind) ByName(name string) *Channel {
	ch, _ := cs.world.Lookup("channel", name).(*Channel)
	return ch
}

// Undata rebuilds a Channel from its stored proto.Object form. The name is
// read from the "" field, and the channel is direct when the object's kind
// is "direct-channel".
func (cs *ChannelKind) Undata(o proto.Object) object.Object {
	return &Channel{
		kind:     cs,
		id:       o.Id,
		name:     o.Fields[""],
		isDirect: o.Kind == "direct-channel",
	}
}

// Name returns the channel name as seen with an empty viewer id.
func (c *Channel) Name() string {
	return c.NameFor("")
}

// NameFor returns the channel name as presented to uid.
//
// A named channel returns its stored name. A direct channel returns the
// sorted, comma-separated "" info fields of its members as seen by uid; uid
// itself is left out unless it is the only member. Members that the world
// cannot resolve are skipped.
func (c *Channel) NameFor(uid string) string {
	if !c.isDirect {
		return c.name
	}
	// Read the member set once; each Members call is a database transaction.
	members := c.Members()
	names := make([]string, 0, len(members))
	for member := range members {
		if member == uid && len(members) > 1 {
			continue
		}
		u := c.kind.world.GetObject(member)
		if u != nil {
			names = append(names, u.InfoFor(uid).Fields[""])
		}
	}
	sort.Strings(names)
	return strings.Join(names, ", ")
}

// Handle returns the channel's lookup handle: the name for a named channel,
// or DirectHandle over the current members for a direct channel.
func (c *Channel) Handle() string {
	if !c.isDirect {
		return c.name
	}
	members := make(map[string]bool)
	for member := range c.Members() {
		members[member] = true
	}
	return DirectHandle(members)
}

// Data returns the channel's object form as seen with an empty viewer id.
func (c *Channel) Data() proto.Object {
	return c.InfoFor("")
}

// Save stores the channel in the world under its id.
func (c *Channel) Save() {
	c.kind.world.PutObject(c.id, c)
}

// Id returns the channel id.
func (c *Channel) Id() string {
	return c.id
}

// Rename changes the channel name and saves the channel. It fails with
// "invalid-name" or "name-taken" without changing anything.
func (c *Channel) Rename(name string) *proto.Fail {
	if !validate.Name(name) {
		return &proto.Fail{
			"invalid-name", "", map[string]string{"": name},
		}
	}
	if c.kind.ByName(name) != nil {
		return &proto.Fail{
			"name-taken", "", map[string]string{"": name},
		}
	}
	c.name = name
	c.Save()
	return nil
}

// Put appends m to the channel's message history and sends it as a "p"
// command to every stream subscriber whose membership has See set, except
// From.
//
// m receives a fresh id and a "t" timestamp field; the stamped message is
// returned. A database failure terminates the process via log.Fatal.
func (c *Channel) Put(m proto.Object, From *session.Session) proto.Object {
	// Writing into a nil map panics, so make sure the field map exists.
	if m.Fields == nil {
		m.Fields = make(map[string]string)
	}
	m.Id = proto.GenId()
	m.Fields["t"] = proto.Timestamp()

	err := c.kind.db.Update(func(tx *bolt.Tx) error {
		history, err := tx.CreateBucketIfNotExists([]byte("message history"))
		if err != nil {
			return err
		}
		channel, err := history.CreateBucketIfNotExists([]byte(c.id))
		if err != nil {
			return err
		}
		ids, err := channel.CreateBucketIfNotExists([]byte("ids"))
		if err != nil {
			return err
		}

		// The message is stored under the sequence value as it was before
		// the increment, so the first message has index 0.
		key := []byte(strconv.Itoa(int(channel.Sequence())))
		if _, err := channel.NextSequence(); err != nil {
			return err
		}

		var buf bytes.Buffer
		writer := bufio.NewWriter(&buf)
		proto.WriteObject(writer, m)
		writer.Flush()

		if err := channel.Put(key, buf.Bytes()); err != nil {
			return err
		}
		// Reverse index: message id -> history index.
		return ids.Put([]byte(m.Id), key)
	})
	if err != nil {
		log.Fatal("error updating database: ", err)
	}

	for s := range c.Stream.Subscribers() {
		if From == s {
			continue
		}
		if c.GetMembership(s.UserId).See {
			s.Event(proto.NewCmd("p", c.id, m))
		}
	}
	return m
}

// HistorySize returns the sequence counter of the channel's history bucket,
// or 0 when the channel has no history bucket.
func (c *Channel) HistorySize() int {
	var size int
	err := c.kind.db.View(func(tx *bolt.Tx) error {
		history := tx.Bucket([]byte("message history"))
		if history == nil {
			return nil
		}
		channel := history.Bucket([]byte(c.id))
		if channel == nil {
			return nil
		}
		size = int(channel.Sequence())
		return nil
	})
	if err != nil {
		log.Fatal("error reading database: ", err)
	}
	return size
}

// MessageIndex returns the history index recorded for message id mid. ok is
// false when the channel has no history or mid is not recorded.
func (c *Channel) MessageIndex(mid string) (index int, ok bool) {
	err := c.kind.db.View(func(tx *bolt.Tx) error {
		history := tx.Bucket([]byte("message history"))
		if history == nil {
			return nil
		}
		channel := history.Bucket([]byte(c.id))
		if channel == nil {
			return nil
		}
		ids := channel.Bucket([]byte("ids"))
		if ids == nil {
			// No id index: report "not found" instead of dereferencing nil.
			return nil
		}
		data := ids.Get([]byte(mid))
		ok = data != nil
		// A missing entry leaves index at 0; callers must consult ok.
		index, _ = strconv.Atoi(string(data))
		return nil
	})
	if err != nil {
		log.Fatal("error reading database: ", err)
	}
	return index, ok
}

// History returns the stored messages with indices in [min, max), in index
// order. Indices with no stored message are skipped. The result is nil when
// the channel has no history.
func (c *Channel) History(min, max int) []proto.Object {
	var result []proto.Object
	err := c.kind.db.View(func(tx *bolt.Tx) error {
		history := tx.Bucket([]byte("message history"))
		if history == nil {
			return nil
		}
		channel := history.Bucket([]byte(c.id))
		if channel == nil {
			return nil
		}
		for index := min; index < max; index++ {
			data := channel.Get([]byte(strconv.Itoa(index)))
			if data == nil {
				continue
			}
			m, err := proto.ReadObject(bufio.NewReader(bytes.NewReader(data)))
			if err != nil {
				// Reported through the same fatal path as other read errors.
				return err
			}
			result = append(result, m)
		}
		return nil
	})
	if err != nil {
		log.Fatal("error reading database: ", err)
	}
	return result
}

// Join makes u a member with the channel's default membership and records a
// "join" message. It fails with "invalid" on a direct channel and does
// nothing when u is already a member.
func (c *Channel) Join(u *user.User) *proto.Fail {
	if c.isDirect {
		return &proto.Fail{"invalid", "", nil}
	}
	uid := u.Id()
	if c.GetMembership(uid).Yes {
		return nil
	}
	c.SetMembership(uid, c.GetDefaultMembership())
	c.Put(proto.Object{"join", "", map[string]string{"f": u.Id()}}, nil)
	return nil
}

// Leave removes u's membership and records a "leave" message. It fails with
// "invalid" on a direct channel and does nothing when u is not a member.
func (c *Channel) Leave(u *user.User) *proto.Fail {
	if c.isDirect {
		return &proto.Fail{"invalid", "", nil}
	}
	uid := u.Id()
	if !c.GetMembership(uid).Yes {
		return nil
	}
	c.SetMembership(uid, Membership{Yes: false})
	c.Put(proto.Object{"leave", "", map[string]string{"f": u.Id()}}, nil)
	return nil
}

// Members returns the stored membership of every user recorded for this
// channel, keyed by user id.
func (c *Channel) Members() map[string]Membership {
	result := make(map[string]Membership)
	err := c.kind.db.View(func(tx *bolt.Tx) error {
		channels := tx.Bucket([]byte("channel membership"))
		if channels == nil {
			return nil
		}
		members := channels.Bucket([]byte(c.id))
		if members == nil {
			return nil
		}
		return members.ForEach(func(k, v []byte) error {
			var mship Membership
			// NOTE(review): decode and Change errors are discarded here, and
			// GetMembership decodes with Undata rather than Change — confirm
			// both are intended.
			o, _ := proto.ReadObject(bufio.NewReader(bytes.NewReader(v)))
			result[string(k)], _ = mship.Change(o)
			return nil
		})
	})
	if err != nil {
		log.Fatal("error reading database: ", err)
	}
	return result
}

// GetMembership returns uid's stored membership in this channel, or the zero
// Membership when none is stored.
func (c *Channel) GetMembership(uid string) Membership {
	var mship Membership
	err := c.kind.db.View(func(tx *bolt.Tx) error {
		channels := tx.Bucket([]byte("channel membership"))
		if channels == nil {
			return nil
		}
		members := channels.Bucket([]byte(c.id))
		if members == nil {
			return nil
		}
		data := members.Get([]byte(uid))
		if data == nil {
			return nil
		}
		// NOTE(review): a decode error is discarded here — confirm intended.
		o, _ := proto.ReadObject(bufio.NewReader(bytes.NewReader(data)))
		mship.Undata(o)
		return nil
	})
	if err != nil {
		log.Fatal("error reading database: ", err)
	}
	return mship
}

// SetMembership stores or removes uid's membership in this channel.
//
// With m.Yes set, the membership record is written and the channel id is
// added to the per-user index ("direct channels" for a direct channel,
// "user channels" otherwise). With m.Yes unset, both entries are removed.
// A database failure terminates the process via log.Fatal.
func (c *Channel) SetMembership(uid string, m Membership) {
	err := c.kind.db.Update(func(tx *bolt.Tx) error {
		channels, err := tx.CreateBucketIfNotExists([]byte("channel membership"))
		if err != nil {
			return err
		}
		members, err := channels.CreateBucketIfNotExists([]byte(c.id))
		if err != nil {
			return err
		}
		// Both top-level index buckets are created regardless of which one
		// this channel uses.
		userChans, err := tx.CreateBucketIfNotExists([]byte("user channels"))
		if err != nil {
			return err
		}
		directChans, err := tx.CreateBucketIfNotExists([]byte("direct channels"))
		if err != nil {
			return err
		}

		index := userChans
		if c.isDirect {
			index = directChans
		}
		joined, err := index.CreateBucketIfNotExists([]byte(uid))
		if err != nil {
			return err
		}

		if !m.Yes {
			if err := joined.Delete([]byte(c.id)); err != nil {
				return err
			}
			return members.Delete([]byte(uid))
		}

		if err := joined.Put([]byte(c.id), []byte("yes")); err != nil {
			return err
		}
		var buf bytes.Buffer
		writer := bufio.NewWriter(&buf)
		proto.WriteObject(writer, m.GetInfo())
		writer.Flush()
		return members.Put([]byte(uid), buf.Bytes())
	})
	if err != nil {
		log.Fatal("error updating database: ", err)
	}
}

// GetDefaultMembership returns the membership given to users who join. It is
// always the package-level DefaultMembership.
func (c *Channel) GetDefaultMembership() Membership {
	return DefaultMembership
}

// SetDefaultMembership currently does nothing; m is ignored.
func (c *Channel) SetDefaultMembership(m Membership) {
}

// Delete sends a "delete" event to the channel's stream, unsubscribes all
// subscribers, replaces the channel in the world with a tombstone carrying
// its name and kind, and removes its membership bucket.
//
// A channel with no membership bucket is deleted without error.
// NOTE(review): the per-user index buckets and the message history are not
// cleaned up here — confirm that is intended.
func (c *Channel) Delete() {
	c.Stream.Event(proto.NewCmd("delete", c.id))
	c.Stream.UnsubscribeAll()

	deleted := object.Tombstone{
		c.id, map[string]string{"": c.name, "kind": c.Kind()},
	}
	c.kind.world.PutObject(c.id, deleted)

	err := c.kind.db.Update(func(tx *bolt.Tx) error {
		channels := tx.Bucket([]byte("channel membership"))
		if channels == nil || channels.Bucket([]byte(c.id)) == nil {
			// Nothing was ever stored for this channel.
			return nil
		}
		return channels.DeleteBucket([]byte(c.id))
	})
	if err != nil {
		log.Fatal("error updating database: ", err)
	}
}

// Kind returns "direct-channel" for a direct channel and "channel" otherwise.
func (c *Channel) Kind() string {
	if c.isDirect {
		return "direct-channel"
	}
	return "channel"
}

// InfoFor returns the channel's object form as presented to uid: its kind,
// its id, and its name for uid in the "" field.
func (c *Channel) InfoFor(uid string) proto.Object {
	return proto.Object{
		c.Kind(), c.id, map[string]string{"": c.NameFor(uid)},
	}
}