aboutsummaryrefslogtreecommitdiffhomepage
path: root/apioforum/forum.py
diff options
context:
space:
mode:
Diffstat (limited to 'apioforum/forum.py')
-rw-r--r--apioforum/forum.py860
1 files changed, 430 insertions, 430 deletions
diff --git a/apioforum/forum.py b/apioforum/forum.py
index b86fcc9..ce90853 100644
--- a/apioforum/forum.py
+++ b/apioforum/forum.py
@@ -3,8 +3,8 @@
# ^ aha we never removed this. we should keep it. it is funny.
from flask import (
- Blueprint, render_template, request,
- g, redirect, url_for, flash, abort
+ Blueprint, render_template, request,
+ g, redirect, url_for, flash, abort
)
from .db import get_db
@@ -23,484 +23,484 @@ bp = Blueprint("forum", __name__, url_prefix="/")
@bp.route("/")
def not_actual_index():
- return redirect("/1")
+ return redirect("/1")
def get_avail_tags(forum_id):
- db = get_db()
- tags = db.execute("""
- WITH RECURSIVE fs AS
- (SELECT * FROM forums WHERE id = ?
- UNION ALL
- SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
- SELECT * FROM tags
- WHERE tags.forum in (SELECT id FROM fs)
- ORDER BY id;
- """,(forum_id,)).fetchall()
- return tags
+ db = get_db()
+ tags = db.execute("""
+ WITH RECURSIVE fs AS
+ (SELECT * FROM forums WHERE id = ?
+ UNION ALL
+ SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
+ SELECT * FROM tags
+ WHERE tags.forum in (SELECT id FROM fs)
+ ORDER BY id;
+ """,(forum_id,)).fetchall()
+ return tags
def forum_path(forum_id):
- db = get_db()
- ancestors = db.execute("""
- WITH RECURSIVE fs AS
- (SELECT * FROM forums WHERE id = ?
- UNION ALL
- SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
- SELECT * FROM fs;
- """,(forum_id,)).fetchall()
- ancestors.reverse()
- return ancestors
+ db = get_db()
+ ancestors = db.execute("""
+ WITH RECURSIVE fs AS
+ (SELECT * FROM forums WHERE id = ?
+ UNION ALL
+ SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
+ SELECT * FROM fs;
+ """,(forum_id,)).fetchall()
+ ancestors.reverse()
+ return ancestors
def forum_route(relative_path, pagination=False, **kwargs):
- def decorator(f):
- path = "/<int:forum_id>"
- if relative_path != "":
- path += "/" + relative_path
-
- @bp.route(path, **kwargs)
- @functools.wraps(f)
- def wrapper(forum_id, *args, **kwargs):
- db = get_db()
- forum = db.execute("SELECT * FROM forums WHERE id = ?",
- (forum_id,)).fetchone()
- if forum == None:
- abort(404)
- return f(forum, *args, **kwargs)
-
- if pagination:
- wrapper = bp.route(path+"/page/<int:page>", **kwargs)(wrapper)
-
- return decorator
+ def decorator(f):
+ path = "/<int:forum_id>"
+ if relative_path != "":
+ path += "/" + relative_path
+
+ @bp.route(path, **kwargs)
+ @functools.wraps(f)
+ def wrapper(forum_id, *args, **kwargs):
+ db = get_db()
+ forum = db.execute("SELECT * FROM forums WHERE id = ?",
+ (forum_id,)).fetchone()
+ if forum == None:
+ abort(404)
+ return f(forum, *args, **kwargs)
+
+ if pagination:
+ wrapper = bp.route(path+"/page/<int:page>", **kwargs)(wrapper)
+
+ return decorator
def requires_permission(permission, login_required=True):
- def decorator(f):
- @functools.wraps(f)
- def wrapper(forum, *args, **kwargs):
- if not has_permission(forum['id'],g.user,permission,login_required):
- abort(403)
- return f(forum, *args, **kwargs)
- return wrapper
- return decorator
+ def decorator(f):
+ @functools.wraps(f)
+ def wrapper(forum, *args, **kwargs):
+ if not has_permission(forum['id'],g.user,permission,login_required):
+ abort(403)
+ return f(forum, *args, **kwargs)
+ return wrapper
+ return decorator
def requires_bureaucrat(f):
- @functools.wraps(f)
- @requires_permission("p_view_forum")
- def wrapper(forum, *args, **kwargs):
- if not is_bureaucrat(forum['id'], g.user):
- abort(403)
- return f(forum, *args, **kwargs)
- return wrapper
+ @functools.wraps(f)
+ @requires_permission("p_view_forum")
+ def wrapper(forum, *args, **kwargs):
+ if not is_bureaucrat(forum['id'], g.user):
+ abort(403)
+ return f(forum, *args, **kwargs)
+ return wrapper
def set_updated(forum_id):
- db = get_db()
- ancestors = db.execute("""
- WITH RECURSIVE fs AS
- (SELECT * FROM forums WHERE id = ?
- UNION ALL
- SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
- SELECT * FROM fs;
- """,(forum_id,)).fetchall()
- for f in ancestors:
- db.execute("UPDATE forums SET updated = current_timestamp WHERE id = ?;", (f['id'],))
- db.commit()
+ db = get_db()
+ ancestors = db.execute("""
+ WITH RECURSIVE fs AS
+ (SELECT * FROM forums WHERE id = ?
+ UNION ALL
+ SELECT forums.* FROM forums, fs WHERE fs.parent=forums.id)
+ SELECT * FROM fs;
+ """,(forum_id,)).fetchall()
+ for f in ancestors:
+ db.execute("UPDATE forums SET updated = current_timestamp WHERE id = ?;", (f['id'],))
+ db.commit()
@forum_route("",pagination=True)
@requires_permission("p_view_forum", login_required=False)
def view_forum(forum,page=1):
- db = get_db()
-
- sortby = request.args.get("sortby","ad")
- try:
- sortby_dir = {'d':'DESC','a':'ASC'}[sortby[1]]
- sortby_by = {'a':'threads.updated','c':'threads.created'}[sortby[0]]
- except KeyError:
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
-
- avail_tags = get_avail_tags(forum['id'])
-
- tagfilter = request.args.get("tagfilter",None)
- if tagfilter == "":
- tagfilter = None
- tagfilter_clause = ""
- tagfilter_tag = None
- if tagfilter is not None:
- try:
- tagfilter = int(tagfilter)
- except ValueError:
- flash(f'invalid tag id "{tagfilter}"')
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
- else:
- # there is no risk of sql injection because
- # we just checked it is an int
- tagfilter_clause = f"AND thread_tags.tag = {tagfilter}"
- for the_tag in avail_tags:
- if the_tag['id'] == tagfilter:
- tagfilter_tag = the_tag
- break
- else:
- flash("that tag doesn't exist or isn't available here")
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
-
-
- threads = db.execute(
- f"""SELECT
- threads.id, threads.title, threads.creator, threads.created,
- threads.updated, threads.poll, number_of_posts.num_replies,
- most_recent_posts.created as mrp_created,
- most_recent_posts.author as mrp_author,
- most_recent_posts.id as mrp_id,
- most_recent_posts.content as mrp_content,
- most_recent_posts.deleted as mrp_deleted
- FROM threads
- INNER JOIN most_recent_posts ON most_recent_posts.thread = threads.id
- INNER JOIN number_of_posts ON number_of_posts.thread = threads.id
- LEFT OUTER JOIN thread_tags ON threads.id = thread_tags.thread
- WHERE threads.forum = ? {tagfilter_clause}
- GROUP BY threads.id
- ORDER BY {sortby_by} {sortby_dir}
- LIMIT ? OFFSET ?;
- """,(
- forum['id'],
- THREADS_PER_PAGE,
- (page-1)*THREADS_PER_PAGE,
- )).fetchall()
-
- num_threads = db.execute(f"""
- SELECT count(*) AS count FROM threads
- LEFT OUTER JOIN thread_tags ON threads.id = thread_tags.thread
- WHERE threads.forum = ? {tagfilter_clause};
- """,(forum['id'],)).fetchone()['count']
- max_pageno = math.ceil(num_threads/THREADS_PER_PAGE)
- if page < 1:
- abort(404)
- elif page > max_pageno and (max_pageno > 0 or page != 1):
- abort(404)
-
- thread_tags = {}
- thread_polls = {}
-
-
- #todo: somehow optimise this
- for thread in threads:
- thread_tags[thread['id']] = db.execute(
- """SELECT tags.* FROM tags
- INNER JOIN thread_tags ON thread_tags.tag = tags.id
- WHERE thread_tags.thread = ?
- ORDER BY tags.id;
- """,(thread['id'],)).fetchall()
-
- if thread['poll'] is not None:
- # todo: make this not be duplicated from thread.py
- poll_row= db.execute("""
- SELECT polls.*,total_vote_counts.total_votes FROM polls
- LEFT OUTER JOIN total_vote_counts ON polls.id = total_vote_counts.poll
- WHERE polls.id = ?;
- """,(thread['poll'],)).fetchone()
- options = db.execute("""
- SELECT poll_options.*, vote_counts.num
- FROM poll_options
- LEFT OUTER JOIN vote_counts ON poll_options.poll = vote_counts.poll
- AND poll_options.option_idx = vote_counts.option_idx
- WHERE poll_options.poll = ?
- ORDER BY option_idx asc;
- """,(poll_row['id'],)).fetchall()
-
- poll = {}
- poll.update(poll_row)
- poll['options'] = options
- poll['total_votes']=poll['total_votes'] or 0
- thread_polls[thread['id']]=poll
-
-
- subforums_rows = db.execute("""
- SELECT max(threads.updated) as updated, forums.* FROM forums
- LEFT OUTER JOIN threads ON threads.forum=forums.id
- WHERE parent = ? AND unlisted = 0
- GROUP BY forums.id
- ORDER BY name ASC
- """,(forum['id'],)).fetchall()
- subforums = []
- for s in subforums_rows:
- a={}
- a.update(s)
- if a['updated'] is not None:
- a['updated'] = datetime.datetime.fromisoformat(a['updated'])
- if has_permission(a['id'],g.user,"p_view_forum",login_required=False):
- subforums.append(a)
-
- bureaucrats = db.execute("""
- SELECT user FROM role_assignments
- WHERE role = 'bureaucrat' AND forum = ?
- """,(forum['id'],)).fetchall()
- bureaucrats = [b[0] for b in bureaucrats]
-
-
- if g.user != None:
- db.execute("DELETE FROM read WHERE user = ? AND forum = ?;", (g.user, forum['id']))
- db.execute("INSERT INTO read (user,forum,time) VALUES (?,?,current_timestamp);", (g.user, forum['id']))
- db.commit()
-
-
- return render_template("view_forum.html",
- forum=forum,
- subforums=subforums,
- threads=threads,
- thread_tags=thread_tags,
- bureaucrats=bureaucrats,
- thread_polls=thread_polls,
- avail_tags=avail_tags,
- max_pageno=max_pageno,
- page=page,
- current_sortby=sortby,
- tagfilter_tag=tagfilter_tag,
- is_read=read.is_read,
- )
+ db = get_db()
+
+ sortby = request.args.get("sortby","ad")
+ try:
+ sortby_dir = {'d':'DESC','a':'ASC'}[sortby[1]]
+ sortby_by = {'a':'threads.updated','c':'threads.created'}[sortby[0]]
+ except KeyError:
+ return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+
+ avail_tags = get_avail_tags(forum['id'])
+
+ tagfilter = request.args.get("tagfilter",None)
+ if tagfilter == "":
+ tagfilter = None
+ tagfilter_clause = ""
+ tagfilter_tag = None
+ if tagfilter is not None:
+ try:
+ tagfilter = int(tagfilter)
+ except ValueError:
+ flash(f'invalid tag id "{tagfilter}"')
+ return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+ else:
+ # there is no risk of sql injection because
+ # we just checked it is an int
+ tagfilter_clause = f"AND thread_tags.tag = {tagfilter}"
+ for the_tag in avail_tags:
+ if the_tag['id'] == tagfilter:
+ tagfilter_tag = the_tag
+ break
+ else:
+ flash("that tag doesn't exist or isn't available here")
+ return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+
+
+ threads = db.execute(
+ f"""SELECT
+ threads.id, threads.title, threads.creator, threads.created,
+ threads.updated, threads.poll, number_of_posts.num_replies,
+ most_recent_posts.created as mrp_created,
+ most_recent_posts.author as mrp_author,
+ most_recent_posts.id as mrp_id,
+ most_recent_posts.content as mrp_content,
+ most_recent_posts.deleted as mrp_deleted
+ FROM threads
+ INNER JOIN most_recent_posts ON most_recent_posts.thread = threads.id
+ INNER JOIN number_of_posts ON number_of_posts.thread = threads.id
+ LEFT OUTER JOIN thread_tags ON threads.id = thread_tags.thread
+ WHERE threads.forum = ? {tagfilter_clause}
+ GROUP BY threads.id
+ ORDER BY {sortby_by} {sortby_dir}
+ LIMIT ? OFFSET ?;
+ """,(
+ forum['id'],
+ THREADS_PER_PAGE,
+ (page-1)*THREADS_PER_PAGE,
+ )).fetchall()
+
+ num_threads = db.execute(f"""
+ SELECT count(*) AS count FROM threads
+ LEFT OUTER JOIN thread_tags ON threads.id = thread_tags.thread
+ WHERE threads.forum = ? {tagfilter_clause};
+ """,(forum['id'],)).fetchone()['count']
+ max_pageno = math.ceil(num_threads/THREADS_PER_PAGE)
+ if page < 1:
+ abort(404)
+ elif page > max_pageno and (max_pageno > 0 or page != 1):
+ abort(404)
+
+ thread_tags = {}
+ thread_polls = {}
+
+
+ #todo: somehow optimise this
+ for thread in threads:
+ thread_tags[thread['id']] = db.execute(
+ """SELECT tags.* FROM tags
+ INNER JOIN thread_tags ON thread_tags.tag = tags.id
+ WHERE thread_tags.thread = ?
+ ORDER BY tags.id;
+ """,(thread['id'],)).fetchall()
+
+ if thread['poll'] is not None:
+ # todo: make this not be duplicated from thread.py
+ poll_row= db.execute("""
+ SELECT polls.*,total_vote_counts.total_votes FROM polls
+ LEFT OUTER JOIN total_vote_counts ON polls.id = total_vote_counts.poll
+ WHERE polls.id = ?;
+ """,(thread['poll'],)).fetchone()
+ options = db.execute("""
+ SELECT poll_options.*, vote_counts.num
+ FROM poll_options
+ LEFT OUTER JOIN vote_counts ON poll_options.poll = vote_counts.poll
+ AND poll_options.option_idx = vote_counts.option_idx
+ WHERE poll_options.poll = ?
+ ORDER BY option_idx asc;
+ """,(poll_row['id'],)).fetchall()
+
+ poll = {}
+ poll.update(poll_row)
+ poll['options'] = options
+ poll['total_votes']=poll['total_votes'] or 0
+ thread_polls[thread['id']]=poll
+
+
+ subforums_rows = db.execute("""
+ SELECT max(threads.updated) as updated, forums.* FROM forums
+ LEFT OUTER JOIN threads ON threads.forum=forums.id
+ WHERE parent = ? AND unlisted = 0
+ GROUP BY forums.id
+ ORDER BY name ASC
+ """,(forum['id'],)).fetchall()
+ subforums = []
+ for s in subforums_rows:
+ a={}
+ a.update(s)
+ if a['updated'] is not None:
+ a['updated'] = datetime.datetime.fromisoformat(a['updated'])
+ if has_permission(a['id'],g.user,"p_view_forum",login_required=False):
+ subforums.append(a)
+
+ bureaucrats = db.execute("""
+ SELECT user FROM role_assignments
+ WHERE role = 'bureaucrat' AND forum = ?
+ """,(forum['id'],)).fetchall()
+ bureaucrats = [b[0] for b in bureaucrats]
+
+
+ if g.user != None:
+ db.execute("DELETE FROM read WHERE user = ? AND forum = ?;", (g.user, forum['id']))
+ db.execute("INSERT INTO read (user,forum,time) VALUES (?,?,current_timestamp);", (g.user, forum['id']))
+ db.commit()
+
+
+ return render_template("view_forum.html",
+ forum=forum,
+ subforums=subforums,
+ threads=threads,
+ thread_tags=thread_tags,
+ bureaucrats=bureaucrats,
+ thread_polls=thread_polls,
+ avail_tags=avail_tags,
+ max_pageno=max_pageno,
+ page=page,
+ current_sortby=sortby,
+ tagfilter_tag=tagfilter_tag,
+ is_read=read.is_read,
+ )
@forum_route("create_thread",methods=("GET","POST"))
@requires_permission("p_create_threads")
@requires_permission("p_view_forum")
def create_thread(forum):
- db = get_db()
- forum = db.execute("SELECT * FROM forums WHERE id = ?",(forum['id'],)).fetchone()
- if forum is None:
- flash("that forum doesn't exist")
- return redirect(url_for('index'))
-
- if g.user is None:
- flash("you need to be logged in to create a thread")
- return redirect(url_for('index'))
-
- if request.method == "POST":
- title = request.form['title']
- content = request.form['content']
- err = None
- if len(title.strip()) == 0 or len(content.strip()) == 0:
- err = "title and content can't be empty"
-
- if err is None:
- cur = db.cursor()
- cur.execute(
- "INSERT INTO threads (title,creator,created,updated,forum) VALUES (?,?,current_timestamp,current_timestamp,?);",
- (title,g.user,forum['id'])
- )
- thread_id = cur.lastrowid
- cur.execute(
- "INSERT INTO posts (thread,created,author,content) VALUES (?,current_timestamp,?,?);",
- (thread_id,g.user,content)
- )
- db.commit()
- set_updated(forum['id'])
-
- from . import webhooks
- thread = db.execute("select * from threads where id = ?",(thread_id,)).fetchone()
- webhooks.do_webhooks_thread(forum['id'],thread)
- return redirect(url_for('thread.view_thread',thread_id=thread_id))
- flash(err)
-
-
- return render_template("create_thread.html")
+ db = get_db()
+ forum = db.execute("SELECT * FROM forums WHERE id = ?",(forum['id'],)).fetchone()
+ if forum is None:
+ flash("that forum doesn't exist")
+ return redirect(url_for('index'))
+
+ if g.user is None:
+ flash("you need to be logged in to create a thread")
+ return redirect(url_for('index'))
+
+ if request.method == "POST":
+ title = request.form['title']
+ content = request.form['content']
+ err = None
+ if len(title.strip()) == 0 or len(content.strip()) == 0:
+ err = "title and content can't be empty"
+
+ if err is None:
+ cur = db.cursor()
+ cur.execute(
+ "INSERT INTO threads (title,creator,created,updated,forum) VALUES (?,?,current_timestamp,current_timestamp,?);",
+ (title,g.user,forum['id'])
+ )
+ thread_id = cur.lastrowid
+ cur.execute(
+ "INSERT INTO posts (thread,created,author,content) VALUES (?,current_timestamp,?,?);",
+ (thread_id,g.user,content)
+ )
+ db.commit()
+ set_updated(forum['id'])
+
+ from . import webhooks
+ thread = db.execute("select * from threads where id = ?",(thread_id,)).fetchone()
+ webhooks.do_webhooks_thread(forum['id'],thread)
+ return redirect(url_for('thread.view_thread',thread_id=thread_id))
+ flash(err)
+
+
+ return render_template("create_thread.html")
@forum_route("roles",methods=("GET","POST"))
@requires_bureaucrat
def edit_roles(forum):
- db = get_db()
- role_configs = db.execute(
- "SELECT * FROM role_config WHERE forum = ? ORDER BY ID ASC",
- (forum['id'],)).fetchall()
-
- if request.method == "POST":
- for config in role_configs:
- if 'delete_' + config['role'] in request.form:
- db.execute(
- "DELETE FROM role_config WHERE forum = ? AND role = ?",
- (forum['id'],config['role']))
- elif 'roleconfig_' + config['role'] in request.form:
- for p in role_permissions:
- permission_setting =\
- f"perm_{config['role']}_{p}" in request.form
- db.execute(f"""
- UPDATE role_config SET {p} = ?
- WHERE forum = ? AND role = ?;
- """,
- (permission_setting,forum['id'], config['role']))
- db.commit()
- flash('roles sucessfully enroled')
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
-
- role_config_roles = [c['role'] for c in role_configs]
- other_roles = [role for role in get_forum_roles(forum['id']) if not role in role_config_roles]
-
- return render_template("edit_permissions.html",
- forum=forum,
- role_configs=role_configs,
- other_roles=other_roles
- )
+ db = get_db()
+ role_configs = db.execute(
+ "SELECT * FROM role_config WHERE forum = ? ORDER BY ID ASC",
+ (forum['id'],)).fetchall()
+
+ if request.method == "POST":
+ for config in role_configs:
+ if 'delete_' + config['role'] in request.form:
+ db.execute(
+ "DELETE FROM role_config WHERE forum = ? AND role = ?",
+ (forum['id'],config['role']))
+ elif 'roleconfig_' + config['role'] in request.form:
+ for p in role_permissions:
+ permission_setting =\
+ f"perm_{config['role']}_{p}" in request.form
+ db.execute(f"""
+ UPDATE role_config SET {p} = ?
+ WHERE forum = ? AND role = ?;
+ """,
+ (permission_setting,forum['id'], config['role']))
+ db.commit()
+ flash('roles sucessfully enroled')
+ return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+
+ role_config_roles = [c['role'] for c in role_configs]
+ other_roles = [role for role in get_forum_roles(forum['id']) if not role in role_config_roles]
+
+ return render_template("edit_permissions.html",
+ forum=forum,
+ role_configs=role_configs,
+ other_roles=other_roles
+ )
@forum_route("roles/new",methods=["POST"])
@requires_bureaucrat
def add_role(forum):
- name = request.form['role'].strip()
- if not all(c in (" ","-","_") or c.isalnum() for c in name) \
- or len(name) > 32:
- flash("role name must contain no special characters")
- return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
- if name == "bureaucrat":
- flash("cannot configure permissions for bureaucrat")
- return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
-
- db = get_db()
-
- existing_config = db.execute("""
- SELECT * FROM role_config WHERE forum = ? AND role = ?
- """,(forum['id'],name)).fetchone()
- if not existing_config:
- db.execute("INSERT INTO role_config (forum,role) VALUES (?,?)",
- (forum['id'],name))
- db.commit()
- return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
+ name = request.form['role'].strip()
+ if not all(c in (" ","-","_") or c.isalnum() for c in name) \
+ or len(name) > 32:
+ flash("role name must contain no special characters")
+ return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
+ if name == "bureaucrat":
+ flash("cannot configure permissions for bureaucrat")
+ return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
+
+ db = get_db()
+
+ existing_config = db.execute("""
+ SELECT * FROM role_config WHERE forum = ? AND role = ?
+ """,(forum['id'],name)).fetchone()
+ if not existing_config:
+ db.execute("INSERT INTO role_config (forum,role) VALUES (?,?)",
+ (forum['id'],name))
+ db.commit()
+ return redirect(url_for('forum.edit_roles',forum_id=forum['id']))
@forum_route("role",methods=["GET","POST"])
@requires_permission("p_approve")
def view_user_role(forum):
- if request.method == "POST":
- return redirect(url_for( 'forum.edit_user_role',
- username=request.form['user'],forum_id=forum['id']))
- else:
- return render_template("role_assignment.html",forum=forum)
+ if request.method == "POST":
+ return redirect(url_for( 'forum.edit_user_role',
+ username=request.form['user'],forum_id=forum['id']))
+ else:
+ return render_template("role_assignment.html",forum=forum)
@forum_route("role/<username>",methods=["GET","POST"])
@requires_permission("p_approve")
def edit_user_role(forum, username):
- db = get_db()
- if request.method == "POST":
- user = db.execute("SELECT * FROM users WHERE username = ?;",(username,)).fetchone()
- if user == None:
- return redirect(url_for('forum.edit_user_role',
- username=username,forum_id=forum['id']))
- role = request.form['role']
- if role not in get_forum_roles(forum['id']) and role != "" and role != "bureaucrat":
- flash("no such role")
- return redirect(url_for('forum.edit_user_role',
- username=username,forum_id=forum['id']))
- if not is_bureaucrat(forum['id'],g.user) and role != "approved" and role != "":
- # only bureaucrats can assign arbitrary roles
- abort(403)
- existing = db.execute(
- "SELECT * FROM role_assignments WHERE user = ? AND forum = ?;",
- (username,forum['id'])).fetchone()
- if existing:
- db.execute("DELETE FROM role_assignments WHERE user = ? AND forum = ?;",(username,forum['id']))
- if role != "":
- db.execute(
- "INSERT INTO role_assignments (user,role,forum) VALUES (?,?,?);",
- (username,role,forum['id']))
- db.commit()
- flash("role assigned assignedly")
- return redirect(url_for('forum.view_forum',forum_id=forum['id']))
- else:
- user = db.execute("SELECT * FROM users WHERE username = ?;",(username,)).fetchone()
- if user == None:
- return render_template("role_assignment.html",
- forum=forum,user=username,invalid_user=True)
- r = db.execute(
- "SELECT role FROM role_assignments WHERE user = ? AND forum = ?;",
- (username,forum['id'])).fetchone()
- if not r:
- assigned_role = ""
- else:
- assigned_role = r[0]
- role = get_user_role(forum['id'], username)
- if is_bureaucrat(forum['id'], g.user):
- roles = get_forum_roles(forum['id'])
- roles.remove("other")
- roles.add("bureaucrat")
- else:
- roles = ["approved"]
- return render_template("role_assignment.html",
- forum=forum,user=username,role=role,
- assigned_role=assigned_role,forum_roles=roles)
+ db = get_db()
+ if request.method == "POST":
+ user = db.execute("SELECT * FROM users WHERE username = ?;",(username,)).fetchone()
+ if user == None:
+ return redirect(url_for('forum.edit_user_role',
+ username=username,forum_id=forum['id']))
+ role = request.form['role']
+ if role not in get_forum_roles(forum['id']) and role != "" and role != "bureaucrat":
+ flash("no such role")
+ return redirect(url_for('forum.edit_user_role',
+ username=username,forum_id=forum['id']))
+ if not is_bureaucrat(forum['id'],g.user) and role != "approved" and role != "":
+ # only bureaucrats can assign arbitrary roles
+ abort(403)
+ existing = db.execute(
+ "SELECT * FROM role_assignments WHERE user = ? AND forum = ?;",
+ (username,forum['id'])).fetchone()
+ if existing:
+ db.execute("DELETE FROM role_assignments WHERE user = ? AND forum = ?;",(username,forum['id']))
+ if role != "":
+ db.execute(
+ "INSERT INTO role_assignments (user,role,forum) VALUES (?,?,?);",
+ (username,role,forum['id']))
+ db.commit()
+ flash("role assigned assignedly")
+ return redirect(url_for('forum.view_forum',forum_id=forum['id']))
+ else:
+ user = db.execute("SELECT * FROM users WHERE username = ?;",(username,)).fetchone()
+ if user == None:
+ return render_template("role_assignment.html",
+ forum=forum,user=username,invalid_user=True)
+ r = db.execute(
+ "SELECT role FROM role_assignments WHERE user = ? AND forum = ?;",
+ (username,forum['id'])).fetchone()
+ if not r:
+ assigned_role = ""
+ else:
+ assigned_role = r[0]
+ role = get_user_role(forum['id'], username)
+ if is_bureaucrat(forum['id'], g.user):
+ roles = get_forum_roles(forum['id'])
+ roles.remove("other")
+ roles.add("bureaucrat")
+ else:
+ roles = ["approved"]
+ return render_template("role_assignment.html",
+ forum=forum,user=username,role=role,
+ assigned_role=assigned_role,forum_roles=roles)
def forum_config_page(forum, create=False):
- db = get_db()
- if request.method == "POST":
- name = request.form["name"]
- desc = request.form["description"]
- if len(name) > 100 or len(name.strip()) == 0:
- flash("invalid name")
- return redirect(url_for('forum.edit_forum',forum_id=forum['id']))
- elif len(desc) > 6000:
- flash("invalid description")
- return redirect(url_for('forum.edit_forum',forum_id=forum['id']))
- if not create:
- db.execute("UPDATE forums SET name = ?, description = ? WHERE id = ?",
- (name,desc,forum['id']))
- fid = forum['id']
- else:
- cur = db.cursor()
- cur.execute(
- "INSERT INTO forums (name,description,parent) VALUES (?,?,?)",
- (name,desc,forum['id']))
- new = cur.lastrowid
- # creator becomes bureaucrat of new forum
- db.execute("INSERT INTO role_assignments (role,user,forum) VALUES (?,?,?)",
- ("bureaucrat",g.user,new))
- fid = new
- db.commit()
- return redirect(url_for('forum.view_forum',forum_id=fid))
- else:
- if create:
- name = ""
- desc = ""
- else:
- name = forum['name']
- desc = forum['description']
- cancel_link = url_for('forum.view_forum',forum_id=forum['id'])
- return render_template("edit_forum.html",create=create,
- name=name,description=desc,cancel_link=cancel_link)
+ db = get_db()
+ if request.method == "POST":
+ name = request.form["name"]
+ desc = request.form["description"]
+ if len(name) > 100 or len(name.strip()) == 0:
+ flash("invalid name")
+ return redirect(url_for('forum.edit_forum',forum_id=forum['id']))
+ elif len(desc) > 6000:
+ flash("invalid description")
+ return redirect(url_for('forum.edit_forum',forum_id=forum['id']))
+ if not create:
+ db.execute("UPDATE forums SET name = ?, description = ? WHERE id = ?",
+ (name,desc,forum['id']))
+ fid = forum['id']
+ else:
+ cur = db.cursor()
+ cur.execute(
+ "INSERT INTO forums (name,description,parent) VALUES (?,?,?)",
+ (name,desc,forum['id']))
+ new = cur.lastrowid
+ # creator becomes bureaucrat of new forum
+ db.execute("INSERT INTO role_assignments (role,user,forum) VALUES (?,?,?)",
+ ("bureaucrat",g.user,new))
+ fid = new
+ db.commit()
+ return redirect(url_for('forum.view_forum',forum_id=fid))
+ else:
+ if create:
+ name = ""
+ desc = ""
+ else:
+ name = forum['name']
+ desc = forum['description']
+ cancel_link = url_for('forum.view_forum',forum_id=forum['id'])
+ return render_template("edit_forum.html",create=create,
+ name=name,description=desc,cancel_link=cancel_link)
@forum_route("edit",methods=["GET","POST"])
@requires_bureaucrat
def edit_forum(forum):
- return forum_config_page(forum)
+ return forum_config_page(forum)
@forum_route("create",methods=["GET","POST"])
@requires_permission("p_create_subforum")
def create_forum(forum):
- return forum_config_page(forum,create=True)
+ return forum_config_page(forum,create=True)
#@forum_route("unlisted")
#def view_unlisted(forum):
-# if not is_admin: abort(403) # why doesn't this fucking work
-# db = get_db()
-# unlisted = db.execute(
-# "SELECT * FROM forums WHERE unlisted = 1 AND parent = ?",(forum['id'],))
-# return render_template('view_unlisted.html',forum=forum,unlisted=unlisted)
+# if not is_admin: abort(403) # why doesn't this fucking work
+# db = get_db()
+# unlisted = db.execute(
+# "SELECT * FROM forums WHERE unlisted = 1 AND parent = ?",(forum['id'],))
+# return render_template('view_unlisted.html',forum=forum,unlisted=unlisted)
@bp.route("/search")
def search():
- db = get_db()
- query = request.args["q"]
- try:
- results = db.execute("""
- SELECT posts.id, highlight(posts_fts, 0, '<mark>', '</mark>') AS
- content, posts.thread, posts.author, posts.created, posts.edited,
- posts.updated, threads.title AS thread_title
- FROM posts_fts
- JOIN posts ON posts_fts.rowid = posts.id
- JOIN threads ON threads.id = posts.thread
- JOIN public_posts ON public_posts.id = posts.id
- WHERE posts_fts MATCH ? AND public_posts.public
- ORDER BY rank
- LIMIT 50
- """, (query,)).fetchall()
- except OperationalError as e:
- print(e)
- flash('your search query was malformed.')
- return redirect(url_for("forum.not_actual_index"))
-
- display_thread_id = [ True ] * len(results)
- last_thread = None
- for ix, result in enumerate(results):
- if result["thread"] == last_thread:
- display_thread_id[ix] = False
- last_thread = result["thread"]
- return render_template("search_results.html", results=results, query=query, display_thread_id=display_thread_id)
+ db = get_db()
+ query = request.args["q"]
+ try:
+ results = db.execute("""
+ SELECT posts.id, highlight(posts_fts, 0, '<mark>', '</mark>') AS
+ content, posts.thread, posts.author, posts.created, posts.edited,
+ posts.updated, threads.title AS thread_title
+ FROM posts_fts
+ JOIN posts ON posts_fts.rowid = posts.id
+ JOIN threads ON threads.id = posts.thread
+ JOIN public_posts ON public_posts.id = posts.id
+ WHERE posts_fts MATCH ? AND public_posts.public
+ ORDER BY rank
+ LIMIT 50
+ """, (query,)).fetchall()
+ except OperationalError as e:
+ print(e)
+ flash('your search query was malformed.')
+ return redirect(url_for("forum.not_actual_index"))
+
+ display_thread_id = [ True ] * len(results)
+ last_thread = None
+ for ix, result in enumerate(results):
+ if result["thread"] == last_thread:
+ display_thread_id[ix] = False
+ last_thread = result["thread"]
+ return render_template("search_results.html", results=results, query=query, display_thread_id=display_thread_id)