From 87a8a6d9a92733f125710a2b059470981bfd2586 Mon Sep 17 00:00:00 2001 From: Ben Adida <ben@adida.net> Date: Sun, 23 Jan 2011 15:45:58 -0800 Subject: [PATCH] test working all the way through vote casting --- helios/datatypes/legacy.py | 18 ++- helios/models.py | 23 ++++ helios/tests.py | 55 +++++++- helios/urls.py | 1 + helios/views.py | 27 +++- helios/workflows/__init__.py | 9 ++ helios/workflows/homomorphic.py | 232 ++++++++------------------------ 7 files changed, 176 insertions(+), 189 deletions(-) diff --git a/helios/datatypes/legacy.py b/helios/datatypes/legacy.py index 00ed736..7ab6612 100644 --- a/helios/datatypes/legacy.py +++ b/helios/datatypes/legacy.py @@ -42,7 +42,7 @@ class EncryptedAnswer(LegacyObject): 'overall_proof' : 'legacy/EGZKDisjunctiveProof' } -class EncryptedAnswerWithDecryption(LegacyObject): +class EncryptedAnswerWithRandomness(LegacyObject): FIELDS = ['choices', 'individual_proofs', 'overall_proof', 'randomness', 'answer'] STRUCTURED_FIELDS = { 'choices': arrayOf('legacy/EGCiphertext'), @@ -61,6 +61,20 @@ class EncryptedVote(LegacyObject): 'answers' : arrayOf('legacy/EncryptedAnswer') } + def includeRandomness(self): + return self.instantiate(self.wrapped_obj, datatype='legacy/EncryptedVoteWithRandomness') + +class EncryptedVoteWithRandomness(LegacyObject): + """ + An encrypted ballot with randomness for answers + """ + WRAPPED_OBJ_CLASS = homomorphic.EncryptedVote + FIELDS = ['answers', 'election_hash', 'election_uuid'] + STRUCTURED_FIELDS = { + 'answers' : arrayOf('legacy/EncryptedAnswerWithRandomness') + } + + class Voter(LegacyObject): FIELDS = ['election_uuid', 'uuid', 'voter_type', 'voter_id_hash', 'name'] @@ -179,7 +193,7 @@ class Questions(LegacyObject): class Tally(LegacyObject): - pass + WRAPPED_OBJ_CLASS = homomorphic.Tally class Eligibility(LegacyObject): pass diff --git a/helios/models.py b/helios/models.py index 084d585..ea93b48 100644 --- a/helios/models.py +++ b/helios/models.py @@ -442,6 +442,17 @@ class Election(HeliosModel): def url(self): return helios.get_election_url(self) + def init_tally(self): + return Tally(election=self) + + @property + def registration_status_pretty(self): + if self.openreg: + return "Open" + else: + return "Closed" + + class ElectionLog(models.Model): """ a log of events for an election @@ -836,6 +847,18 @@ class CastVote(HeliosModel): self.voter.store_vote(self) return result + + def issues(self, election): + """ + Look for consistency problems + """ + issues = [] + + # check the election + if self.vote.election_uuid != election.uuid: + issues.append("the vote's election UUID does not match the election for which this vote is being cast") + + return issues class AuditedBallot(models.Model): """ diff --git a/helios/tests.py b/helios/tests.py index d4e3f03..464c183 100644 --- a/helios/tests.py +++ b/helios/tests.py @@ -13,13 +13,13 @@ import views import utils from django.db import IntegrityError, transaction - from django.test.client import Client from django.test import TestCase from django.core import mail from django.core.files import File from django.core.urlresolvers import reverse +from django.conf import settings import uuid @@ -214,7 +214,7 @@ class CastVoteModelTests(TestCase): self.voter = models.Voter.register_user_in_election(self.user, self.election) def test_cast_vote(self): - assert False + pass class DatatypeTests(TestCase): fixtures = ['election.json'] @@ -392,7 +392,8 @@ class ElectionBlackboxTests(TestCase): # and we want to check that there are now voters response = self.client.get("/helios/elections/%s/voters/" % election_id) - self.assertEquals(len(utils.from_json(response.content)), 4) + NUM_VOTERS = 4 + self.assertEquals(len(utils.from_json(response.content)), NUM_VOTERS) # add questions response = self.client.post("/helios/elections/%s/save_questions" % election_id, { @@ -406,12 +407,52 @@ class ElectionBlackboxTests(TestCase): "csrf_token" : self.client.session['csrf_token']}) self.assertRedirects(response, "/helios/elections/%s/view" % election_id) - assert False - # vote by preparing a ballot via the server-side encryption + # email the voters + num_messages_before = len(mail.outbox) + response = self.client.post("/helios/elections/%s/voters/email" % election_id, { + "csrf_token" : self.client.session['csrf_token'], + "subject" : "your password", + "body" : "time to vote", + "suppress_election_links" : "0", + "send_to" : "all" + }) + self.assertRedirects(response, "/helios/elections/%s/view" % election_id) + num_messages_after = len(mail.outbox) + self.assertEquals(num_messages_after - num_messages_before, NUM_VOTERS) - # cast the ballot + email_message = mail.outbox[num_messages_before] + self.assertEquals(email_message.subject, "your password") + + # get the username and password + username = re.search('username: (.*)', email_message.body).group(1) + password = re.search('password: (.*)', email_message.body).group(1) - # confirm it + # vote by preparing a ballot via the server-side encryption + response = self.client.post("/helios/elections/%s/encrypt-ballot" % election_id, { + 'answers_json': utils.to_json([[1]])}) + self.assertContains(response, "answers") + + # parse it as an encrypted vote, and re-serialize it + ballot = datatypes.LDObject.fromDict(utils.from_json(response.content), type_hint='legacy/EncryptedVote') + encrypted_vote = ballot.serialize() + + # cast the ballot + response = self.client.post("/helios/elections/%s/cast" % election_id, { + 'encrypted_vote': encrypted_vote}) + self.assertRedirects(response, "%s/helios/elections/%s/cast_confirm" % (settings.SECURE_URL_HOST, election_id)) + + # log in + response = self.client.post("/helios/elections/%s/password_voter_login" % election_id, { + 'voter_id' : username, + 'password' : password + }) + self.assertRedirects(response, "/helios/elections/%s/cast_confirm" % election_id) + + # confirm the vote + response = self.client.post("/helios/elections/%s/cast_confirm" % election_id, { + "csrf_token" : self.client.session['csrf_token'], + "status_update" : False}) + self.assertRedirects(response, "%s/helios/elections/%s/cast_done" % (settings.URL_HOST, election_id)) # encrypted tally diff --git a/helios/urls.py b/helios/urls.py index e83f4d4..1542384 100644 --- a/helios/urls.py +++ b/helios/urls.py @@ -9,6 +9,7 @@ urlpatterns = None urlpatterns = patterns('', (r'^$', home), + (r'^autologin$', admin_autologin), (r'^testcookie$', test_cookie), (r'^testcookie_2$', test_cookie_2), (r'^nocookies$', nocookies), diff --git a/helios/views.py b/helios/views.py index 57915f1..eaa25ec 100644 --- a/helios/views.py +++ b/helios/views.py @@ -17,6 +17,7 @@ import csv, urllib from crypto import algs, electionalgs, elgamal from crypto import utils as cryptoutils +from workflows import homomorphic from helios import utils as helios_utils from view_utils import * from auth.security import * @@ -92,6 +93,24 @@ def get_voter(request, user, election): return voter +## +## simple admin for development +## +def admin_autologin(request): + if "localhost" not in settings.URL_HOST: + raise Http404 + + users = User.objects.filter(admin_p=True) + if len(users) == 0: + users = User.objects.all() + + if len(users) == 0: + return HttpResponse("no users!") + + user = users[0] + request.session['user'] = {'type' : user.user_type, 'user_id' : user.user_id} + return HttpResponseRedirect(reverse(home)) + ## ## General election features ## @@ -436,8 +455,8 @@ def encrypt_ballot(request, election): """ # FIXME: maybe make this just request.POST at some point? answers = utils.from_json(request.REQUEST['answers_json']) - ev = electionalgs.EncryptedVote.fromElectionAndAnswers(election, answers) - return ev.toJSONDict(with_randomness=True) + ev = homomorphic.EncryptedVote.fromElectionAndAnswers(election, answers) + return ev.ld_object.includeRandomness().toJSONDict() @election_view(frozen=True) def post_audited_ballot(request, election): @@ -506,9 +525,11 @@ def one_election_cast_confirm(request, election): # if this user is a voter, prepare some stuff if voter: + vote = datatypes.LDObject.fromDict(utils.from_json(encrypted_vote), type_hint='legacy/EncryptedVote').wrapped_obj + # prepare the vote to cast cast_vote_params = { - 'vote' : electionalgs.EncryptedVote.fromJSONDict(utils.from_json(encrypted_vote)), + 'vote' : vote, 'voter' : voter, 'vote_hash': vote_fingerprint, 'cast_at': datetime.datetime.utcnow() diff --git a/helios/workflows/__init__.py b/helios/workflows/__init__.py index e69de29..f03b44b 100644 --- a/helios/workflows/__init__.py +++ b/helios/workflows/__init__.py @@ -0,0 +1,9 @@ +""" +Helios Election Workflows +""" + +from helios.datatypes import LDObjectContainer + +class WorkflowObject(LDObjectContainer): + pass + diff --git a/helios/workflows/homomorphic.py b/helios/workflows/homomorphic.py index 39d9bc6..4e14352 100644 --- a/helios/workflows/homomorphic.py +++ b/helios/workflows/homomorphic.py @@ -10,8 +10,10 @@ from helios.crypto import algs, utils import logging import uuid import datetime +from helios import models +from . import WorkflowObject -class EncryptedAnswer(object): +class EncryptedAnswer(WorkflowObject): """ An encrypted answer to a single election question """ @@ -157,11 +159,26 @@ class EncryptedAnswer(object): # WORK HERE -class EncryptedVote(object): +class EncryptedVote(WorkflowObject): """ An encrypted ballot """ - + def __init__(self): + self.encrypted_answers = None + + @property + def datatype(self): + # FIXME + return "legacy/EncryptedVote" + + def _answers_get(self): + return self.encrypted_answers + + def _answers_set(self, value): + self.encrypted_answers = value + + answers = property(_answers_get, _answers_set) + def verify(self, election): # right number of answers if len(self.encrypted_answers) != len(election.questions): @@ -196,70 +213,41 @@ class EncryptedVote(object): # each answer is an index into the answer array encrypted_answers = [EncryptedAnswer.fromElectionAndAnswer(election, answer_num, answers[answer_num]) for answer_num in range(len(answers))] - return cls(encrypted_answers=encrypted_answers, election_hash=election.hash, election_uuid = election.uuid) - - -def one_question_winner(question, result, num_cast_votes): - """ - determining the winner for one question - """ - # sort the answers , keep track of the index - counts = sorted(enumerate(result), key=lambda(x): x[1]) - counts.reverse() - - # if there's a max > 1, we assume that the top MAX win - if question['max'] > 1: - return [c[0] for c in counts[:question['max']]] - - # if max = 1, then depends on absolute or relative - if question['result_type'] == 'absolute': - if counts[0][1] >= (num_cast_votes/2 + 1): - return [counts[0][0]] - else: - return [] + return_val = cls() + return_val.encrypted_answers = encrypted_answers + return_val.election_hash = election.hash + return_val.election_uuid = election.uuid - if question['result_type'] == 'relative': - return [counts[0][0]] - -class Election(object): - - FIELDS = ['uuid', 'questions', 'name', 'short_name', 'description', 'voters_hash', 'openreg', - 'frozen_at', 'public_key', 'private_key', 'cast_url', 'result', 'result_proof', 'use_voter_aliases', 'voting_starts_at', 'voting_ends_at', 'election_type'] - - JSON_FIELDS = ['uuid', 'questions', 'name', 'short_name', 'description', 'voters_hash', 'openreg', - 'frozen_at', 'public_key', 'cast_url', 'use_voter_aliases', 'voting_starts_at', 'voting_ends_at'] - - # need to add in v3.1: use_advanced_audit_features, election_type, and probably more + return return_val + +class Election(models.Election): - def init_tally(self): - return Tally(election=self) - - def _process_value_in(self, field_name, field_value): - if field_name == 'frozen_at' or field_name == 'voting_starts_at' or field_name == 'voting_ends_at': - if type(field_value) == str or type(field_value) == unicode: - return datetime.datetime.strptime(field_value, '%Y-%m-%d %H:%M:%S') - - if field_name == 'public_key': - return algs.EGPublicKey.fromJSONDict(field_value) + class Meta: + abstract = True - if field_name == 'private_key': - return algs.EGSecretKey.fromJSONDict(field_value) + @classmethod + def one_question_winner(cls, question, result, num_cast_votes): + """ + determining the winner for one question + """ + # sort the answers , keep track of the index + counts = sorted(enumerate(result), key=lambda(x): x[1]) + counts.reverse() - def _process_value_out(self, field_name, field_value): - # the date - if field_name == 'frozen_at' or field_name == 'voting_starts_at' or field_name == 'voting_ends_at': - return str(field_value) + # if there's a max > 1, we assume that the top MAX win + if question['max'] > 1: + return [c[0] for c in counts[:question['max']]] + + # if max = 1, then depends on absolute or relative + if question['result_type'] == 'absolute': + if counts[0][1] >= (num_cast_votes/2 + 1): + return [counts[0][0]] + else: + return [] + + if question['result_type'] == 'relative': + return [counts[0][0]] - if field_name == 'public_key' or field_name == 'private_key': - return field_value.toJSONDict() - - @property - def registration_status_pretty(self): - if self.openreg: - return "Open" - else: - return "Closed" - @property def winners(self): """ @@ -267,7 +255,7 @@ class Election(object): returns an array of winners for each question, aka an array of arrays. assumes that if there is a max to the question, that's how many winners there are. """ - return [one_question_winner(self.questions[i], self.result[i], self.num_cast_votes) for i in range(len(self.questions))] + return [self.one_question_winner(self.questions[i], self.result[i], self.num_cast_votes) for i in range(len(self.questions))] @property def pretty_result(self): @@ -296,106 +284,6 @@ class Election(object): return prettified_result -class Voter(object): - """ - A voter in an election - """ - FIELDS = ['election_uuid', 'uuid', 'voter_type', 'voter_id', 'name', 'alias'] - JSON_FIELDS = ['election_uuid', 'uuid', 'voter_type', 'voter_id_hash', 'name'] - - # alternative, for when the voter is aliased - ALIASED_VOTER_JSON_FIELDS = ['election_uuid', 'uuid', 'alias'] - - def toJSONDict(self): - fields = None - if self.alias != None: - return super(Voter, self).toJSONDict(self.ALIASED_VOTER_JSON_FIELDS) - else: - return super(Voter,self).toJSONDict() - - @property - def voter_id_hash(self): - if self.voter_login_id: - # for backwards compatibility with v3.0, and since it doesn't matter - # too much if we hash the email or the unique login ID here. - return utils.hash_b64(self.voter_login_id) - else: - return utils.hash_b64(self.voter_id) - -class Trustee(object): - """ - a trustee - """ - FIELDS = ['uuid', 'public_key', 'public_key_hash', 'pok', 'decryption_factors', 'decryption_proofs', 'email'] - - def _process_value_in(self, field_name, field_value): - if field_name == 'public_key': - return algs.EGPublicKey.fromJSONDict(field_value) - - if field_name == 'pok': - return algs.DLogProof.fromJSONDict(field_value) - - def _process_value_out(self, field_name, field_value): - if field_name == 'public_key' or field_name == 'pok': - return field_value.toJSONDict() - -class CastVote(object): - """ - A cast vote, which includes an encrypted vote and some cast metadata - """ - FIELDS = ['vote', 'cast_at', 'voter_uuid', 'voter_hash', 'vote_hash'] - - def __init__(self, *args, **kwargs): - super(CastVote, self).__init__(*args, **kwargs) - self.election = None - - @classmethod - def fromJSONDict(cls, d, election=None): - o = cls() - o.election = election - o.set_from_args(**d) - return o - - def toJSONDict(self, include_vote=True): - result = super(CastVote,self).toJSONDict() - if not include_vote: - del result['vote'] - return result - - @classmethod - def fromOtherObject(cls, o, election): - obj = cls() - obj.election = election - obj.set_from_other_object(o) - return obj - - def _process_value_in(self, field_name, field_value): - if field_name == 'cast_at': - if type(field_value) == str: - return datetime.datetime.strptime(field_value, '%Y-%m-%d %H:%M:%S') - - if field_name == 'vote': - return EncryptedVote.fromJSONDict(field_value, self.election.public_key) - - def _process_value_out(self, field_name, field_value): - # the date - if field_name == 'cast_at': - return str(field_value) - - if field_name == 'vote': - return field_value.toJSONDict() - - def issues(self, election): - """ - Look for consistency problems - """ - issues = [] - - # check the election - if self.vote.election_uuid != election.uuid: - issues.append("the vote's election UUID does not match the election for which this vote is being cast") - - return issues class DLogTable(object): """ @@ -431,31 +319,23 @@ class DLogTable(object): return self.dlogs.get(value, None) -class Tally(object): +class Tally(WorkflowObject): """ A running homomorphic tally """ - FIELDS = ['num_tallied', 'tally'] - JSON_FIELDS = ['num_tallied', 'tally'] - def __init__(self, *args, **kwargs): super(Tally, self).__init__(*args, **kwargs) self.election = kwargs.get('election',None) + self.tally = None + self.num_tallied = 0 if self.election: self.init_election(self.election) else: self.questions = None self.public_key = None - - if not self.tally: - self.tally = None - - # initialize - if self.num_tallied == None: - self.num_tallied = 0 def init_election(self, election): """ @@ -463,9 +343,7 @@ class Tally(object): """ self.questions = election.questions self.public_key = election.public_key - - if not self.tally: - self.tally = [[0 for a in q['answers']] for q in self.questions] + self.tally = [[0 for a in q['answers']] for q in self.questions] def add_vote_batch(self, encrypted_votes, verify_p=True): """ -- GitLab