import base64 import binascii import json import math import random import secrets import urllib.parse import gql import requests from django.conf import settings from django.http import HttpResponse, JsonResponse from django.shortcuts import render from django.template.loader import render_to_string from django_http_exceptions import HTTPExceptions from gql.transport.exceptions import TransportQueryError from gql.transport.requests import RequestsHTTPTransport def get_options_by_member(source, rv_members) -> dict: try: options_by_member = json.loads(source) # The provided votes must be a dictionary with at least 1 key. if not isinstance(options_by_member, dict) or len(options_by_member) == 0: raise ValueError except ValueError: raise HTTPExceptions.BAD_REQUEST rv_members = convert_rv_members_to_dict(rv_members) for member, options in options_by_member.items(): # `member` is not a string if not isinstance(member, str): raise HTTPExceptions.BAD_REQUEST # `member` is not in the list of known members if member not in options_by_member: raise HTTPExceptions.BAD_REQUEST # `options` is not a list of 1+ length if not isinstance(options, list) and len(options) != 0: raise HTTPExceptions.BAD_REQUEST # all items in `options` are strings for item in options: if not isinstance(item, str): raise HTTPExceptions.BAD_REQUEST return options_by_member def index(request): rv_members = get_rv_members(request) keyed_rv_members = convert_rv_members_to_dict(rv_members) # Get votes from a possible permalink options_by_member = request.GET.get("votes") if options_by_member is not None: options_by_member = urllib.parse.unquote(options_by_member) # BEGIN Input validation if options_by_member is not None: options_by_member = get_options_by_member( options_by_member, rv_members=rv_members, ) return render( request, "rv_voting_calc/index.html", { "rv_members": rv_members, "keyed_rv_members": keyed_rv_members, "options_by_member": options_by_member, }, ) def get_rv_members(request, get_rv_gid: bool = False): transport = RequestsHTTPTransport(url=settings.CHOBOTNICE_API_URL) client = gql.Client( transport=transport, fetch_schema_from_transport=True, ) rv_gid = None if "rv_gid" in request.GET: rv_gid = urllib.parse.unquote(request.GET["rv_gid"]) try: base64.b64decode(rv_gid, validate=True) except binascii.Error: raise HTTPExceptions.BAD_REQUEST else: rv_gid = settings.CHOBOTNICE_RV_GID # Get members from query query = gql.gql( f""" {{ group(id: "{rv_gid}") {{ memberships {{ person {{ username displayName officialLastName }} }} }} }} """ ) try: result = client.execute(query) except TransportQueryError: # rv_gid was not found raise HTTPExceptions.BAD_REQUEST rv_members = [] # Convert to a nicer format for member in result["group"]["memberships"]: rv_members.append(member["person"]) # Sort alphabetically rv_members.sort(key=lambda value: value["displayName"]) if get_rv_gid: return rv_members, rv_gid else: return rv_members def convert_rv_members_to_dict(source: list) -> dict: rv_members = {} for member in source: # Convert to username: {data} rv_members[member["username"]] = { "displayName": member["displayName"], "officialLastName": member["officialLastName"], } return rv_members def do_step_b_through_d( request, iteration: int, html_steps: list, md_steps: list, options: dict, options_without_support: list, options_by_member: dict, rv_members: dict, ) -> tuple: ## BEGIN Step B options_marked_for_deletion = [] # Move ticket votes to the next most popular option, if necessary for option_key, option in options.items(): votes_to_remove = [] if option["marked_for_deletion"]: options_marked_for_deletion.append(option_key) # If the option has support and isn't marked for deletion, we don't need # to do anything. if option["has_support"] and not option["marked_for_deletion"]: continue # For all of the option's votes... for vote_list_position, vote in enumerate(option["ticket_votes"]): # Find the other acceptable options listed within... for other_acceptable_option in vote["other_acceptable"]: # ... if the other acceptable option no longer exists, do nothing. if other_acceptable_option not in options: continue # ... if the other acceptable option has support and isn't marked # for deletion, move this ticket vote there. We don't need to check # positions, since the list we are looping through is ordered # by popularity. if ( options[other_acceptable_option]["has_support"] and not options[other_acceptable_option]["marked_for_deletion"] ): votes_to_remove.append(vote_list_position) options[other_acceptable_option]["ticket_votes"].append(vote) options[other_acceptable_option]["ticket_count"] += 1 break # Remove votes that have been moved from this option. index_offset = 0 for vote_list_position in votes_to_remove: option["ticket_votes"].pop(vote_list_position - index_offset) index_offset += 1 # If we are on the first iteration, show the list of options with those that # don't have support removed. if iteration == 0: # Get the number of options that have support, for displaying. options_with_support_count = 0 for option in options.values(): if option["has_support"]: options_with_support_count += 1 context = { "options": options, "options_by_member": options_by_member, "options_with_support_count": options_with_support_count, "rv_members": rv_members, } html_steps.append( render_to_string( "rv_voting_calc/html_steps/with_support.html", context, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/with_support.md", context, request=request, ) ) # Delete options without support and those marked for deletion. for option_key in options_without_support + options_marked_for_deletion: options.pop(option_key, None) # If no options remain, show that there are no winners and end everything. if len(options) == 0: html_steps.append( render_to_string( "rv_voting_calc/html_steps/no_winner.html", {}, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/no_winner.md", {}, request=request, ) ) return options, True ## END Step B ## BEGIN Step C # Sort options by their vote count. options = { key: value for key, value in sorted( options.items(), reverse=True, key=lambda option: option[1]["vote_count"], ) } # Find winners. winner_vote_count = options[next(iter(options))]["vote_count"] ticket_count_same = True winner_count = 0 winning_option_keys = [] for option_key, option in options.items(): if option["vote_count"] == winner_vote_count: if option["ticket_count"] != options[next(iter(options))]["ticket_count"]: ticket_count_same = False winning_option_keys.append(option_key) winner_count += 1 # If there is exactly 1 winner, show the winner and end everything. if winner_count == 1: context = { "option_key": winning_option_keys[0], "option": options[winning_option_keys[0]], } html_steps.append( render_to_string( "rv_voting_calc/html_steps/found_popularity_winner.html", context, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/found_popularity_winner.md", context, request=request, ) ) return options, True # If all options are winners and have the same ticket vote count, # randomly eliminate one and continue to the next iteration. elif winner_count == len(options) and ticket_count_same: eliminated_winner = random.choice(winning_option_keys) options[eliminated_winner]["marked_for_deletion"] = True context = { "iteration": iteration, "option_key": eliminated_winner, "option": options[eliminated_winner], "options": options, "options_by_member": options_by_member, "rv_members": rv_members, "randomly": True, } html_steps.append( render_to_string( "rv_voting_calc/html_steps/removed_option.html", context, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/removed_option.md", context, request=request, ) ) return options, False ## END Step C ## BEGIN Step D # Sort options by their vote count, with lowest ranking ones first. options = { key: value for key, value in sorted( options.items(), key=lambda option: option[1]["vote_count"], ) } # Find losing options. loser_vote_count = options[next(iter(options))]["vote_count"] loser_count = 0 losing_option_keys = [] for option_key, option in options.items(): if option["vote_count"] == loser_vote_count: losing_option_keys.append(option_key) loser_count += 1 # If there is exactly one losing option, delete it and move on to the next # iteration. if loser_count == 1: loser_key = next(iter(options)) options[loser_key]["marked_for_deletion"] = True context = { "iteration": iteration, "option_key": loser_key, "option": options[loser_key], "options": options, "options_by_member": options_by_member, "rv_members": rv_members, } html_steps.append( render_to_string( "rv_voting_calc/html_steps/removed_option.html", context, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/removed_option.md", context, request=request, ) ) return options, False else: # If there are multiple losing options, order them by the amount of # ticket votes they have. losing_option_keys.sort(key=lambda key: options[key]["ticket_count"]) # Delete the vote with the least ticket votes. There maybe more, but # those should be taken care of in the next iteration. options[losing_option_keys[0]]["marked_for_deletion"] = True context = { "iteration": iteration, "option_key": losing_option_keys[0], "option": options[losing_option_keys[0]], "options": options, "options_by_member": options_by_member, "rv_members": rv_members, } html_steps.append( render_to_string( "rv_voting_calc/html_steps/removed_option.html", context, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/removed_option.md", context, request=request, ) ) # End this iteration and move to the next one. return options, False ## END Step D # (TODO: Will this function ever actually reach the code below?) # If there are no winners, show that and end everything. if len(options) == 0: html_steps.append( render_to_string( "rv_voting_calc/html_steps/no_winner.html", {}, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/no_winner.md", {}, request=request, ) ) return options, True # Continue on to the next iteration. return options, False def get_calculated_votes(request): options_by_member = request.GET.get("votes") if options_by_member is not None: options_by_member = urllib.parse.unquote(options_by_member) # BEGIN Input validation if options_by_member is None: raise HTTPExceptions.BAD_REQUEST rv_members, rv_gid = get_rv_members(request, get_rv_gid=True) options_by_member = get_options_by_member( options_by_member, rv_members=rv_members, ) rv_members = convert_rv_members_to_dict(rv_members) # END Input validation ## BEGIN Step A # Sorting html_steps = [] md_steps = [] options = {} total_ticket_count = len(options_by_member) # 1 member = 1 ticket has_support_treshold = math.ceil(total_ticket_count * 0.5) # If the ticket count is divisible by 2, add 1 to make the treshold >half if total_ticket_count % 2 == 0: has_support_treshold += 1 # For each member's selected options... for member, selected_options in options_by_member.items(): for position, option in enumerate(selected_options): # Add the option to the options list if it isn't there already. if option not in options: options[option] = { "ticket_count": 0, "vote_count": 0, "marked_for_deletion": False, "has_support": False, "ticket_votes": [], } # If this vote isn't a ticket vote, don't do anything else. if position != 0: continue # Add this vote to the option's ticket votes. options[option]["ticket_votes"].append( {"member": member, "other_acceptable": selected_options[1:]} ) # Increase the vote and ticket vote counters. options[option]["ticket_count"] += 1 options[option]["vote_count"] += 1 # For all other acceptable options the member selected... for other_acceptable_option in selected_options[1:]: # If the option isn't among in the options list, add it. if other_acceptable_option not in options: options[other_acceptable_option] = { "ticket_count": 0, "vote_count": 0, "marked_for_deletion": False, "has_support": False, "ticket_votes": [], } # Increase the option's vote counter. options[other_acceptable_option]["vote_count"] += 1 # Mark options that don't meet the >half acceptability treshold as such. options_without_support = [] for option_key, option in options.items(): if option["vote_count"] >= has_support_treshold: option["has_support"] = True if not option["has_support"]: options_without_support.append(option_key) continue # Order the options based on whether they have support or not. # This is purely for aesthetic purpose. options = { key: value for key, value in sorted( options.items(), reverse=True, key=lambda option: option[1]["has_support"], ) } context = { "options": options, "options_by_member": options_by_member, "rv_members": rv_members, "total_ticket_count": total_ticket_count, "has_support_treshold": has_support_treshold, } html_steps.append( render_to_string( "rv_voting_calc/html_steps/initial_sort.html", context, request=request, ) ) md_steps.append( render_to_string( "rv_voting_calc/md_steps/initial_sort.md", context, request=request, ) ) ## END Step A # Make sure we can always repeat a previously made calculation. This is also # useful for permalinks. # First, try to find the seed in the URL parameters seed = request.GET.get( "seed", # Then, try to find it in the cookies request.COOKIES.get( "seed", # If neither are set, generate a default. base64.b64encode(secrets.token_bytes()).decode("utf-8"), ), ) # Unquote the seed if it was found in the URL parameters if "seed" in request.GET: seed = urllib.parse.unquote(seed) random.seed(seed) print(seed) # Continue until steps B - D have reached a final conclusion and count # the amount of times the function ran to achieve this. iteration = 0 while True: options, ended = do_step_b_through_d( request, iteration, html_steps, md_steps, options, options_without_support, options_by_member, rv_members, ) if ended: response = render( request, "rv_voting_calc/combined_steps.html", {"html_steps": "\n".join(html_steps), "md_steps": md_steps}, ) # Set the random seed cookie to the one used in the calculation. response.set_cookie("seed", seed) # Do the same for RV's GID. response.set_cookie("rv_gid", rv_gid) return response # Step E iteration += 1