package channel import ( "citrons.xyz/talk/proto" "citrons.xyz/talk/server/object" "citrons.xyz/talk/server/session" "citrons.xyz/talk/server/validate" "citrons.xyz/talk/server/user" "strings" "slices" "sort" "maps" "bytes" "bufio" bolt "go.etcd.io/bbolt" "log" ) type ChannelKind struct { world *object.World db *bolt.DB } type Channel struct { kind *ChannelKind id string name string isDirect bool Stream session.Stream byId map[string]int messages []proto.Object } func Kind(world *object.World) *ChannelKind { return &ChannelKind {world, world.DB()} } func (cs *ChannelKind) CreateChannel(name string) (*Channel, *proto.Fail) { if cs.ByName(name) != nil { return nil, &proto.Fail { "name-taken", "", map[string]string {"": name}, } } if !validate.Name(name) { return nil, &proto.Fail { "invalid-name", "", map[string]string {"": name}, } } var c Channel c.kind = cs c.name = name c.id = proto.GenId() c.byId = make(map[string]int) err := cs.db.Update(func(tx *bolt.Tx) error { chm, _ := tx.CreateBucketIfNotExists([]byte("channel membership")) chm.CreateBucket([]byte(c.id)) tx.CreateBucketIfNotExists([]byte("user channels")) return nil }) if err != nil { log.Fatal("error updating database: ", err) } c.SetDefaultMembership(DefaultMembership) c.Save() return &c, nil } func DirectHandle(among map[string]bool) string { return strings.Join(slices.Sorted(maps.Keys(among)), "\x00") } func (cs *ChannelKind) UserChannels(uid string) map[string]bool { result := make(map[string]bool) err := cs.db.View(func(tx *bolt.Tx) error { channels := tx.Bucket([]byte("user channels")) if channels == nil { return nil } user := channels.Bucket([]byte(uid)) if user == nil { return nil } return user.ForEach(func(k, v []byte) error { result[string(k)] = true return nil }) }) if err != nil { log.Fatal("error updating database: ", err) } return result } func (cs *ChannelKind) GetDirect(among map[string]bool) *Channel { handle := DirectHandle(among) switch ch := cs.world.Lookup("direct-channel", handle).(type) { case *Channel: return ch } var c Channel c.isDirect = true c.kind = cs c.id = proto.GenId() c.byId = make(map[string]int) for member, _ := range among { c.SetMembership(member, DefaultMembership) } c.Save() return &c } func (cs *ChannelKind) ByName(name string) *Channel { switch ch := cs.world.Lookup("channel", name).(type) { case *Channel: return ch default: return nil } } func (cs *ChannelKind) Undata(o proto.Object) object.Object { log.Println("load: ", o) var c Channel c.kind = cs c.id = o.Id c.name = o.Fields[""] c.isDirect = o.Kind == "direct-channel" c.byId = make(map[string]int) return &c } func (c *Channel) Name() string { return c.NameFor("") } func (c *Channel) NameFor(uid string) string { if !c.isDirect { return c.name } else { var members []string for member, _ := range c.Members() { if member == uid && len(c.Members()) > 1 { continue } u := c.kind.world.GetObject(member) if u != nil { members = append(members, u.InfoFor(uid).Fields[""]) } } sort.Strings(members) return strings.Join(members, ", ") } } func (c *Channel) Handle() string { if !c.isDirect { return c.name } else { members := make(map[string]bool) for member := range c.Members() { members[member] = true } return DirectHandle(members) } } func (c *Channel) Data() proto.Object { log.Println("save: ", c.InfoFor("")) return c.InfoFor("") } func (c *Channel) Save() { c.kind.world.PutObject(c.id, c) } func (c *Channel) Id() string { return c.id } func (c *Channel) Rename(name string) *proto.Fail { if !validate.Name(name) { return &proto.Fail { "invalid-name", "", map[string]string {"": name}, } } if c.kind.ByName(name) != nil { return &proto.Fail { "name-taken", "", map[string]string {"": name}, } } c.name = name c.Save() return nil } func (c *Channel) Put(m proto.Object) proto.Object { m.Id = proto.GenId() m.Fields["t"] = proto.Timestamp() c.byId[m.Id] = len(c.messages) c.messages = append(c.messages, m) for s, _ := range c.Stream.Subscribers() { if m.Fields["f"] == s.UserId { continue } if c.GetMembership(s.UserId).See { s.Event(proto.NewCmd("p", c.id, m)) } } return m } func (c *Channel) Join(u *user.User) *proto.Fail { if c.GetMembership(u.Id()).Yes { return nil } c.SetMembership(u.Id(), c.GetDefaultMembership()) c.Put(proto.Object{"join", "", map[string]string {"f": u.Id()}}) return nil } func (c *Channel) Leave(u *user.User) *proto.Fail { if !c.GetMembership(u.Id()).Yes { return nil } c.SetMembership(u.Id(), Membership {Yes: false}) c.Put(proto.Object{"leave", "", map[string]string {"f": u.Id()}}) return nil } func (c *Channel) Members() map[string]Membership { result := make(map[string]Membership) err := c.kind.db.View(func(tx *bolt.Tx) error { channels := tx.Bucket([]byte("channel membership")) members := channels.Bucket([]byte(c.id)) return members.ForEach(func(k, v []byte) error { var mship Membership o, _ := proto.ReadObject(bufio.NewReader(bytes.NewReader(v))) result[string(k)], _ = mship.Change(o) return nil }) }) if err != nil { log.Fatal("error updating database: ", err) } return result } func (c *Channel) GetMembership(uid string) Membership { var mship Membership err := c.kind.db.View(func(tx *bolt.Tx) error { channels := tx.Bucket([]byte("channel membership")) members := channels.Bucket([]byte(c.id)) data := members.Get([]byte(uid)) if data != nil { o, _ := proto.ReadObject(bufio.NewReader(bytes.NewReader(data))) mship.Undata(o) } return nil }) if err != nil { log.Fatal("error updating database: ", err) } return mship } func (c *Channel) SetMembership(uid string, m Membership) { err := c.kind.db.Update(func(tx *bolt.Tx) error { channels := tx.Bucket([]byte("channel membership")) userChannels := tx.Bucket([]byte("user channels")) members, _ := channels.CreateBucketIfNotExists([]byte(c.id)) user, _ := userChannels.CreateBucketIfNotExists([]byte(uid)) if m.Yes { err := user.Put([]byte(c.id), []byte("yes")) if err != nil { return err } var buf bytes.Buffer writer := bufio.NewWriter(&buf) proto.WriteObject(writer, m.GetInfo()) writer.Flush() return members.Put([]byte(uid), buf.Bytes()) } else { user.Delete([]byte(c.id)) return members.Delete([]byte(uid)) } }) if err != nil { log.Fatal("error updating database: ", err) } } func (c *Channel) GetDefaultMembership() Membership { return DefaultMembership } func (c *Channel) SetDefaultMembership(m Membership) { } func (c *Channel) Delete() { c.Stream.Event(proto.NewCmd("delete", c.id)) c.Stream.UnsubscribeAll() deleted := object.Tombstone { c.id, map[string]string {"": c.name, "kind": c.Kind()}, } c.kind.world.PutObject(c.id, deleted) err := c.kind.db.Update(func(tx *bolt.Tx) error { channels := tx.Bucket([]byte("channel membership")) return channels.DeleteBucket([]byte(c.id)) }) if err != nil { log.Fatal("error updating database: ", err) } } func (c *Channel) Kind() string { switch { case c.isDirect: return "direct-channel" default: return "channel" } } func (c *Channel) InfoFor(uid string) proto.Object { return proto.Object { c.Kind(), c.id, map[string]string {"": c.NameFor(uid)}, } }