aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorubq323 <ubq323@ubq323.website>2022-08-10 20:45:03 +0100
committerubq323 <ubq323@ubq323.website>2022-08-10 20:45:03 +0100
commit6e3e9b510368a91d523b15732295d241cdd8cecc (patch)
treebdc6ae37519164f29b247b6674e1b23a6a00a339
parent32bbc59c5b57c13b1433ad212b027a3ac694ac24 (diff)
refactor
-rw-r--r--apioforum/__init__.py1
-rw-r--r--apioforum/forum.py194
-rw-r--r--apioforum/orm.py30
-rw-r--r--apioforum/templates/view_forum.html3
-rw-r--r--apioforum/thread.py54
5 files changed, 157 insertions, 125 deletions
diff --git a/apioforum/__init__.py b/apioforum/__init__.py
index a7d70c0..9c85567 100644
--- a/apioforum/__init__.py
+++ b/apioforum/__init__.py
@@ -53,7 +53,6 @@ def create_app():
p += "?" + request.query_string.decode("utf-8")
return dict(path_for_next=p)
- app.jinja_env.globals.update(forum_path=forum.forum_path)
app.jinja_env.globals.update(post_jump=thread.post_jump)
from .roles import has_permission, is_bureaucrat, get_user_role
app.jinja_env.globals.update(
diff --git a/apioforum/forum.py b/apioforum/forum.py
index 988c9a5..f88900f 100644
--- a/apioforum/forum.py
+++ b/apioforum/forum.py
@@ -8,6 +8,7 @@ from flask import (
)
from .db import get_db
+from .orm import DBObj
from .mdrender import render
from .roles import get_forum_roles,has_permission,is_bureaucrat,get_user_role, permissions as role_permissions
from .permissions import is_admin
@@ -16,6 +17,15 @@ import datetime
import math
import functools
+class Forum(DBObj,table="forums"):
+ fields = ["id","name","parent","description","unlisted"]
+
+ def avail_tags(self):
+ db = get_db()
+ tags = db.execute("select * from tags where forum = ?",(self.id,)).fetchall()
+ return tags
+
+
THREADS_PER_PAGE = 35
bp = Blueprint("forum", __name__, url_prefix="/")
@@ -24,30 +34,6 @@ bp = Blueprint("forum", __name__, url_prefix="/")
def not_actual_index():
return redirect("/1")
-def get_avail_tags(forum_id):
- db = get_db()
- tags = db.execute("""
- WITH RECURSIVE fs AS
- (SELECT * FROM forums WHERE id = ?
- UNION ALL
- SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
- SELECT * FROM tags
- WHERE tags.forum in (SELECT id FROM fs)
- ORDER BY id;
- """,(forum_id,)).fetchall()
- return tags
-
-def forum_path(forum_id):
- db = get_db()
- ancestors = db.execute("""
- WITH RECURSIVE fs AS
- (SELECT * FROM forums WHERE id = ?
- UNION ALL
- SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
- SELECT * FROM fs;
- """,(forum_id,)).fetchall()
- ancestors.reverse()
- return ancestors
def forum_route(relative_path, pagination=False, **kwargs):
def decorator(f):
@@ -58,9 +44,7 @@ def forum_route(relative_path, pagination=False, **kwargs):
@bp.route(path, **kwargs)
@functools.wraps(f)
def wrapper(forum_id, *args, **kwargs):
- db = get_db()
- forum = db.execute("SELECT * FROM forums WHERE id = ?",
- (forum_id,)).fetchone()
+ forum = Forum.fetch(id=forum_id)
if forum == None:
abort(404)
return f(forum, *args, **kwargs)
@@ -74,7 +58,7 @@ def requires_permission(permission, login_required=True):
def decorator(f):
@functools.wraps(f)
def wrapper(forum, *args, **kwargs):
- if not has_permission(forum['id'],g.user,permission,login_required):
+ if not has_permission(forum.id,g.user,permission,login_required):
abort(403)
return f(forum, *args, **kwargs)
return wrapper
@@ -84,7 +68,7 @@ def requires_bureaucrat(f):
@functools.wraps(f)
@requires_permission("p_view_forum")
def wrapper(forum, *args, **kwargs):
- if not is_bureaucrat(forum['id'], g.user):
+ if not is_bureaucrat(forum.id, g.user):
abort(403)
return f(forum, *args, **kwargs)
return wrapper
@@ -102,9 +86,9 @@ def view_forum(forum,page=1):
sortby_dir = {'d':'DESC','a':'ASC'}[sortby[1]]
sortby_by = {'a':'threads.updated','c':'threads.created'}[sortby[0]]
except KeyError:
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+ return redirect(url_for('forum.view_forum',forum_id=forum.id))
- avail_tags = get_avail_tags(forum['id'])
+ avail_tags = forum.avail_tags()
tagfilter = request.args.get("tagfilter",None)
if tagfilter == "":
@@ -116,7 +100,7 @@ def view_forum(forum,page=1):
tagfilter = int(tagfilter)
except ValueError:
flash(f'invalid tag id "{tagfilter}"')
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+ abort(400)
else:
# there is no risk of sql injection because
# we just checked it is an int
@@ -127,7 +111,7 @@ def view_forum(forum,page=1):
break
else:
flash("that tag doesn't exist or isn't available here")
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+ abort(400)
threads = db.execute(
@@ -148,7 +132,7 @@ def view_forum(forum,page=1):
ORDER BY {sortby_by} {sortby_dir}
LIMIT ? OFFSET ?;
""",(
- forum['id'],
+ forum.id,
THREADS_PER_PAGE,
(page-1)*THREADS_PER_PAGE,
)).fetchall()
@@ -157,15 +141,13 @@ def view_forum(forum,page=1):
SELECT count(*) AS count FROM threads
LEFT OUTER JOIN thread_tags ON threads.id = thread_tags.thread
WHERE threads.forum = ? {tagfilter_clause};
- """,(forum['id'],)).fetchone()['count']
+ """,(forum.id,)).fetchone()['count']
max_pageno = math.ceil(num_threads/THREADS_PER_PAGE)
thread_tags = {}
thread_polls = {}
-
- #todo: somehow optimise this
for thread in threads:
thread_tags[thread['id']] = db.execute(
"""SELECT tags.* FROM tags
@@ -197,13 +179,17 @@ def view_forum(forum,page=1):
thread_polls[thread['id']]=poll
- subforums_rows = db.execute("""
- SELECT max(threads.updated) as updated, forums.* FROM forums
- LEFT OUTER JOIN threads ON threads.forum=forums.id
- WHERE parent = ? AND unlisted = 0
- GROUP BY forums.id
- ORDER BY name ASC
- """,(forum['id'],)).fetchall()
+ # subforums don't exist any more
+ # forums will be able to link to other forums though, eventually
+ #
+ subforums_rows = []
+ # subforums_rows = db.execute("""
+ # SELECT max(threads.updated) as updated, forums.* FROM forums
+ # LEFT OUTER JOIN threads ON threads.forum=forums.id
+ # WHERE parent = ? AND unlisted = 0
+ # GROUP BY forums.id
+ # ORDER BY name ASC
+ # """,(forum.id,)).fetchall()
subforums = []
for s in subforums_rows:
a={}
@@ -216,7 +202,7 @@ def view_forum(forum,page=1):
bureaucrats = db.execute("""
SELECT user FROM role_assignments
WHERE role = 'bureaucrat' AND forum = ?
- """,(forum['id'],)).fetchall()
+ """,(forum.id,)).fetchall()
bureaucrats = [b[0] for b in bureaucrats]
return render_template("view_forum.html",
@@ -226,7 +212,6 @@ def view_forum(forum,page=1):
thread_tags=thread_tags,
bureaucrats=bureaucrats,
thread_polls=thread_polls,
- avail_tags=avail_tags,
max_pageno=max_pageno,
page=page,
current_sortby=sortby,
@@ -238,43 +223,38 @@ def view_forum(forum,page=1):
@requires_permission("p_view_forum")
def create_thread(forum):
db = get_db()
- forum = db.execute("SELECT * FROM forums WHERE id = ?",(forum['id'],)).fetchone()
- if forum is None:
- flash("that forum doesn't exist")
- return redirect(url_for('index'))
+
+ # i want to immortalize this
+ #forum = db.execute("SELECT * FROM forums WHERE id = ?",(forum.id,)).fetchone()
+
if g.user is None:
- flash("you need to be logged in to create a thread")
- return redirect(url_for('index'))
+ abort(403,"you need to be logged in to create a thread")
if request.method == "POST":
title = request.form['title']
content = request.form['content']
- err = None
if len(title.strip()) == 0 or len(content.strip()) == 0:
- err = "title and content can't be empty"
-
- if err is None:
- cur = db.cursor()
- cur.execute(
- "INSERT INTO threads (title,creator,created,updated,forum) VALUES (?,?,current_timestamp,current_timestamp,?);",
- (title,g.user,forum['id'])
- )
- thread_id = cur.lastrowid
- cur.execute(
- "INSERT INTO posts (thread,created,author,content) VALUES (?,current_timestamp,?,?);",
- (thread_id,g.user,content)
- )
- db.commit()
+ abort(400,"title and content can't be empty")
+
+ cur = db.cursor()
+ cur.execute(
+ "INSERT INTO threads (title,creator,created,updated,forum) VALUES (?,?,current_timestamp,current_timestamp,?);",
+ (title,g.user,forum.id)
+ )
+ thread_id = cur.lastrowid
+ cur.execute(
+ "INSERT INTO posts (thread,created,author,content) VALUES (?,current_timestamp,?,?);",
+ (thread_id,g.user,content)
+ )
+ db.commit()
- from . import webhooks
- thread = db.execute("select * from threads where id = ?",(thread_id,)).fetchone()
- webhooks.do_webhooks_thread(forum['id'],thread)
- return redirect(url_for('thread.view_thread',thread_id=thread_id))
- flash(err)
-
-
- return render_template("create_thread.html")
+ from . import webhooks
+ thread = db.execute("select * from threads where id = ?",(thread_id,)).fetchone()
+ webhooks.do_webhooks_thread(forum.id,thread)
+ return redirect(url_for('thread.view_thread',thread_id=thread_id))
+ else:
+ return render_template("create_thread.html")
@forum_route("roles",methods=("GET","POST"))
@requires_bureaucrat
@@ -282,14 +262,14 @@ def edit_roles(forum):
db = get_db()
role_configs = db.execute(
"SELECT * FROM role_config WHERE forum = ? ORDER BY ID ASC",
- (forum['id'],)).fetchall()
+ (forum.id,)).fetchall()
if request.method == "POST":
for config in role_configs:
if 'delete_' + config['role'] in request.form:
db.execute(
"DELETE FROM role_config WHERE forum = ? AND role = ?",
- (forum['id'],config['role']))
+ (forum.id,config['role']))
elif 'roleconfig_' + config['role'] in request.form:
for p in role_permissions:
permission_setting =\
@@ -298,13 +278,13 @@ def edit_roles(forum):
UPDATE role_config SET {p} = ?
WHERE forum = ? AND role = ?;
""",
- (permission_setting,forum['id'], config['role']))
+ (permission_setting,forum.id, config['role']))
db.commit()
flash('roles sucessfully enroled')
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+ return redirect(url_for('forum.view_forum',forum_id=forum.id))
role_config_roles = [c['role'] for c in role_configs]
- other_roles = [role for role in get_forum_roles(forum['id']) if not role in role_config_roles]
+ other_roles = [role for role in get_forum_roles(forum.id) if not role in role_config_roles]
return render_template("edit_permissions.html",
forum=forum,
@@ -319,28 +299,28 @@ def add_role(forum):
if not all(c in (" ","-","_") or c.isalnum() for c in name) \
or len(name) > 32:
flash("role name must contain no special characters")
- return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
+ return redirect(url_for('forum.edit_roles',forum_id=forum.id))
if name == "bureaucrat":
flash("cannot configure permissions for bureaucrat")
- return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
+ return redirect(url_for('forum.edit_roles',forum_id=forum.id))
db = get_db()
existing_config = db.execute("""
SELECT * FROM role_config WHERE forum = ? AND role = ?
- """,(forum['id'],name)).fetchone()
+ """,(forum.id,name)).fetchone()
if not existing_config:
db.execute("INSERT INTO role_config (forum,role) VALUES (?,?)",
- (forum['id'],name))
+ (forum.id,name))
db.commit()
- return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
+ return redirect(url_for('forum.edit_roles',forum_id=forum.id))
@forum_route("role",methods=["GET","POST"])
@requires_permission("p_approve")
def view_user_role(forum):
if request.method == "POST":
return redirect(url_for( 'forum.edit_user_role',
- username=request.form['user'],forum_id=forum['id']))
+ username=request.form['user'],forum_id=forum.id))
else:
return render_template("role_assignment.html",forum=forum)
@@ -352,27 +332,27 @@ def edit_user_role(forum, username):
user = db.execute("SELECT * FROM users WHERE username = ?;",(username,)).fetchone()
if user == None:
return redirect(url_for('forum.edit_user_role',
- username=username,forum_id=forum['id']))
+ username=username,forum_id=forum.id))
role = request.form['role']
- if role not in get_forum_roles(forum['id']) and role != "" and role != "bureaucrat":
+ if role not in get_forum_roles(forum.id) and role != "" and role != "bureaucrat":
flash("no such role")
return redirect(url_for('forum.edit_user_role',
- username=username,forum_id=forum['id']))
- if not is_bureaucrat(forum['id'],g.user) and role != "approved" and role != "":
+ username=username,forum_id=forum.id))
+ if not is_bureaucrat(forum.id,g.user) and role != "approved" and role != "":
# only bureaucrats can assign arbitrary roles
abort(403)
existing = db.execute(
"SELECT * FROM role_assignments WHERE user = ? AND forum = ?;",
- (username,forum['id'])).fetchone()
+ (username,forum.id)).fetchone()
if existing:
- db.execute("DELETE FROM role_assignments WHERE user = ? AND forum = ?;",(username,forum['id']))
+ db.execute("DELETE FROM role_assignments WHERE user = ? AND forum = ?;",(username,forum.id))
if role != "":
db.execute(
"INSERT INTO role_assignments (user,role,forum) VALUES (?,?,?);",
- (username,role,forum['id']))
+ (username,role,forum.id))
db.commit()
flash("role assigned assignedly")
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+ return redirect(url_for('forum.view_forum',forum_id=forum.id))
else:
user = db.execute("SELECT * FROM users WHERE username = ?;",(username,)).fetchone()
if user == None:
@@ -380,14 +360,14 @@ def edit_user_role(forum, username):
forum=forum,user=username,invalid_user=True)
r = db.execute(
"SELECT role FROM role_assignments WHERE user = ? AND forum = ?;",
- (username,forum['id'])).fetchone()
+ (username,forum.id)).fetchone()
if not r:
assigned_role = ""
else:
assigned_role = r[0]
- role = get_user_role(forum['id'], username)
- if is_bureaucrat(forum['id'], g.user):
- roles = get_forum_roles(forum['id'])
+ role = get_user_role(forum.id, username)
+ if is_bureaucrat(forum.id, g.user):
+ roles = get_forum_roles(forum.id)
roles.remove("other")
roles.add("bureaucrat")
else:
@@ -403,19 +383,19 @@ def forum_config_page(forum, create=False):
desc = request.form["description"]
if len(name) > 100 or len(name.strip()) == 0:
flash("invalid name")
- return redirect(url_for('forum.edit_forum',forum_id=forum['id']))
+ return redirect(url_for('forum.edit_forum',forum_id=forum.id))
elif len(desc) > 6000:
flash("invalid description")
- return redirect(url_for('forum.edit_forum',forum_id=forum['id']))
+ return redirect(url_for('forum.edit_forum',forum_id=forum.id))
if not create:
db.execute("UPDATE forums SET name = ?, description = ? WHERE id = ?",
- (name,desc,forum['id']))
- fid = forum['id']
+ (name,desc,forum.id))
+ fid = forum.id
else:
cur = db.cursor()
cur.execute(
"INSERT INTO forums (name,description,parent) VALUES (?,?,?)",
- (name,desc,forum['id']))
+ (name,desc,forum.id))
new = cur.lastrowid
# creator becomes bureaucrat of new forum
db.execute("INSERT INTO role_assignments (role,user,forum) VALUES (?,?,?)",
@@ -428,9 +408,9 @@ def forum_config_page(forum, create=False):
name = ""
desc = ""
else:
- name = forum['name']
- desc = forum['description']
- cancel_link = url_for('forum.view_forum',forum_id=forum['id'])
+ name = forum.name
+ desc = forum.description
+ cancel_link = url_for('forum.view_forum',forum_id=forum.id)
return render_template("edit_forum.html",create=create,
name=name,description=desc,cancel_link=cancel_link)
@@ -449,7 +429,7 @@ def create_forum(forum):
# if not is_admin: abort(403) # why doesn't this fucking work
# db = get_db()
# unlisted = db.execute(
-# "SELECT * FROM forums WHERE unlisted = 1 AND parent = ?",(forum['id'],))
+# "SELECT * FROM forums WHERE unlisted = 1 AND parent = ?",(forum.id,))
# return render_template('view_unlisted.html',forum=forum,unlisted=unlisted)
@bp.route("/search")
diff --git a/apioforum/orm.py b/apioforum/orm.py
new file mode 100644
index 0000000..c87dbdd
--- /dev/null
+++ b/apioforum/orm.py
@@ -0,0 +1,30 @@
+# "orm" is used very loosely
+# this is not very good, probably
+
+from .db import get_db
+
+class DBObj:
+ def __init_subclass__(cls, /, table, **kwargs):
+ # DO NOT pass anything with sql special characters in as the table name
+ super().__init_subclass__(**kwargs)
+ cls.table_name = table
+ @classmethod
+ def fetch(cls,*,id):
+ """fetch an object from the database, looked up by id."""
+ db = get_db()
+ # xxx this could be sped up by caching this query maybe instead of
+ # string formatting every time
+ row = db.execute(f"select * from {cls.table_name} where id = ?",(id,)).fetchone()
+ if row is None:
+ return None
+ item = cls.from_row(row)
+ return item
+ @classmethod
+ def from_row(cls,row):
+ # doesn't handle the ability to set fields yet
+ # we will use something like properties instead
+ # so this is somewhat bleh for now
+ self = cls()
+ for fieldname in cls.fields:
+ setattr(self,fieldname,row[fieldname])
+ return self
diff --git a/apioforum/templates/view_forum.html b/apioforum/templates/view_forum.html
index b03d51a..24b9cbe 100644
--- a/apioforum/templates/view_forum.html
+++ b/apioforum/templates/view_forum.html
@@ -2,9 +2,6 @@
{% from 'common.html' import ts, tag, disp_user, post_url, forum_breadcrumb, ab, vote_meter, pagination_nav %}
{% block header %}
<h1>{% block title %}{{forum.name}}{% endblock %} <span class="thing-id">#{{forum.id}}</span></h1>
-{% if forum.id != 1 %}
- {{ forum_breadcrumb(forum) }}
-{% endif %}
{%endblock%}
{%block content%}
diff --git a/apioforum/thread.py b/apioforum/thread.py
index 83b1adc..a2ba6f5 100644
--- a/apioforum/thread.py
+++ b/apioforum/thread.py
@@ -1,6 +1,7 @@
# view posts in thread
import itertools, math
+import functools
from flask import (
Blueprint, render_template, abort, request, g, redirect,
@@ -9,10 +10,35 @@ from flask import (
from .db import get_db
from .roles import has_permission
from . import webhooks
-from .forum import get_avail_tags
+from .forum import Forum
+
+class Thread:
+ fields = ["id","title","creator","created","updated","forum","poll"]
+
+
+
bp = Blueprint("thread", __name__, url_prefix="/thread")
+def thread_route(relative_path, pagination=False, **kwargs):
+ def decorator(f):
+ path = "/<int:thread_id>"
+ if relative_path != "":
+ path += "/" + relative_path
+
+ @bp.route(path, **kwargs)
+ @functools.wraps(f)
+ def wrapper(thread_id, *args, **kwargs):
+ thread = Thread.fetch(id=forum_id)
+ if thread == None:
+ abort(404)
+ return f(forum, *args, **kwargs)
+
+ if pagination:
+ wrapper = bp.route(path+"/page/<int:page>", **kwargs)(wrapper)
+
+ return decorator
+
POSTS_PER_PAGE = 28
def which_page(post_id,return_thread_id=False):
@@ -43,24 +69,24 @@ def post_jump(post_id,*,external=False):
page,thread_id=which_page(post_id,True)
return url_for("thread.view_thread",thread_id=thread_id,page=page,_external=external)+"#post_"+str(post_id)
-@bp.route("/<int:thread_id>")
-@bp.route("/<int:thread_id>/page/<int:page>")
-def view_thread(thread_id,page=1):
+#@bp.route("/<int:thread_id>")
+#@bp.route("/<int:thread_id>/page/<int:page>")
+@thread_route("",pagination=True)
+def view_thread(thread,page=1):
if page < 1:
abort(400)
db = get_db()
- thread = db.execute("SELECT * FROM threads WHERE id = ?;",(thread_id,)).fetchone()
- if thread is None:
- abort(404)
- if not has_permission(thread['forum'], g.user, "p_view_threads", False):
+
+ if not has_permission(thread.forum, g.user, "p_view_threads", False):
abort(403)
+
posts = db.execute("""
SELECT * FROM posts
WHERE posts.thread = ?
ORDER BY created ASC
LIMIT ? OFFSET ?;
""",(
- thread_id,
+ thread.id,
POSTS_PER_PAGE,
(page-1)*POSTS_PER_PAGE,
)).fetchall()
@@ -69,18 +95,18 @@ def view_thread(thread_id,page=1):
max_pageno = math.ceil(num_posts/POSTS_PER_PAGE)
tags = db.execute(
- """SELECT tags.* FROM tags
+ """SELECT tags.* FROM tags
INNER JOIN thread_tags ON thread_tags.tag = tags.id
WHERE thread_tags.thread = ?
ORDER BY tags.id""",(thread_id,)).fetchall()
poll = None
votes = None
- if thread['poll'] is not None:
+ if thread.poll is not None:
poll_row= db.execute("""
SELECT polls.*,total_vote_counts.total_votes FROM polls
LEFT OUTER JOIN total_vote_counts ON polls.id = total_vote_counts.poll
WHERE polls.id = ?;
- """,(thread['poll'],)).fetchone()
+ """,(thread.poll,)).fetchone()
options = db.execute("""
SELECT poll_options.*, vote_counts.num
FROM poll_options
@@ -93,7 +119,6 @@ def view_thread(thread_id,page=1):
poll.update(poll_row)
poll['options'] = options
votes = {}
- # todo: optimise this somehow
for post in posts:
if post['vote'] is not None:
votes[post['id']] = db.execute("SELECT * FROM votes WHERE id = ?",(post['vote'],)).fetchone()
@@ -360,7 +385,8 @@ def config_thread(thread_id):
db = get_db()
thread = db.execute("select * from threads where id = ?",(thread_id,)).fetchone()
thread_tags = [r['tag'] for r in db.execute("select tag from thread_tags where thread = ?",(thread_id,)).fetchall()]
- avail_tags = get_avail_tags(thread['forum'])
+ forum = Forum.fetch(id=thread['forum'])
+ avail_tags = forum.avail_tags()
err = None
if g.user is None:
err = "you need to be logged in to do that"