diff --git a/helios/crypto/algs.py b/helios/crypto/algs.py index acac4f44b4c935a49f4873e8dea92bcb17da0bbc..8ef36212bef0e8dfd87963585e5d3d7845637415 100644 --- a/helios/crypto/algs.py +++ b/helios/crypto/algs.py @@ -6,158 +6,62 @@ FIXME: improve random number generation. Ben Adida ben@adida.net """ +from __future__ import print_function -import math, hashlib, logging -import randpool, number +import hashlib +import logging -import numtheory +from Crypto.Util import number -# some utilities -class Utils: - RAND = randpool.RandomPool() - - @classmethod - def random_seed(cls, data): - cls.RAND.add_event(data) - - @classmethod - def random_mpz(cls, n_bits): - low = 2**(n_bits-1) - high = low * 2 - - # increment and find a prime - # return randrange(low, high) - - return number.getRandomNumber(n_bits, cls.RAND.get_bytes) - - @classmethod - def random_mpz_lt(cls, max): - # return randrange(0, max) - n_bits = int(math.floor(math.log(max, 2))) - return (number.getRandomNumber(n_bits, cls.RAND.get_bytes) % max) - - @classmethod - def random_prime(cls, n_bits): - return number.getPrime(n_bits, cls.RAND.get_bytes) - - @classmethod - def is_prime(cls, mpz): - #return numtheory.miller_rabin(mpz) - return number.isPrime(mpz) - - @classmethod - def xgcd(cls, a, b): - """ - Euclid's Extended GCD algorithm - """ - mod = a%b - - if mod == 0: - return 0,1 - else: - x,y = cls.xgcd(b, mod) - return y, x-(y*(a/b)) - - @classmethod - def inverse(cls, mpz, mod): - # return cls.xgcd(mpz,mod)[0] - return number.inverse(mpz, mod) - - @classmethod - def random_safe_prime(cls, n_bits): - p = None - q = None - - while True: - p = cls.random_prime(n_bits) - q = (p-1)/2 - if cls.is_prime(q): - return p - - @classmethod - def random_special_prime(cls, q_n_bits, p_n_bits): - p = None - q = None - - z_n_bits = p_n_bits - q_n_bits - - q = cls.random_prime(q_n_bits) - - while True: - z = cls.random_mpz(z_n_bits) - p = q*z + 1 - if cls.is_prime(p): - return p, q, z +from helios.crypto.utils import random class ElGamal: def __init__(self): - self.p = None - self.q = None - self.g = None - - @classmethod - def generate(cls, n_bits): - """ - generate an El-Gamal environment. Returns an instance - of ElGamal(), with prime p, group size q, and generator g - """ - - EG = ElGamal() - - # find a prime p such that (p-1)/2 is prime q - EG.p = Utils.random_safe_prime(n_bits) - - # q is the order of the group - # FIXME: not always p-1/2 - EG.q = (EG.p-1)/2 - - # find g that generates the q-order subgroup - while True: - EG.g = Utils.random_mpz_lt(EG.p) - if pow(EG.g, EG.q, EG.p) == 1: - break - - return EG + self.p = None + self.q = None + self.g = None def generate_keypair(self): - """ - generates a keypair in the setting - """ + """ + generates a keypair in the setting + """ - keypair = EGKeyPair() - keypair.generate(self.p, self.q, self.g) + keypair = EGKeyPair() + keypair.generate(self.p, self.q, self.g) - return keypair + return keypair def toJSONDict(self): - return {'p': str(self.p), 'q': str(self.q), 'g': str(self.g)} + return {'p': str(self.p), 'q': str(self.q), 'g': str(self.g)} @classmethod def fromJSONDict(cls, d): - eg = cls() - eg.p = int(d['p']) - eg.q = int(d['q']) - eg.g = int(d['g']) - return eg + eg = cls() + eg.p = int(d['p']) + eg.q = int(d['q']) + eg.g = int(d['g']) + return eg + class EGKeyPair: def __init__(self): - self.pk = EGPublicKey() - self.sk = EGSecretKey() + self.pk = EGPublicKey() + self.sk = EGSecretKey() def generate(self, p, q, g): - """ - Generate an ElGamal keypair - """ - self.pk.g = g - self.pk.p = p - self.pk.q = q + """ + Generate an ElGamal keypair + """ + self.pk.g = g + self.pk.p = p + self.pk.q = q + + self.sk.x = random.mpz_lt(q) + self.pk.y = pow(g, self.sk.x, p) - self.sk.x = Utils.random_mpz_lt(q) - self.pk.y = pow(g, self.sk.x, p) + self.sk.pk = self.pk - self.sk.pk = self.pk class EGPublicKey: def __init__(self): @@ -166,7 +70,7 @@ class EGPublicKey: self.g = None self.q = None - def encrypt_with_r(self, plaintext, r, encode_message= False): + def encrypt_with_r(self, plaintext, r, encode_message=False): """ expecting plaintext.m to be a big integer """ @@ -175,13 +79,13 @@ class EGPublicKey: # make sure m is in the right subgroup if encode_message: - y = plaintext.m + 1 - if pow(y, self.q, self.p) == 1: - m = y - else: - m = -y % self.p + y = plaintext.m + 1 + if pow(y, self.q, self.p) == 1: + m = y + else: + m = -y % self.p else: - m = plaintext.m + m = plaintext.m ciphertext.alpha = pow(self.g, r, self.p) ciphertext.beta = (m * pow(self.y, r, self.p)) % self.p @@ -192,7 +96,7 @@ class EGPublicKey: """ Encrypt a plaintext and return the randomness just generated and used. """ - r = Utils.random_mpz_lt(self.q) + r = random.mpz_lt(self.q) ciphertext = self.encrypt_with_r(plaintext, r) return [ciphertext, r] @@ -207,70 +111,70 @@ class EGPublicKey: """ Serialize to dictionary. """ - return {'y' : str(self.y), 'p' : str(self.p), 'g' : str(self.g) , 'q' : str(self.q)} + return {'y': str(self.y), 'p': str(self.p), 'g': str(self.g), 'q': str(self.q)} toJSONDict = to_dict # quick hack FIXME def toJSON(self): - import utils - return utils.to_json(self.toJSONDict()) - - def __mul__(self,other): - if other == 0 or other == 1: - return self - - # check p and q - if self.p != other.p or self.q != other.q or self.g != other.g: - raise Exception("incompatible public keys") - - result = EGPublicKey() - result.p = self.p - result.q = self.q - result.g = self.g - result.y = (self.y * other.y) % result.p - return result + import utils + return utils.to_json(self.toJSONDict()) + + def __mul__(self, other): + if other == 0 or other == 1: + return self + + # check p and q + if self.p != other.p or self.q != other.q or self.g != other.g: + raise Exception("incompatible public keys") + + result = EGPublicKey() + result.p = self.p + result.q = self.q + result.g = self.g + result.y = (self.y * other.y) % result.p + return result - def verify_sk_proof(self, dlog_proof, challenge_generator = None): - """ - verify the proof of knowledge of the secret key - g^response = commitment * y^challenge - """ - left_side = pow(self.g, dlog_proof.response, self.p) - right_side = (dlog_proof.commitment * pow(self.y, dlog_proof.challenge, self.p)) % self.p + def verify_sk_proof(self, dlog_proof, challenge_generator=None): + """ + verify the proof of knowledge of the secret key + g^response = commitment * y^challenge + """ + left_side = pow(self.g, dlog_proof.response, self.p) + right_side = (dlog_proof.commitment * pow(self.y, dlog_proof.challenge, self.p)) % self.p - expected_challenge = challenge_generator(dlog_proof.commitment) % self.q + expected_challenge = challenge_generator(dlog_proof.commitment) % self.q - return ((left_side == right_side) and (dlog_proof.challenge == expected_challenge)) + return (left_side == right_side) and (dlog_proof.challenge == expected_challenge) def validate_pk_params(self): - # check primality of p - if not number.isPrime(self.p): - raise Exception("p is not prime.") + # check primality of p + if not number.isPrime(self.p): + raise Exception("p is not prime.") - # check length of p - if not (number.size(self.p) >= 2048): - raise Exception("p of insufficient length. Should be 2048 bits or greater.") + # check length of p + if not (number.size(self.p) >= 2048): + raise Exception("p of insufficient length. Should be 2048 bits or greater.") - # check primality of q - if not number.isPrime(self.q): - raise Exception("q is not prime.") + # check primality of q + if not number.isPrime(self.q): + raise Exception("q is not prime.") - # check length of q - if not (number.size(self.q) >= 256): - raise Exception("q of insufficient length. Should be 256 bits or greater.") + # check length of q + if not (number.size(self.q) >= 256): + raise Exception("q of insufficient length. Should be 256 bits or greater.") - if (pow(self.g,self.q,self.p)!=1): - raise Exception("g does not generate subgroup of order q.") + if pow(self.g, self.q, self.p) != 1: + raise Exception("g does not generate subgroup of order q.") - if not (1 < self.g < self.p-1): - raise Exception("g out of range.") + if not (1 < self.g < self.p - 1): + raise Exception("g out of range.") - if not (1 < self.y < self.p-1): - raise Exception("y out of range.") + if not (1 < self.y < self.p - 1): + raise Exception("y out of range.") - if (pow(self.y,self.q,self.p)!=1): - raise Exception("g does not generate proper group.") + if pow(self.y, self.q, self.p) != 1: + raise Exception("g does not generate proper group.") @classmethod def from_dict(cls, d): @@ -284,14 +188,15 @@ class EGPublicKey: pk.q = int(d['q']) try: - pk.validate_pk_params() + pk.validate_pk_params() except Exception as e: - raise + raise e return pk fromJSONDict = from_dict + class EGSecretKey: def __init__(self): self.x = None @@ -317,25 +222,25 @@ class EGSecretKey: return dec_factor, proof - def decrypt(self, ciphertext, dec_factor = None, decode_m=False): + def decrypt(self, ciphertext, dec_factor=None, decode_m=False): """ Decrypt a ciphertext. Optional parameter decides whether to encode the message into the proper subgroup. """ if not dec_factor: dec_factor = self.decryption_factor(ciphertext) - m = (Utils.inverse(dec_factor, self.pk.p) * ciphertext.beta) % self.pk.p + m = (number.inverse(dec_factor, self.pk.p) * ciphertext.beta) % self.pk.p if decode_m: - # get m back from the q-order subgroup - if m < self.pk.q: - y = m - else: - y = -m % self.pk.p + # get m back from the q-order subgroup + if m < self.pk.q: + y = m + else: + y = -m % self.pk.p - return EGPlaintext(y-1, self.pk) + return EGPlaintext(y - 1, self.pk) else: - return EGPlaintext(m, self.pk) + return EGPlaintext(m, self.pk) def prove_decryption(self, ciphertext): """ @@ -350,66 +255,66 @@ class EGSecretKey: and alpha^t = b * beta/m ^ c """ - m = (Utils.inverse(pow(ciphertext.alpha, self.x, self.pk.p), self.pk.p) * ciphertext.beta) % self.pk.p - beta_over_m = (ciphertext.beta * Utils.inverse(m, self.pk.p)) % self.pk.p + m = (number.inverse(pow(ciphertext.alpha, self.x, self.pk.p), self.pk.p) * ciphertext.beta) % self.pk.p + beta_over_m = (ciphertext.beta * number.inverse(m, self.pk.p)) % self.pk.p # pick a random w - w = Utils.random_mpz_lt(self.pk.q) + w = random.mpz_lt(self.pk.q) a = pow(self.pk.g, w, self.pk.p) b = pow(ciphertext.alpha, w, self.pk.p) - c = int(hashlib.sha1(str(a) + "," + str(b)).hexdigest(),16) + c = int(hashlib.sha1(str(a) + "," + str(b)).hexdigest(), 16) t = (w + self.x * c) % self.pk.q return m, { - 'commitment' : {'A' : str(a), 'B': str(b)}, - 'challenge' : str(c), - 'response' : str(t) - } + 'commitment': {'A': str(a), 'B': str(b)}, + 'challenge': str(c), + 'response': str(t) + } def to_dict(self): - return {'x' : str(self.x), 'public_key' : self.pk.to_dict()} + return {'x': str(self.x), 'public_key': self.pk.to_dict()} toJSONDict = to_dict def prove_sk(self, challenge_generator): - """ - Generate a PoK of the secret key - Prover generates w, a random integer modulo q, and computes commitment = g^w mod p. - Verifier provides challenge modulo q. - Prover computes response = w + x*challenge mod q, where x is the secret key. - """ - w = Utils.random_mpz_lt(self.pk.q) - commitment = pow(self.pk.g, w, self.pk.p) - challenge = challenge_generator(commitment) % self.pk.q - response = (w + (self.x * challenge)) % self.pk.q - - return DLogProof(commitment, challenge, response) + """ + Generate a PoK of the secret key + Prover generates w, a random integer modulo q, and computes commitment = g^w mod p. + Verifier provides challenge modulo q. + Prover computes response = w + x*challenge mod q, where x is the secret key. + """ + w = random.mpz_lt(self.pk.q) + commitment = pow(self.pk.g, w, self.pk.p) + challenge = challenge_generator(commitment) % self.pk.q + response = (w + (self.x * challenge)) % self.pk.q + return DLogProof(commitment, challenge, response) @classmethod def from_dict(cls, d): if not d: - return None + return None sk = cls() sk.x = int(d['x']) if d.has_key('public_key'): - sk.pk = EGPublicKey.from_dict(d['public_key']) + sk.pk = EGPublicKey.from_dict(d['public_key']) else: - sk.pk = None + sk.pk = None return sk fromJSONDict = from_dict + class EGPlaintext: - def __init__(self, m = None, pk = None): + def __init__(self, m=None, pk=None): self.m = m self.pk = pk def to_dict(self): - return {'m' : self.m} + return {'m': self.m} @classmethod def from_dict(cls, d): @@ -424,17 +329,17 @@ class EGCiphertext: self.alpha = alpha self.beta = beta - def __mul__(self,other): + def __mul__(self, other): """ Homomorphic Multiplication of ciphertexts. """ if type(other) == int and (other == 0 or other == 1): - return self + return self if self.pk != other.pk: - logging.info(self.pk) - logging.info(other.pk) - raise Exception('different PKs!') + logging.info(self.pk) + logging.info(other.pk) + raise Exception('different PKs!') new = EGCiphertext() @@ -460,7 +365,7 @@ class EGCiphertext: """ Reencryption with fresh randomness, which is returned. """ - r = Utils.random_mpz_lt(self.pk.q) + r = random.mpz_lt(self.pk.q) new_c = self.reenc_with_r(r) return [new_c, r] @@ -471,189 +376,195 @@ class EGCiphertext: return self.reenc_return_r()[0] def __eq__(self, other): - """ - Check for ciphertext equality. - """ - if other == None: - return False + """ + Check for ciphertext equality. + """ + if other is None: + return False - return (self.alpha == other.alpha and self.beta == other.beta) + return self.alpha == other.alpha and self.beta == other.beta def generate_encryption_proof(self, plaintext, randomness, challenge_generator): - """ - Generate the disjunctive encryption proof of encryption - """ - # random W - w = Utils.random_mpz_lt(self.pk.q) + """ + Generate the disjunctive encryption proof of encryption + """ + # random W + w = random.mpz_lt(self.pk.q) - # build the proof - proof = EGZKProof() + # build the proof + proof = EGZKProof() - # compute A=g^w, B=y^w - proof.commitment['A'] = pow(self.pk.g, w, self.pk.p) - proof.commitment['B'] = pow(self.pk.y, w, self.pk.p) + # compute A=g^w, B=y^w + proof.commitment['A'] = pow(self.pk.g, w, self.pk.p) + proof.commitment['B'] = pow(self.pk.y, w, self.pk.p) - # generate challenge - proof.challenge = challenge_generator(proof.commitment); + # generate challenge + proof.challenge = challenge_generator(proof.commitment) - # Compute response = w + randomness * challenge - proof.response = (w + (randomness * proof.challenge)) % self.pk.q; + # Compute response = w + randomness * challenge + proof.response = (w + (randomness * proof.challenge)) % self.pk.q - return proof; + return proof def simulate_encryption_proof(self, plaintext, challenge=None): - # generate a random challenge if not provided - if not challenge: - challenge = Utils.random_mpz_lt(self.pk.q) + # generate a random challenge if not provided + if not challenge: + challenge = random.mpz_lt(self.pk.q) - proof = EGZKProof() - proof.challenge = challenge + proof = EGZKProof() + proof.challenge = challenge - # compute beta/plaintext, the completion of the DH tuple - beta_over_plaintext = (self.beta * Utils.inverse(plaintext.m, self.pk.p)) % self.pk.p + # compute beta/plaintext, the completion of the DH tuple + beta_over_plaintext = (self.beta * number.inverse(plaintext.m, self.pk.p)) % self.pk.p - # random response, does not even need to depend on the challenge - proof.response = Utils.random_mpz_lt(self.pk.q); + # random response, does not even need to depend on the challenge + proof.response = random.mpz_lt(self.pk.q) - # now we compute A and B - proof.commitment['A'] = (Utils.inverse(pow(self.alpha, proof.challenge, self.pk.p), self.pk.p) * pow(self.pk.g, proof.response, self.pk.p)) % self.pk.p - proof.commitment['B'] = (Utils.inverse(pow(beta_over_plaintext, proof.challenge, self.pk.p), self.pk.p) * pow(self.pk.y, proof.response, self.pk.p)) % self.pk.p + # now we compute A and B + proof.commitment['A'] = (number.inverse(pow(self.alpha, proof.challenge, self.pk.p), self.pk.p) + * pow(self.pk.g, proof.response, self.pk.p) + ) % self.pk.p + proof.commitment['B'] = (number.inverse(pow(beta_over_plaintext, proof.challenge, self.pk.p), self.pk.p) * pow( + self.pk.y, proof.response, self.pk.p)) % self.pk.p - return proof + return proof def generate_disjunctive_encryption_proof(self, plaintexts, real_index, randomness, challenge_generator): - # note how the interface is as such so that the result does not reveal which is the real proof. + # note how the interface is as such so that the result does not reveal which is the real proof. - proofs = [None for p in plaintexts] + proofs = [None for _ in plaintexts] - # go through all plaintexts and simulate the ones that must be simulated. - for p_num in range(len(plaintexts)): - if p_num != real_index: - proofs[p_num] = self.simulate_encryption_proof(plaintexts[p_num]) + # go through all plaintexts and simulate the ones that must be simulated. + for p_num in range(len(plaintexts)): + if p_num != real_index: + proofs[p_num] = self.simulate_encryption_proof(plaintexts[p_num]) - # the function that generates the challenge - def real_challenge_generator(commitment): - # set up the partial real proof so we're ready to get the hash - proofs[real_index] = EGZKProof() - proofs[real_index].commitment = commitment + # the function that generates the challenge + def real_challenge_generator(commitment): + # set up the partial real proof so we're ready to get the hash + proofs[real_index] = EGZKProof() + proofs[real_index].commitment = commitment - # get the commitments in a list and generate the whole disjunctive challenge - commitments = [p.commitment for p in proofs] - disjunctive_challenge = challenge_generator(commitments); + # get the commitments in a list and generate the whole disjunctive challenge + commitments = [p.commitment for p in proofs] + disjunctive_challenge = challenge_generator(commitments) - # now we must subtract all of the other challenges from this challenge. - real_challenge = disjunctive_challenge - for p_num in range(len(proofs)): - if p_num != real_index: - real_challenge = real_challenge - proofs[p_num].challenge + # now we must subtract all of the other challenges from this challenge. + real_challenge = disjunctive_challenge + for p_num in range(len(proofs)): + if p_num != real_index: + real_challenge = real_challenge - proofs[p_num].challenge - # make sure we mod q, the exponent modulus - return real_challenge % self.pk.q + # make sure we mod q, the exponent modulus + return real_challenge % self.pk.q - # do the real proof - real_proof = self.generate_encryption_proof(plaintexts[real_index], randomness, real_challenge_generator) + # do the real proof + real_proof = self.generate_encryption_proof(plaintexts[real_index], randomness, real_challenge_generator) - # set the real proof - proofs[real_index] = real_proof + # set the real proof + proofs[real_index] = real_proof - return EGZKDisjunctiveProof(proofs) + return EGZKDisjunctiveProof(proofs) def verify_encryption_proof(self, plaintext, proof): - """ - Checks for the DDH tuple g, y, alpha, beta/plaintext. - (PoK of randomness r.) - - Proof contains commitment = {A, B}, challenge, response - """ - # check that A, B are in the correct group - if not (pow(proof.commitment['A'],self.pk.q,self.pk.p)==1 and pow(proof.commitment['B'],self.pk.q,self.pk.p)==1): - return False + """ + Checks for the DDH tuple g, y, alpha, beta/plaintext. + (PoK of randomness r.) - # check that g^response = A * alpha^challenge - first_check = (pow(self.pk.g, proof.response, self.pk.p) == ((pow(self.alpha, proof.challenge, self.pk.p) * proof.commitment['A']) % self.pk.p)) + Proof contains commitment = {A, B}, challenge, response + """ + # check that A, B are in the correct group + if not (pow(proof.commitment['A'], self.pk.q, self.pk.p) == 1 and pow(proof.commitment['B'], self.pk.q, + self.pk.p) == 1): + return False + + # check that g^response = A * alpha^challenge + first_check = (pow(self.pk.g, proof.response, self.pk.p) == ( + (pow(self.alpha, proof.challenge, self.pk.p) * proof.commitment['A']) % self.pk.p)) - # check that y^response = B * (beta/m)^challenge - beta_over_m = (self.beta * Utils.inverse(plaintext.m, self.pk.p)) % self.pk.p - second_check = (pow(self.pk.y, proof.response, self.pk.p) == ((pow(beta_over_m, proof.challenge, self.pk.p) * proof.commitment['B']) % self.pk.p)) + # check that y^response = B * (beta/m)^challenge + beta_over_m = (self.beta * number.inverse(plaintext.m, self.pk.p)) % self.pk.p + second_check = (pow(self.pk.y, proof.response, self.pk.p) == ( + (pow(beta_over_m, proof.challenge, self.pk.p) * proof.commitment['B']) % self.pk.p)) - # print "1,2: %s %s " % (first_check, second_check) - return (first_check and second_check) + # print "1,2: %s %s " % (first_check, second_check) + return first_check and second_check def verify_disjunctive_encryption_proof(self, plaintexts, proof, challenge_generator): - """ - plaintexts and proofs are all lists of equal length, with matching. + """ + plaintexts and proofs are all lists of equal length, with matching. - overall_challenge is what all of the challenges combined should yield. - """ - if len(plaintexts) != len(proof.proofs): - print("bad number of proofs (expected %s, found %s)" % (len(plaintexts), len(proof.proofs))) - return False + overall_challenge is what all of the challenges combined should yield. + """ + if len(plaintexts) != len(proof.proofs): + print("bad number of proofs (expected %s, found %s)" % (len(plaintexts), len(proof.proofs))) + return False - for i in range(len(plaintexts)): - # if a proof fails, stop right there - if not self.verify_encryption_proof(plaintexts[i], proof.proofs[i]): - print "bad proof %s, %s, %s" % (i, plaintexts[i], proof.proofs[i]) - return False + for i in range(len(plaintexts)): + # if a proof fails, stop right there + if not self.verify_encryption_proof(plaintexts[i], proof.proofs[i]): + print("bad proof %s, %s, %s" % (i, plaintexts[i], proof.proofs[i])) + return False - # logging.info("made it past the two encryption proofs") + # logging.info("made it past the two encryption proofs") - # check the overall challenge - return (challenge_generator([p.commitment for p in proof.proofs]) == (sum([p.challenge for p in proof.proofs]) % self.pk.q)) + # check the overall challenge + return (challenge_generator([p.commitment for p in proof.proofs]) == ( + sum([p.challenge for p in proof.proofs]) % self.pk.q)) def verify_decryption_proof(self, plaintext, proof): - """ - Checks for the DDH tuple g, alpha, y, beta/plaintext - (PoK of secret key x.) - """ - return False + """ + Checks for the DDH tuple g, alpha, y, beta/plaintext + (PoK of secret key x.) + """ + return False def verify_decryption_factor(self, dec_factor, dec_proof, public_key): - """ - when a ciphertext is decrypted by a dec factor, the proof needs to be checked - """ - pass + """ + when a ciphertext is decrypted by a dec factor, the proof needs to be checked + """ + pass def decrypt(self, decryption_factors, public_key): - """ - decrypt a ciphertext given a list of decryption factors (from multiple trustees) - For now, no support for threshold - """ - running_decryption = self.beta - for dec_factor in decryption_factors: - running_decryption = (running_decryption * Utils.inverse(dec_factor, public_key.p)) % public_key.p + """ + decrypt a ciphertext given a list of decryption factors (from multiple trustees) + For now, no support for threshold + """ + running_decryption = self.beta + for dec_factor in decryption_factors: + running_decryption = (running_decryption * number.inverse(dec_factor, public_key.p)) % public_key.p - return running_decryption + return running_decryption def check_group_membership(self, pk): - """ - checks to see if an ElGamal element belongs to the group in the pk - """ - if not (1 < self.alpha < pk.p-1): - return False - - elif not (1 < self.beta < pk.p-1): - return False + """ + checks to see if an ElGamal element belongs to the group in the pk + """ + if not (1 < self.alpha < pk.p - 1): + return False - elif (pow(self.alpha, pk.q, pk.p)!=1): - return False + elif not (1 < self.beta < pk.p - 1): + return False - elif (pow(self.beta, pk.q, pk.p)!=1): - return False + elif pow(self.alpha, pk.q, pk.p) != 1: + return False - else: - return True + elif pow(self.beta, pk.q, pk.p) != 1: + return False + else: + return True def to_dict(self): return {'alpha': str(self.alpha), 'beta': str(self.beta)} - toJSONDict= to_dict + toJSONDict = to_dict def to_string(self): return "%s,%s" % (self.alpha, self.beta) @classmethod - def from_dict(cls, d, pk = None): + def from_dict(cls, d, pk=None): result = cls() result.alpha = int(d['alpha']) result.beta = int(d['beta']) @@ -668,127 +579,134 @@ class EGCiphertext: expects alpha,beta """ split = str.split(",") - return cls.from_dict({'alpha' : split[0], 'beta' : split[1]}) + return cls.from_dict({'alpha': split[0], 'beta': split[1]}) + class EGZKProof(object): - def __init__(self): - self.commitment = {'A':None, 'B':None} - self.challenge = None - self.response = None + def __init__(self): + self.commitment = {'A': None, 'B': None} + self.challenge = None + self.response = None + + @classmethod + def generate(cls, little_g, little_h, x, p, q, challenge_generator): + """ + generate a DDH tuple proof, where challenge generator is + almost certainly EG_fiatshamir_challenge_generator + """ - @classmethod - def generate(cls, little_g, little_h, x, p, q, challenge_generator): - """ - generate a DDH tuple proof, where challenge generator is - almost certainly EG_fiatshamir_challenge_generator - """ + # generate random w + w = random.mpz_lt(q) - # generate random w - w = Utils.random_mpz_lt(q) + # create proof instance + proof = cls() - # create proof instance - proof = cls() + # compute A = little_g^w, B=little_h^w + proof.commitment['A'] = pow(little_g, w, p) + proof.commitment['B'] = pow(little_h, w, p) - # compute A = little_g^w, B=little_h^w - proof.commitment['A'] = pow(little_g, w, p) - proof.commitment['B'] = pow(little_h, w, p) + # get challenge + proof.challenge = challenge_generator(proof.commitment) - # get challenge - proof.challenge = challenge_generator(proof.commitment) + # compute response + proof.response = (w + (x * proof.challenge)) % q - # compute response - proof.response = (w + (x * proof.challenge)) % q + # return proof + return proof - # return proof - return proof + @classmethod + def from_dict(cls, d): + p = cls() + p.commitment = {'A': int(d['commitment']['A']), 'B': int(d['commitment']['B'])} + p.challenge = int(d['challenge']) + p.response = int(d['response']) + return p - @classmethod - def from_dict(cls, d): - p = cls() - p.commitment = {'A': int(d['commitment']['A']), 'B': int(d['commitment']['B'])} - p.challenge = int(d['challenge']) - p.response = int(d['response']) - return p + fromJSONDict = from_dict - fromJSONDict = from_dict + def to_dict(self): + return { + 'commitment': {'A': str(self.commitment['A']), 'B': str(self.commitment['B'])}, + 'challenge': str(self.challenge), + 'response': str(self.response) + } - def to_dict(self): - return { - 'commitment' : {'A' : str(self.commitment['A']), 'B' : str(self.commitment['B'])}, - 'challenge': str(self.challenge), - 'response': str(self.response) - } + toJSONDict = to_dict - def verify(self, little_g, little_h, big_g, big_h, p, q, challenge_generator=None): - """ - Verify a DH tuple proof - """ - # check that A, B are in the correct group - if not (pow(proof.commitment['A'],self.pk.q,self.pk.p)==1 and pow(proof.commitment['B'],self.pk.q,self.pk.p)==1): - return False + def verify(self, little_g, little_h, big_g, big_h, p, q, challenge_generator=None): + """ + Verify a DH tuple proof + """ + # check that A, B are in the correct group + if not (pow(self.commitment['A'], self.pk.q, self.pk.p) == 1 + and pow(self.commitment['B'], self.pk.q, self.pk.p) == 1): + return False - # check that little_g^response = A * big_g^challenge - first_check = (pow(little_g, self.response, p) == ((pow(big_g, self.challenge, p) * self.commitment['A']) % p)) + # check that little_g^response = A * big_g^challenge + first_check = (pow(little_g, self.response, p) == ((pow(big_g, self.challenge, p) * self.commitment['A']) % p)) - # check that little_h^response = B * big_h^challenge - second_check = (pow(little_h, self.response, p) == ((pow(big_h, self.challenge, p) * self.commitment['B']) % p)) + # check that little_h^response = B * big_h^challenge + second_check = (pow(little_h, self.response, p) == ((pow(big_h, self.challenge, p) * self.commitment['B']) % p)) - # check the challenge? - third_check = True + # check the challenge? + third_check = True - if challenge_generator: - third_check = (self.challenge == challenge_generator(self.commitment)) + if challenge_generator: + third_check = (self.challenge == challenge_generator(self.commitment)) - return (first_check and second_check and third_check) + return first_check and second_check and third_check - toJSONDict = to_dict class EGZKDisjunctiveProof: - def __init__(self, proofs = None): - self.proofs = proofs + def __init__(self, proofs=None): + self.proofs = proofs - @classmethod - def from_dict(cls, d): - dp = cls() - dp.proofs = [EGZKProof.from_dict(p) for p in d] - return dp + @classmethod + def from_dict(cls, d): + dp = cls() + dp.proofs = [EGZKProof.from_dict(p) for p in d] + return dp - def to_dict(self): - return [p.to_dict() for p in self.proofs] + def to_dict(self): + return [p.to_dict() for p in self.proofs] + + toJSONDict = to_dict - toJSONDict = to_dict class DLogProof(object): - def __init__(self, commitment, challenge, response): - self.commitment = commitment - self.challenge = challenge - self.response = response + def __init__(self, commitment, challenge, response): + self.commitment = commitment + self.challenge = challenge + self.response = response - def to_dict(self): - return {'challenge': str(self.challenge), 'commitment': str(self.commitment), 'response' : str(self.response)} + def to_dict(self): + return {'challenge': str(self.challenge), 'commitment': str(self.commitment), 'response': str(self.response)} - toJSONDict = to_dict + toJSONDict = to_dict - @classmethod - def from_dict(cls, d): - dlp = cls(int(d['commitment']), int(d['challenge']), int(d['response'])) - return dlp + @classmethod + def from_dict(cls, d): + dlp = cls(int(d['commitment']), int(d['challenge']), int(d['response'])) + return dlp + + fromJSONDict = from_dict - fromJSONDict = from_dict def EG_disjunctive_challenge_generator(commitments): - array_to_hash = [] - for commitment in commitments: - array_to_hash.append(str(commitment['A'])) - array_to_hash.append(str(commitment['B'])) + array_to_hash = [] + for commitment in commitments: + array_to_hash.append(str(commitment['A'])) + array_to_hash.append(str(commitment['B'])) + + string_to_hash = ",".join(array_to_hash) + return int(hashlib.sha1(string_to_hash).hexdigest(), 16) - string_to_hash = ",".join(array_to_hash) - return int(hashlib.sha1(string_to_hash).hexdigest(),16) # a challenge generator for Fiat-Shamir with A,B commitment def EG_fiatshamir_challenge_generator(commitment): - return EG_disjunctive_challenge_generator([commitment]) + return EG_disjunctive_challenge_generator([commitment]) + def DLog_challenge_generator(commitment): - string_to_hash = str(commitment) - return int(hashlib.sha1(string_to_hash).hexdigest(),16) + string_to_hash = str(commitment) + return int(hashlib.sha1(string_to_hash).hexdigest(), 16) diff --git a/helios/crypto/electionalgs.py b/helios/crypto/electionalgs.py index c677140432abd01bb6638ce09977de247c0cc5aa..b9a52753e75fa78ddbda47dfa53c83540bbc1fe1 100644 --- a/helios/crypto/electionalgs.py +++ b/helios/crypto/electionalgs.py @@ -4,778 +4,794 @@ Election-specific algorithms for Helios Ben Adida 2008-08-30 """ +import datetime +import uuid import algs -import logging import utils -import uuid -import datetime + class HeliosObject(object): - """ - A base class to ease serialization and de-serialization - crypto objects are kept as full-blown crypto objects, serialized to jsonobjects on the way out - and deserialized from jsonobjects on the way in - """ - FIELDS = [] - JSON_FIELDS = None - - def __init__(self, **kwargs): - self.set_from_args(**kwargs) - - # generate uuid if need be - if 'uuid' in self.FIELDS and (not hasattr(self, 'uuid') or self.uuid == None): - self.uuid = str(uuid.uuid4()) - - def set_from_args(self, **kwargs): - for f in self.FIELDS: - if kwargs.has_key(f): - new_val = self.process_value_in(f, kwargs[f]) - setattr(self, f, new_val) - else: - setattr(self, f, None) - - def set_from_other_object(self, o): - for f in self.FIELDS: - if hasattr(o, f): - setattr(self, f, self.process_value_in(f, getattr(o,f))) - else: - setattr(self, f, None) - - def toJSON(self): - return utils.to_json(self.toJSONDict()) - - def toJSONDict(self, alternate_fields=None): - val = {} - for f in (alternate_fields or self.JSON_FIELDS or self.FIELDS): - val[f] = self.process_value_out(f, getattr(self, f)) - return val - - @classmethod - def fromJSONDict(cls, d): - # go through the keys and fix them - new_d = {} - for k in d.keys(): - new_d[str(k)] = d[k] - - return cls(**new_d) - - @classmethod - def fromOtherObject(cls, o): - obj = cls() - obj.set_from_other_object(o) - return obj - - def toOtherObject(self, o): - for f in self.FIELDS: - # FIXME: why isn't this working? - if hasattr(o, f): - # BIG HAMMER - try: - setattr(o, f, self.process_value_out(f, getattr(self,f))) - except: - pass - - @property - def hash(self): - s = utils.to_json(self.toJSONDict()) - return utils.hash_b64(s) - - def process_value_in(self, field_name, field_value): """ - process some fields on the way into the object + A base class to ease serialization and de-serialization + crypto objects are kept as full-blown crypto objects, serialized to jsonobjects on the way out + and deserialized from jsonobjects on the way in """ - if field_value == None: - return None + FIELDS = [] + JSON_FIELDS = None + + def __init__(self, **kwargs): + self.set_from_args(**kwargs) + + # generate uuid if need be + if 'uuid' in self.FIELDS and (not hasattr(self, 'uuid') or self.uuid is None): + self.uuid = str(uuid.uuid4()) + + def set_from_args(self, **kwargs): + for f in self.FIELDS: + if kwargs.has_key(f): + new_val = self.process_value_in(f, kwargs[f]) + setattr(self, f, new_val) + else: + setattr(self, f, None) + + def set_from_other_object(self, o): + for f in self.FIELDS: + if hasattr(o, f): + setattr(self, f, self.process_value_in(f, getattr(o, f))) + else: + setattr(self, f, None) + + def toJSON(self): + return utils.to_json(self.toJSONDict()) + + def toJSONDict(self, alternate_fields=None): + val = {} + for f in (alternate_fields or self.JSON_FIELDS or self.FIELDS): + val[f] = self.process_value_out(f, getattr(self, f)) + return val + + @classmethod + def fromJSONDict(cls, d): + # go through the keys and fix them + new_d = {} + for k in d.keys(): + new_d[str(k)] = d[k] + + return cls(**new_d) + + @classmethod + def fromOtherObject(cls, o): + obj = cls() + obj.set_from_other_object(o) + return obj + + def toOtherObject(self, o): + for f in self.FIELDS: + # FIXME: why isn't this working? + if hasattr(o, f): + # BIG HAMMER + try: + setattr(o, f, self.process_value_out(f, getattr(self, f))) + except: + pass + + @property + def hash(self): + s = utils.to_json(self.toJSONDict()) + return utils.hash_b64(s) + + def process_value_in(self, field_name, field_value): + """ + process some fields on the way into the object + """ + if field_value is None: + return None + + val = self._process_value_in(field_name, field_value) + if val is not None: + return val + else: + return field_value + + def _process_value_in(self, field_name, field_value): + return None + + def process_value_out(self, field_name, field_value): + """ + process some fields on the way out of the object + """ + if field_value is None: + return None + + val = self._process_value_out(field_name, field_value) + if val is not None: + return val + else: + return field_value + + def _process_value_out(self, field_name, field_value): + return None + + def __eq__(self, other): + if not hasattr(self, 'uuid'): + return super(HeliosObject, self) == other + + return other is not None and self.uuid == other.uuid - val = self._process_value_in(field_name, field_value) - if val != None: - return val - else: - return field_value - def _process_value_in(self, field_name, field_value): - return None - - def process_value_out(self, field_name, field_value): +class EncryptedAnswer(HeliosObject): """ - process some fields on the way out of the object + An encrypted answer to a single election question """ - if field_value == None: - return None - val = self._process_value_out(field_name, field_value) - if val != None: - return val - else: - return field_value + FIELDS = ['choices', 'individual_proofs', 'overall_proof', 'randomness', 'answer'] - def _process_value_out(self, field_name, field_value): - return None + # FIXME: remove this constructor and use only named-var constructor from HeliosObject + def __init__(self, choices=None, individual_proofs=None, overall_proof=None, randomness=None, answer=None): + self.choices = choices + self.individual_proofs = individual_proofs + self.overall_proof = overall_proof + self.randomness = randomness + self.answer = answer - def __eq__(self, other): - if not hasattr(self, 'uuid'): - return super(HeliosObject,self) == other + @classmethod + def generate_plaintexts(cls, pk, min=0, max=1): + plaintexts = [] + running_product = 1 - return other != None and self.uuid == other.uuid + # run the product up to the min + for i in range(max + 1): + # if we're in the range, add it to the array + if i >= min: + plaintexts.append(algs.EGPlaintext(running_product, pk)) -class EncryptedAnswer(HeliosObject): - """ - An encrypted answer to a single election question - """ + # next value in running product + running_product = (running_product * pk.g) % pk.p - FIELDS = ['choices', 'individual_proofs', 'overall_proof', 'randomness', 'answer'] + return plaintexts - # FIXME: remove this constructor and use only named-var constructor from HeliosObject - def __init__(self, choices=None, individual_proofs=None, overall_proof=None, randomness=None, answer=None): - self.choices = choices - self.individual_proofs = individual_proofs - self.overall_proof = overall_proof - self.randomness = randomness - self.answer = answer + def verify_plaintexts_and_randomness(self, pk): + """ + this applies only if the explicit answers and randomness factors are given + we do not verify the proofs here, that is the verify() method + """ + if not hasattr(self, 'answer'): + return False - @classmethod - def generate_plaintexts(cls, pk, min=0, max=1): - plaintexts = [] - running_product = 1 + for choice_num in range(len(self.choices)): + choice = self.choices[choice_num] + choice.pk = pk - # run the product up to the min - for i in range(max+1): - # if we're in the range, add it to the array - if i >= min: - plaintexts.append(algs.EGPlaintext(running_product, pk)) + # redo the encryption + # WORK HERE (paste from below encryption) + + return False - # next value in running product - running_product = (running_product * pk.g) % pk.p + def verify(self, pk, min=0, max=1): + possible_plaintexts = self.generate_plaintexts(pk) + homomorphic_sum = 0 + + for choice_num in range(len(self.choices)): + choice = self.choices[choice_num] + choice.pk = pk + individual_proof = self.individual_proofs[choice_num] + + # verify that elements belong to the proper group + if not choice.check_group_membership(pk): + return False + + # verify the proof on the encryption of that choice + if not choice.verify_disjunctive_encryption_proof(possible_plaintexts, individual_proof, + algs.EG_disjunctive_challenge_generator): + return False + + # compute homomorphic sum if needed + if max is not None: + homomorphic_sum = choice * homomorphic_sum + + if max is not None: + # determine possible plaintexts for the sum + sum_possible_plaintexts = self.generate_plaintexts(pk, min=min, max=max) + + # verify the sum + return homomorphic_sum.verify_disjunctive_encryption_proof(sum_possible_plaintexts, self.overall_proof, + algs.EG_disjunctive_challenge_generator) + else: + # approval voting, no need for overall proof verification + return True + + def toJSONDict(self, with_randomness=False): + value = { + 'choices': [c.to_dict() for c in self.choices], + 'individual_proofs': [p.to_dict() for p in self.individual_proofs] + } + + if self.overall_proof: + value['overall_proof'] = self.overall_proof.to_dict() + else: + value['overall_proof'] = None + + if with_randomness: + value['randomness'] = [str(r) for r in self.randomness] + value['answer'] = self.answer + + return value + + @classmethod + def fromJSONDict(cls, d, pk=None): + ea = cls() + + ea.choices = [algs.EGCiphertext.from_dict(c, pk) for c in d['choices']] + ea.individual_proofs = [algs.EGZKDisjunctiveProof.from_dict(p) for p in d['individual_proofs']] + + if d['overall_proof']: + ea.overall_proof = algs.EGZKDisjunctiveProof.from_dict(d['overall_proof']) + else: + ea.overall_proof = None + + if d.has_key('randomness'): + ea.randomness = [int(r) for r in d['randomness']] + ea.answer = d['answer'] + + return ea + + @classmethod + def fromElectionAndAnswer(cls, election, question_num, answer_indexes): + """ + Given an election, a question number, and a list of answers to that question + in the form of an array of 0-based indexes into the answer array, + produce an EncryptedAnswer that works. + """ + question = election.questions[question_num] + answers = question['answers'] + pk = election.public_key + + # initialize choices, individual proofs, randomness and overall proof + choices = [None for _ in range(len(answers))] + individual_proofs = [None for _ in range(len(answers))] + randomness = [None for _ in range(len(answers))] + + # possible plaintexts [0, 1] + plaintexts = cls.generate_plaintexts(pk) + + # keep track of number of options selected. + num_selected_answers = 0 + + # homomorphic sum of all + homomorphic_sum = 0 + randomness_sum = 0 + + # min and max for number of answers, useful later + min_answers = 0 + if question.has_key('min'): + min_answers = question['min'] + max_answers = question['max'] + + # go through each possible answer and encrypt either a g^0 or a g^1. + for answer_num in range(len(answers)): + plaintext_index = 0 + + # assuming a list of answers + if answer_num in answer_indexes: + plaintext_index = 1 + num_selected_answers += 1 + + # randomness and encryption + randomness[answer_num] = utils.random.mpz_lt(pk.q) + choices[answer_num] = pk.encrypt_with_r(plaintexts[plaintext_index], randomness[answer_num]) + + # generate proof + individual_proofs[answer_num] = choices[answer_num].generate_disjunctive_encryption_proof(plaintexts, + plaintext_index, + randomness[ + answer_num], + algs.EG_disjunctive_challenge_generator) + + # sum things up homomorphically if needed + if max_answers is not None: + homomorphic_sum = choices[answer_num] * homomorphic_sum + randomness_sum = (randomness_sum + randomness[answer_num]) % pk.q + + # prove that the sum is 0 or 1 (can be "blank vote" for this answer) + # num_selected_answers is 0 or 1, which is the index into the plaintext that is actually encoded + + if num_selected_answers < min_answers: + raise Exception("Need to select at least %s answer(s)" % min_answers) + + if max_answers is not None: + sum_plaintexts = cls.generate_plaintexts(pk, min=min_answers, max=max_answers) + + # need to subtract the min from the offset + overall_proof = homomorphic_sum.generate_disjunctive_encryption_proof(sum_plaintexts, + num_selected_answers - min_answers, + randomness_sum, + algs.EG_disjunctive_challenge_generator); + else: + # approval voting + overall_proof = None + + return cls(choices, individual_proofs, overall_proof, randomness, answer_indexes) - return plaintexts - def verify_plaintexts_and_randomness(self, pk): +class EncryptedVote(HeliosObject): """ - this applies only if the explicit answers and randomness factors are given - we do not verify the proofs here, that is the verify() method + An encrypted ballot """ - if not hasattr(self, 'answer'): - return False - - for choice_num in range(len(self.choices)): - choice = self.choices[choice_num] - choice.pk = pk - - # redo the encryption - # WORK HERE (paste from below encryption) - - return False - - def verify(self, pk, min=0, max=1): - possible_plaintexts = self.generate_plaintexts(pk) - homomorphic_sum = 0 + FIELDS = ['encrypted_answers', 'election_hash', 'election_uuid'] - for choice_num in range(len(self.choices)): - choice = self.choices[choice_num] - choice.pk = pk - individual_proof = self.individual_proofs[choice_num] + def verify(self, election): + # right number of answers + if len(self.encrypted_answers) != len(election.questions): + return False - # verify that elements belong to the proper group - if not choice.check_group_membership(pk): - return False + # check hash + if self.election_hash != election.hash: + # print "%s / %s " % (self.election_hash, election.hash) + return False - # verify the proof on the encryption of that choice - if not choice.verify_disjunctive_encryption_proof(possible_plaintexts, individual_proof, algs.EG_disjunctive_challenge_generator): - return False + # check ID + if self.election_uuid != election.uuid: + return False - # compute homomorphic sum if needed - if max != None: - homomorphic_sum = choice * homomorphic_sum + # check proofs on all of answers + for question_num in range(len(election.questions)): + ea = self.encrypted_answers[question_num] - if max != None: - # determine possible plaintexts for the sum - sum_possible_plaintexts = self.generate_plaintexts(pk, min=min, max=max) + question = election.questions[question_num] + min_answers = 0 + if question.has_key('min'): + min_answers = question['min'] - # verify the sum - return homomorphic_sum.verify_disjunctive_encryption_proof(sum_possible_plaintexts, self.overall_proof, algs.EG_disjunctive_challenge_generator) - else: - # approval voting, no need for overall proof verification - return True + if not ea.verify(election.public_key, min=min_answers, max=question['max']): + return False - def toJSONDict(self, with_randomness=False): - value = { - 'choices': [c.to_dict() for c in self.choices], - 'individual_proofs' : [p.to_dict() for p in self.individual_proofs] - } + return True - if self.overall_proof: - value['overall_proof'] = self.overall_proof.to_dict() - else: - value['overall_proof'] = None + def get_hash(self): + return utils.hash_b64(utils.to_json(self.toJSONDict())) - if with_randomness: - value['randomness'] = [str(r) for r in self.randomness] - value['answer'] = self.answer + def toJSONDict(self, with_randomness=False): + return { + 'answers': [a.toJSONDict(with_randomness) for a in self.encrypted_answers], + 'election_hash': self.election_hash, + 'election_uuid': self.election_uuid + } - return value + @classmethod + def fromJSONDict(cls, d, pk=None): + ev = cls() - @classmethod - def fromJSONDict(cls, d, pk=None): - ea = cls() + ev.encrypted_answers = [EncryptedAnswer.fromJSONDict(ea, pk) for ea in d['answers']] + ev.election_hash = d['election_hash'] + ev.election_uuid = d['election_uuid'] - ea.choices = [algs.EGCiphertext.from_dict(c, pk) for c in d['choices']] - ea.individual_proofs = [algs.EGZKDisjunctiveProof.from_dict(p) for p in d['individual_proofs']] + return ev - if d['overall_proof']: - ea.overall_proof = algs.EGZKDisjunctiveProof.from_dict(d['overall_proof']) - else: - ea.overall_proof = None + @classmethod + def fromElectionAndAnswers(cls, election, answers): + pk = election.public_key - if d.has_key('randomness'): - ea.randomness = [int(r) for r in d['randomness']] - ea.answer = d['answer'] + # each answer is an index into the answer array + encrypted_answers = [EncryptedAnswer.fromElectionAndAnswer(election, answer_num, answers[answer_num]) for + answer_num in range(len(answers))] + return cls(encrypted_answers=encrypted_answers, election_hash=election.hash, election_uuid=election.uuid) - return ea - @classmethod - def fromElectionAndAnswer(cls, election, question_num, answer_indexes): +def one_question_winner(question, result, num_cast_votes): """ - Given an election, a question number, and a list of answers to that question - in the form of an array of 0-based indexes into the answer array, - produce an EncryptedAnswer that works. + determining the winner for one question """ - question = election.questions[question_num] - answers = question['answers'] - pk = election.public_key - - # initialize choices, individual proofs, randomness and overall proof - choices = [None for a in range(len(answers))] - individual_proofs = [None for a in range(len(answers))] - overall_proof = None - randomness = [None for a in range(len(answers))] - - # possible plaintexts [0, 1] - plaintexts = cls.generate_plaintexts(pk) + # sort the answers , keep track of the index + counts = sorted(enumerate(result), key=lambda (x): x[1]) + counts.reverse() - # keep track of number of options selected. - num_selected_answers = 0; + # if there's a max > 1, we assume that the top MAX win + if question['max'] > 1: + return [c[0] for c in counts[:question['max']]] - # homomorphic sum of all - homomorphic_sum = 0 - randomness_sum = 0 + # if max = 1, then depends on absolute or relative + if question['result_type'] == 'absolute': + if counts[0][1] >= (num_cast_votes / 2 + 1): + return [counts[0][0]] + else: + return [] - # min and max for number of answers, useful later - min_answers = 0 - if question.has_key('min'): - min_answers = question['min'] - max_answers = question['max'] + if question['result_type'] == 'relative': + return [counts[0][0]] - # go through each possible answer and encrypt either a g^0 or a g^1. - for answer_num in range(len(answers)): - plaintext_index = 0 - # assuming a list of answers - if answer_num in answer_indexes: - plaintext_index = 1 - num_selected_answers += 1 - - # randomness and encryption - randomness[answer_num] = algs.Utils.random_mpz_lt(pk.q) - choices[answer_num] = pk.encrypt_with_r(plaintexts[plaintext_index], randomness[answer_num]) - - # generate proof - individual_proofs[answer_num] = choices[answer_num].generate_disjunctive_encryption_proof(plaintexts, plaintext_index, - randomness[answer_num], algs.EG_disjunctive_challenge_generator) - - # sum things up homomorphically if needed - if max_answers != None: - homomorphic_sum = choices[answer_num] * homomorphic_sum - randomness_sum = (randomness_sum + randomness[answer_num]) % pk.q - - # prove that the sum is 0 or 1 (can be "blank vote" for this answer) - # num_selected_answers is 0 or 1, which is the index into the plaintext that is actually encoded - - if num_selected_answers < min_answers: - raise Exception("Need to select at least %s answer(s)" % min_answers) - - if max_answers != None: - sum_plaintexts = cls.generate_plaintexts(pk, min=min_answers, max=max_answers) - - # need to subtract the min from the offset - overall_proof = homomorphic_sum.generate_disjunctive_encryption_proof(sum_plaintexts, num_selected_answers - min_answers, randomness_sum, algs.EG_disjunctive_challenge_generator); - else: - # approval voting - overall_proof = None - - return cls(choices, individual_proofs, overall_proof, randomness, answer_indexes) - -class EncryptedVote(HeliosObject): - """ - An encrypted ballot - """ - FIELDS = ['encrypted_answers', 'election_hash', 'election_uuid'] - - def verify(self, election): - # right number of answers - if len(self.encrypted_answers) != len(election.questions): - return False - - # check hash - if self.election_hash != election.hash: - # print "%s / %s " % (self.election_hash, election.hash) - return False - - # check ID - if self.election_uuid != election.uuid: - return False - - # check proofs on all of answers - for question_num in range(len(election.questions)): - ea = self.encrypted_answers[question_num] - - question = election.questions[question_num] - min_answers = 0 - if question.has_key('min'): - min_answers = question['min'] - - if not ea.verify(election.public_key, min=min_answers, max=question['max']): - return False +class Election(HeliosObject): + FIELDS = ['uuid', 'questions', 'name', 'short_name', 'description', 'voters_hash', 'openreg', + 'frozen_at', 'public_key', 'private_key', 'cast_url', 'result', 'result_proof', 'use_voter_aliases', + 'voting_starts_at', 'voting_ends_at', 'election_type'] - return True + JSON_FIELDS = ['uuid', 'questions', 'name', 'short_name', 'description', 'voters_hash', 'openreg', + 'frozen_at', 'public_key', 'cast_url', 'use_voter_aliases', 'voting_starts_at', 'voting_ends_at'] - def get_hash(self): - return utils.hash_b64(utils.to_json(self.toJSONDict())) + # need to add in v3.1: use_advanced_audit_features, election_type, and probably more - def toJSONDict(self, with_randomness=False): - return { - 'answers': [a.toJSONDict(with_randomness) for a in self.encrypted_answers], - 'election_hash': self.election_hash, - 'election_uuid': self.election_uuid - } + def init_tally(self): + return Tally(election=self) - @classmethod - def fromJSONDict(cls, d, pk=None): - ev = cls() + def _process_value_in(self, field_name, field_value): + if field_name == 'frozen_at' or field_name == 'voting_starts_at' or field_name == 'voting_ends_at': + if type(field_value) == str or type(field_value) == unicode: + return datetime.datetime.strptime(field_value, '%Y-%m-%d %H:%M:%S') - ev.encrypted_answers = [EncryptedAnswer.fromJSONDict(ea, pk) for ea in d['answers']] - ev.election_hash = d['election_hash'] - ev.election_uuid = d['election_uuid'] + if field_name == 'public_key': + return algs.EGPublicKey.fromJSONDict(field_value) - return ev + if field_name == 'private_key': + return algs.EGSecretKey.fromJSONDict(field_value) - @classmethod - def fromElectionAndAnswers(cls, election, answers): - pk = election.public_key + def _process_value_out(self, field_name, field_value): + # the date + if field_name == 'frozen_at' or field_name == 'voting_starts_at' or field_name == 'voting_ends_at': + return str(field_value) - # each answer is an index into the answer array - encrypted_answers = [EncryptedAnswer.fromElectionAndAnswer(election, answer_num, answers[answer_num]) for answer_num in range(len(answers))] - return cls(encrypted_answers=encrypted_answers, election_hash=election.hash, election_uuid = election.uuid) + if field_name == 'public_key' or field_name == 'private_key': + return field_value.toJSONDict() + @property + def registration_status_pretty(self): + if self.openreg: + return "Open" + else: + return "Closed" -def one_question_winner(question, result, num_cast_votes): - """ - determining the winner for one question - """ - # sort the answers , keep track of the index - counts = sorted(enumerate(result), key=lambda(x): x[1]) - counts.reverse() - - # if there's a max > 1, we assume that the top MAX win - if question['max'] > 1: - return [c[0] for c in counts[:question['max']]] - - # if max = 1, then depends on absolute or relative - if question['result_type'] == 'absolute': - if counts[0][1] >= (num_cast_votes/2 + 1): - return [counts[0][0]] - else: - return [] - - if question['result_type'] == 'relative': - return [counts[0][0]] - -class Election(HeliosObject): + @property + def winners(self): + """ + Depending on the type of each question, determine the winners + returns an array of winners for each question, aka an array of arrays. + assumes that if there is a max to the question, that's how many winners there are. + """ + return [one_question_winner(self.questions[i], self.result[i], self.num_cast_votes) for i in + range(len(self.questions))] - FIELDS = ['uuid', 'questions', 'name', 'short_name', 'description', 'voters_hash', 'openreg', - 'frozen_at', 'public_key', 'private_key', 'cast_url', 'result', 'result_proof', 'use_voter_aliases', 'voting_starts_at', 'voting_ends_at', 'election_type'] + @property + def pretty_result(self): + if not self.result: + return None - JSON_FIELDS = ['uuid', 'questions', 'name', 'short_name', 'description', 'voters_hash', 'openreg', - 'frozen_at', 'public_key', 'cast_url', 'use_voter_aliases', 'voting_starts_at', 'voting_ends_at'] + # get the winners + winners = self.winners - # need to add in v3.1: use_advanced_audit_features, election_type, and probably more + raw_result = self.result + prettified_result = [] - def init_tally(self): - return Tally(election=self) + # loop through questions + for i in range(len(self.questions)): + q = self.questions[i] + pretty_question = [] - def _process_value_in(self, field_name, field_value): - if field_name == 'frozen_at' or field_name == 'voting_starts_at' or field_name == 'voting_ends_at': - if type(field_value) == str or type(field_value) == unicode: - return datetime.datetime.strptime(field_value, '%Y-%m-%d %H:%M:%S') + # go through answers + for j in range(len(q['answers'])): + a = q['answers'][j] + count = raw_result[i][j] + pretty_question.append({'answer': a, 'count': count, 'winner': (j in winners[i])}) - if field_name == 'public_key': - return algs.EGPublicKey.fromJSONDict(field_value) + prettified_result.append({'question': q['short_name'], 'answers': pretty_question}) - if field_name == 'private_key': - return algs.EGSecretKey.fromJSONDict(field_value) + return prettified_result - def _process_value_out(self, field_name, field_value): - # the date - if field_name == 'frozen_at' or field_name == 'voting_starts_at' or field_name == 'voting_ends_at': - return str(field_value) - if field_name == 'public_key' or field_name == 'private_key': - return field_value.toJSONDict() - - @property - def registration_status_pretty(self): - if self.openreg: - return "Open" - else: - return "Closed" - - @property - def winners(self): +class Voter(HeliosObject): """ - Depending on the type of each question, determine the winners - returns an array of winners for each question, aka an array of arrays. - assumes that if there is a max to the question, that's how many winners there are. + A voter in an election """ - return [one_question_winner(self.questions[i], self.result[i], self.num_cast_votes) for i in range(len(self.questions))] - - @property - def pretty_result(self): - if not self.result: - return None + FIELDS = ['election_uuid', 'uuid', 'voter_type', 'voter_id', 'name', 'alias'] + JSON_FIELDS = ['election_uuid', 'uuid', 'voter_type', 'voter_id_hash', 'name'] - # get the winners - winners = self.winners + # alternative, for when the voter is aliased + ALIASED_VOTER_JSON_FIELDS = ['election_uuid', 'uuid', 'alias'] - raw_result = self.result - prettified_result = [] + def toJSONDict(self): + if self.alias is not None: + return super(Voter, self).toJSONDict(self.ALIASED_VOTER_JSON_FIELDS) + else: + return super(Voter, self).toJSONDict() - # loop through questions - for i in range(len(self.questions)): - q = self.questions[i] - pretty_question = [] + @property + def voter_id_hash(self): + if self.voter_login_id: + # for backwards compatibility with v3.0, and since it doesn't matter + # too much if we hash the email or the unique login ID here. + return utils.hash_b64(self.voter_login_id) + else: + return utils.hash_b64(self.voter_id) - # go through answers - for j in range(len(q['answers'])): - a = q['answers'][j] - count = raw_result[i][j] - pretty_question.append({'answer': a, 'count': count, 'winner': (j in winners[i])}) - - prettified_result.append({'question': q['short_name'], 'answers': pretty_question}) - - return prettified_result - - -class Voter(HeliosObject): - """ - A voter in an election - """ - FIELDS = ['election_uuid', 'uuid', 'voter_type', 'voter_id', 'name', 'alias'] - JSON_FIELDS = ['election_uuid', 'uuid', 'voter_type', 'voter_id_hash', 'name'] - - # alternative, for when the voter is aliased - ALIASED_VOTER_JSON_FIELDS = ['election_uuid', 'uuid', 'alias'] - - def toJSONDict(self): - fields = None - if self.alias != None: - return super(Voter, self).toJSONDict(self.ALIASED_VOTER_JSON_FIELDS) - else: - return super(Voter,self).toJSONDict() - - @property - def voter_id_hash(self): - if self.voter_login_id: - # for backwards compatibility with v3.0, and since it doesn't matter - # too much if we hash the email or the unique login ID here. - return utils.hash_b64(self.voter_login_id) - else: - return utils.hash_b64(self.voter_id) class Trustee(HeliosObject): - """ - a trustee - """ - FIELDS = ['uuid', 'public_key', 'public_key_hash', 'pok', 'decryption_factors', 'decryption_proofs', 'email'] - - def _process_value_in(self, field_name, field_value): - if field_name == 'public_key': - return algs.EGPublicKey.fromJSONDict(field_value) - - if field_name == 'pok': - return algs.DLogProof.fromJSONDict(field_value) - - def _process_value_out(self, field_name, field_value): - if field_name == 'public_key' or field_name == 'pok': - return field_value.toJSONDict() - -class CastVote(HeliosObject): - """ - A cast vote, which includes an encrypted vote and some cast metadata - """ - FIELDS = ['vote', 'cast_at', 'voter_uuid', 'voter_hash', 'vote_hash'] - - def __init__(self, *args, **kwargs): - super(CastVote, self).__init__(*args, **kwargs) - self.election = None - - @classmethod - def fromJSONDict(cls, d, election=None): - o = cls() - o.election = election - o.set_from_args(**d) - return o - - def toJSONDict(self, include_vote=True): - result = super(CastVote,self).toJSONDict() - if not include_vote: - del result['vote'] - return result - - @classmethod - def fromOtherObject(cls, o, election): - obj = cls() - obj.election = election - obj.set_from_other_object(o) - return obj - - def _process_value_in(self, field_name, field_value): - if field_name == 'cast_at': - if type(field_value) == str: - return datetime.datetime.strptime(field_value, '%Y-%m-%d %H:%M:%S') - - if field_name == 'vote': - return EncryptedVote.fromJSONDict(field_value, self.election.public_key) - - def _process_value_out(self, field_name, field_value): - # the date - if field_name == 'cast_at': - return str(field_value) - - if field_name == 'vote': - return field_value.toJSONDict() - - def issues(self, election): """ - Look for consistency problems + a trustee """ - issues = [] + FIELDS = ['uuid', 'public_key', 'public_key_hash', 'pok', 'decryption_factors', 'decryption_proofs', 'email'] - # check the election - if self.vote.election_uuid != election.uuid: - issues.append("the vote's election UUID does not match the election for which this vote is being cast") + def _process_value_in(self, field_name, field_value): + if field_name == 'public_key': + return algs.EGPublicKey.fromJSONDict(field_value) - return issues + if field_name == 'pok': + return algs.DLogProof.fromJSONDict(field_value) -class DLogTable(object): - """ - Keeping track of discrete logs - """ - - def __init__(self, base, modulus): - self.dlogs = {} - self.dlogs[1] = 0 - self.last_dlog_result = 1 - self.counter = 0 - - self.base = base - self.modulus = modulus - - def increment(self): - self.counter += 1 - - # new value - new_value = (self.last_dlog_result * self.base) % self.modulus + def _process_value_out(self, field_name, field_value): + if field_name == 'public_key' or field_name == 'pok': + return field_value.toJSONDict() - # record the discrete log - self.dlogs[new_value] = self.counter - # record the last value - self.last_dlog_result = new_value - - def precompute(self, up_to): - while self.counter < up_to: - self.increment() - - def lookup(self, value): - return self.dlogs.get(value, None) - - -class Tally(HeliosObject): - """ - A running homomorphic tally - """ - - FIELDS = ['num_tallied', 'tally'] - JSON_FIELDS = ['num_tallied', 'tally'] +class CastVote(HeliosObject): + """ + A cast vote, which includes an encrypted vote and some cast metadata + """ + FIELDS = ['vote', 'cast_at', 'voter_uuid', 'voter_hash', 'vote_hash'] - def __init__(self, *args, **kwargs): - super(Tally, self).__init__(*args, **kwargs) + def __init__(self, *args, **kwargs): + super(CastVote, self).__init__(*args, **kwargs) + self.election = None - self.election = kwargs.get('election',None) + @classmethod + def fromJSONDict(cls, d, election=None): + o = cls() + o.election = election + o.set_from_args(**d) + return o - if self.election: - self.init_election(self.election) - else: - self.questions = None - self.public_key = None + def toJSONDict(self, include_vote=True): + result = super(CastVote, self).toJSONDict() + if not include_vote: + del result['vote'] + return result - if not self.tally: - self.tally = None + @classmethod + def fromOtherObject(cls, o, election): + obj = cls() + obj.election = election + obj.set_from_other_object(o) + return obj - # initialize - if self.num_tallied == None: - self.num_tallied = 0 + def _process_value_in(self, field_name, field_value): + if field_name == 'cast_at': + if type(field_value) == str: + return datetime.datetime.strptime(field_value, '%Y-%m-%d %H:%M:%S') - def init_election(self, election): - """ - given the election, initialize some params - """ - self.questions = election.questions - self.public_key = election.public_key + if field_name == 'vote': + return EncryptedVote.fromJSONDict(field_value, self.election.public_key) - if not self.tally: - self.tally = [[0 for a in q['answers']] for q in self.questions] + def _process_value_out(self, field_name, field_value): + # the date + if field_name == 'cast_at': + return str(field_value) - def add_vote_batch(self, encrypted_votes, verify_p=True): - """ - Add a batch of votes. Eventually, this will be optimized to do an aggregate proof verification - rather than a whole proof verif for each vote. - """ - for vote in encrypted_votes: - self.add_vote(vote, verify_p) - - def add_vote(self, encrypted_vote, verify_p=True): - # do we verify? - if verify_p: - if not encrypted_vote.verify(self.election): - raise Exception('Bad Vote') - - # for each question - for question_num in range(len(self.questions)): - question = self.questions[question_num] - answers = question['answers'] - - # for each possible answer to each question - for answer_num in range(len(answers)): - # do the homomorphic addition into the tally - enc_vote_choice = encrypted_vote.encrypted_answers[question_num].choices[answer_num] - enc_vote_choice.pk = self.public_key - self.tally[question_num][answer_num] = encrypted_vote.encrypted_answers[question_num].choices[answer_num] * self.tally[question_num][answer_num] - - self.num_tallied += 1 - - def decryption_factors_and_proofs(self, sk): - """ - returns an array of decryption factors and a corresponding array of decryption proofs. - makes the decryption factors into strings, for general Helios / JS compatibility. - """ - # for all choices of all questions (double list comprehension) - decryption_factors = [] - decryption_proof = [] + if field_name == 'vote': + return field_value.toJSONDict() - for question_num, question in enumerate(self.questions): - answers = question['answers'] - question_factors = [] - question_proof = [] + def issues(self, election): + """ + Look for consistency problems + """ + issues = [] - for answer_num, answer in enumerate(answers): - # do decryption and proof of it - dec_factor, proof = sk.decryption_factor_and_proof(self.tally[question_num][answer_num]) + # check the election + if self.vote.election_uuid != election.uuid: + issues.append("the vote's election UUID does not match the election for which this vote is being cast") - # look up appropriate discrete log - # this is the string conversion - question_factors.append(str(dec_factor)) - question_proof.append(proof.toJSONDict()) + return issues - decryption_factors.append(question_factors) - decryption_proof.append(question_proof) - return decryption_factors, decryption_proof - - def decrypt_and_prove(self, sk, discrete_logs=None): +class DLogTable(object): """ - returns an array of tallies and a corresponding array of decryption proofs. + Keeping track of discrete logs """ - # who's keeping track of discrete logs? - if not discrete_logs: - discrete_logs = self.discrete_logs - - # for all choices of all questions (double list comprehension) - decrypted_tally = [] - decryption_proof = [] + def __init__(self, base, modulus): + self.dlogs = {1: 0} + self.last_dlog_result = 1 + self.counter = 0 - for question_num in range(len(self.questions)): - question = self.questions[question_num] - answers = question['answers'] - question_tally = [] - question_proof = [] + self.base = base + self.modulus = modulus - for answer_num in range(len(answers)): - # do decryption and proof of it - plaintext, proof = sk.prove_decryption(self.tally[question_num][answer_num]) + def increment(self): + self.counter += 1 - # look up appropriate discrete log - question_tally.append(discrete_logs[plaintext]) - question_proof.append(proof) + # new value + new_value = (self.last_dlog_result * self.base) % self.modulus - decrypted_tally.append(question_tally) - decryption_proof.append(question_proof) + # record the discrete log + self.dlogs[new_value] = self.counter - return decrypted_tally, decryption_proof + # record the last value + self.last_dlog_result = new_value - def verify_decryption_proofs(self, decryption_factors, decryption_proofs, public_key, challenge_generator): - """ - decryption_factors is a list of lists of dec factors - decryption_proofs are the corresponding proofs - public_key is, of course, the public key of the trustee - """ + def precompute(self, up_to): + while self.counter < up_to: + self.increment() - # go through each one - for q_num, q in enumerate(self.tally): - for a_num, answer_tally in enumerate(q): - # parse the proof - proof = algs.EGZKProof.fromJSONDict(decryption_proofs[q_num][a_num]) + def lookup(self, value): + return self.dlogs.get(value, None) - # check that g, alpha, y, dec_factor is a DH tuple - if not proof.verify(public_key.g, answer_tally.alpha, public_key.y, int(decryption_factors[q_num][a_num]), public_key.p, public_key.q, challenge_generator): - return False - return True - - def decrypt_from_factors(self, decryption_factors, public_key): +class Tally(HeliosObject): """ - decrypt a tally given decryption factors - - The decryption factors are a list of decryption factor sets, for each trustee. - Each decryption factor set is a list of lists of decryption factors (questions/answers). + A running homomorphic tally """ - # pre-compute a dlog table - dlog_table = DLogTable(base = public_key.g, modulus = public_key.p) - dlog_table.precompute(self.num_tallied) - - result = [] + FIELDS = ['num_tallied', 'tally'] + JSON_FIELDS = ['num_tallied', 'tally'] + + def __init__(self, *args, **kwargs): + super(Tally, self).__init__(*args, **kwargs) + + self.election = kwargs.get('election', None) + + if self.election: + self.init_election(self.election) + else: + self.questions = None + self.public_key = None + + if not self.tally: + self.tally = None + + # initialize + if self.num_tallied is None: + self.num_tallied = 0 + + def init_election(self, election): + """ + given the election, initialize some params + """ + self.questions = election.questions + self.public_key = election.public_key + + if not self.tally: + self.tally = [[0 for _ in q['answers']] for q in self.questions] + + def add_vote_batch(self, encrypted_votes, verify_p=True): + """ + Add a batch of votes. Eventually, this will be optimized to do an aggregate proof verification + rather than a whole proof verif for each vote. + """ + for vote in encrypted_votes: + self.add_vote(vote, verify_p) + + def add_vote(self, encrypted_vote, verify_p=True): + # do we verify? + if verify_p: + if not encrypted_vote.verify(self.election): + raise Exception('Bad Vote') + + # for each question + for question_num in range(len(self.questions)): + question = self.questions[question_num] + answers = question['answers'] + + # for each possible answer to each question + for answer_num in range(len(answers)): + # do the homomorphic addition into the tally + enc_vote_choice = encrypted_vote.encrypted_answers[question_num].choices[answer_num] + enc_vote_choice.pk = self.public_key + self.tally[question_num][answer_num] = encrypted_vote.encrypted_answers[question_num].choices[ + answer_num] * self.tally[question_num][answer_num] + + self.num_tallied += 1 + + def decryption_factors_and_proofs(self, sk): + """ + returns an array of decryption factors and a corresponding array of decryption proofs. + makes the decryption factors into strings, for general Helios / JS compatibility. + """ + # for all choices of all questions (double list comprehension) + decryption_factors = [] + decryption_proof = [] + + for question_num, question in enumerate(self.questions): + answers = question['answers'] + question_factors = [] + question_proof = [] + + for answer_num, answer in enumerate(answers): + # do decryption and proof of it + dec_factor, proof = sk.decryption_factor_and_proof(self.tally[question_num][answer_num]) + + # look up appropriate discrete log + # this is the string conversion + question_factors.append(str(dec_factor)) + question_proof.append(proof.toJSONDict()) + + decryption_factors.append(question_factors) + decryption_proof.append(question_proof) + + return decryption_factors, decryption_proof + + def decrypt_and_prove(self, sk, discrete_logs=None): + """ + returns an array of tallies and a corresponding array of decryption proofs. + """ + + # who's keeping track of discrete logs? + if not discrete_logs: + discrete_logs = self.discrete_logs + + # for all choices of all questions (double list comprehension) + decrypted_tally = [] + decryption_proof = [] + + for question_num in range(len(self.questions)): + question = self.questions[question_num] + answers = question['answers'] + question_tally = [] + question_proof = [] + + for answer_num in range(len(answers)): + # do decryption and proof of it + plaintext, proof = sk.prove_decryption(self.tally[question_num][answer_num]) + + # look up appropriate discrete log + question_tally.append(discrete_logs[plaintext]) + question_proof.append(proof) + + decrypted_tally.append(question_tally) + decryption_proof.append(question_proof) + + return decrypted_tally, decryption_proof + + def verify_decryption_proofs(self, decryption_factors, decryption_proofs, public_key, challenge_generator): + """ + decryption_factors is a list of lists of dec factors + decryption_proofs are the corresponding proofs + public_key is, of course, the public key of the trustee + """ + + # go through each one + for q_num, q in enumerate(self.tally): + for a_num, answer_tally in enumerate(q): + # parse the proof + proof = algs.EGZKProof.fromJSONDict(decryption_proofs[q_num][a_num]) + + # check that g, alpha, y, dec_factor is a DH tuple + if not proof.verify(public_key.g, answer_tally.alpha, public_key.y, + int(decryption_factors[q_num][a_num]), public_key.p, public_key.q, + challenge_generator): + return False + + return True + + def decrypt_from_factors(self, decryption_factors, public_key): + """ + decrypt a tally given decryption factors + + The decryption factors are a list of decryption factor sets, for each trustee. + Each decryption factor set is a list of lists of decryption factors (questions/answers). + """ + + # pre-compute a dlog table + dlog_table = DLogTable(base=public_key.g, modulus=public_key.p) + dlog_table.precompute(self.num_tallied) + + result = [] - # go through each one - for q_num, q in enumerate(self.tally): - q_result = [] + # go through each one + for q_num, q in enumerate(self.tally): + q_result = [] - for a_num, a in enumerate(q): - # coalesce the decryption factors into one list - dec_factor_list = [df[q_num][a_num] for df in decryption_factors] - raw_value = self.tally[q_num][a_num].decrypt(dec_factor_list, public_key) + for a_num, a in enumerate(q): + # coalesce the decryption factors into one list + dec_factor_list = [df[q_num][a_num] for df in decryption_factors] + raw_value = self.tally[q_num][a_num].decrypt(dec_factor_list, public_key) - q_result.append(dlog_table.lookup(raw_value)) + q_result.append(dlog_table.lookup(raw_value)) - result.append(q_result) + result.append(q_result) - return result + return result - def _process_value_in(self, field_name, field_value): - if field_name == 'tally': - return [[algs.EGCiphertext.fromJSONDict(a) for a in q] for q in field_value] + def _process_value_in(self, field_name, field_value): + if field_name == 'tally': + return [[algs.EGCiphertext.fromJSONDict(a) for a in q] for q in field_value] - def _process_value_out(self, field_name, field_value): - if field_name == 'tally': - return [[a.toJSONDict() for a in q] for q in field_value] + def _process_value_out(self, field_name, field_value): + if field_name == 'tally': + return [[a.toJSONDict() for a in q] for q in field_value] diff --git a/helios/crypto/elgamal.py b/helios/crypto/elgamal.py index 88a08c01ba6c7ab7366281faf989d72427c1d4a8..c6b89b3820284e5c9df17849be4c5fe77e02d917 100644 --- a/helios/crypto/elgamal.py +++ b/helios/crypto/elgamal.py @@ -8,12 +8,13 @@ Ben Adida ben@adida.net """ -import math, hashlib, logging -import randpool, number +import logging -import numtheory +from Crypto.Hash import SHA1 +from Crypto.Util.number import inverse + +from helios.crypto.utils import random -from algs import Utils class Cryptosystem(object): def __init__(self): @@ -21,30 +22,6 @@ class Cryptosystem(object): self.q = None self.g = None - @classmethod - def generate(cls, n_bits): - """ - generate an El-Gamal environment. Returns an instance - of ElGamal(), with prime p, group size q, and generator g - """ - - EG = cls() - - # find a prime p such that (p-1)/2 is prime q - EG.p = Utils.random_safe_prime(n_bits) - - # q is the order of the group - # FIXME: not always p-1/2 - EG.q = (EG.p-1)/2 - - # find g that generates the q-order subgroup - while True: - EG.g = Utils.random_mpz_lt(EG.p) - if pow(EG.g, EG.q, EG.p) == 1: - break - - return EG - def generate_keypair(self): """ generates a keypair in the setting @@ -68,7 +45,7 @@ class KeyPair(object): self.pk.p = p self.pk.q = q - self.sk.x = Utils.random_mpz_lt(q) + self.sk.x = random.mpz_lt(q) self.pk.y = pow(g, self.sk.x, p) self.sk.public_key = self.pk @@ -106,7 +83,7 @@ class PublicKey: """ Encrypt a plaintext and return the randomness just generated and used. """ - r = Utils.random_mpz_lt(self.q) + r = random.mpz_lt(self.q) ciphertext = self.encrypt_with_r(plaintext, r) return [ciphertext, r] @@ -181,7 +158,7 @@ class SecretKey: if not dec_factor: dec_factor = self.decryption_factor(ciphertext) - m = (Utils.inverse(dec_factor, self.pk.p) * ciphertext.beta) % self.pk.p + m = (inverse(dec_factor, self.pk.p) * ciphertext.beta) % self.pk.p if decode_m: # get m back from the q-order subgroup @@ -207,15 +184,15 @@ class SecretKey: and alpha^t = b * beta/m ^ c """ - m = (Utils.inverse(pow(ciphertext.alpha, self.x, self.pk.p), self.pk.p) * ciphertext.beta) % self.pk.p - beta_over_m = (ciphertext.beta * Utils.inverse(m, self.pk.p)) % self.pk.p + m = (inverse(pow(ciphertext.alpha, self.x, self.pk.p), self.pk.p) * ciphertext.beta) % self.pk.p + beta_over_m = (ciphertext.beta * inverse(m, self.pk.p)) % self.pk.p # pick a random w - w = Utils.random_mpz_lt(self.pk.q) + w = random.mpz_lt(self.pk.q) a = pow(self.pk.g, w, self.pk.p) b = pow(ciphertext.alpha, w, self.pk.p) - c = int(hashlib.sha1(str(a) + "," + str(b)).hexdigest(),16) + c = int(SHA1.new(str(a) + "," + str(b)).hexdigest(),16) t = (w + self.x * c) % self.pk.q @@ -232,7 +209,7 @@ class SecretKey: Verifier provides challenge modulo q. Prover computes response = w + x*challenge mod q, where x is the secret key. """ - w = Utils.random_mpz_lt(self.pk.q) + w = random.mpz_lt(self.pk.q) commitment = pow(self.pk.g, w, self.pk.p) challenge = challenge_generator(commitment) % self.pk.q response = (w + (self.x * challenge)) % self.pk.q @@ -287,7 +264,7 @@ class Ciphertext: """ Reencryption with fresh randomness, which is returned. """ - r = Utils.random_mpz_lt(self.pk.q) + r = random.mpz_lt(self.pk.q) new_c = self.reenc_with_r(r) return [new_c, r] @@ -311,7 +288,7 @@ class Ciphertext: Generate the disjunctive encryption proof of encryption """ # random W - w = Utils.random_mpz_lt(self.pk.q) + w = random.mpz_lt(self.pk.q) # build the proof proof = ZKProof() @@ -331,20 +308,20 @@ class Ciphertext: def simulate_encryption_proof(self, plaintext, challenge=None): # generate a random challenge if not provided if not challenge: - challenge = Utils.random_mpz_lt(self.pk.q) + challenge = random.mpz_lt(self.pk.q) proof = ZKProof() proof.challenge = challenge # compute beta/plaintext, the completion of the DH tuple - beta_over_plaintext = (self.beta * Utils.inverse(plaintext.m, self.pk.p)) % self.pk.p + beta_over_plaintext = (self.beta * inverse(plaintext.m, self.pk.p)) % self.pk.p # random response, does not even need to depend on the challenge - proof.response = Utils.random_mpz_lt(self.pk.q); + proof.response = random.mpz_lt(self.pk.q); # now we compute A and B - proof.commitment['A'] = (Utils.inverse(pow(self.alpha, proof.challenge, self.pk.p), self.pk.p) * pow(self.pk.g, proof.response, self.pk.p)) % self.pk.p - proof.commitment['B'] = (Utils.inverse(pow(beta_over_plaintext, proof.challenge, self.pk.p), self.pk.p) * pow(self.pk.y, proof.response, self.pk.p)) % self.pk.p + proof.commitment['A'] = (inverse(pow(self.alpha, proof.challenge, self.pk.p), self.pk.p) * pow(self.pk.g, proof.response, self.pk.p)) % self.pk.p + proof.commitment['B'] = (inverse(pow(beta_over_plaintext, proof.challenge, self.pk.p), self.pk.p) * pow(self.pk.y, proof.response, self.pk.p)) % self.pk.p return proof @@ -397,7 +374,7 @@ class Ciphertext: first_check = (pow(self.pk.g, proof.response, self.pk.p) == ((pow(self.alpha, proof.challenge, self.pk.p) * proof.commitment['A']) % self.pk.p)) # check that y^response = B * (beta/m)^challenge - beta_over_m = (self.beta * Utils.inverse(plaintext.m, self.pk.p)) % self.pk.p + beta_over_m = (self.beta * inverse(plaintext.m, self.pk.p)) % self.pk.p second_check = (pow(self.pk.y, proof.response, self.pk.p) == ((pow(beta_over_m, proof.challenge, self.pk.p) * proof.commitment['B']) % self.pk.p)) # print "1,2: %s %s " % (first_check, second_check) @@ -444,7 +421,7 @@ class Ciphertext: """ running_decryption = self.beta for dec_factor in decryption_factors: - running_decryption = (running_decryption * Utils.inverse(dec_factor, public_key.p)) % public_key.p + running_decryption = (running_decryption * inverse(dec_factor, public_key.p)) % public_key.p return running_decryption @@ -473,7 +450,7 @@ class ZKProof(object): """ # generate random w - w = Utils.random_mpz_lt(q) + w = random.mpz_lt(q) # create proof instance proof = cls() @@ -526,7 +503,7 @@ def disjunctive_challenge_generator(commitments): array_to_hash.append(str(commitment['B'])) string_to_hash = ",".join(array_to_hash) - return int(hashlib.sha1(string_to_hash).hexdigest(),16) + return int(SHA1.new(string_to_hash).hexdigest(),16) # a challenge generator for Fiat-Shamir with A,B commitment def fiatshamir_challenge_generator(commitment): @@ -534,5 +511,5 @@ def fiatshamir_challenge_generator(commitment): def DLog_challenge_generator(commitment): string_to_hash = str(commitment) - return int(hashlib.sha1(string_to_hash).hexdigest(),16) + return int(SHA1.new(string_to_hash).hexdigest(),16) diff --git a/helios/crypto/number.py b/helios/crypto/number.py deleted file mode 100644 index 9d50563e904ab50a5446916953b0091c2f228031..0000000000000000000000000000000000000000 --- a/helios/crypto/number.py +++ /dev/null @@ -1,201 +0,0 @@ -# -# number.py : Number-theoretic functions -# -# Part of the Python Cryptography Toolkit -# -# Distribute and use freely; there are no restrictions on further -# dissemination and usage except those imposed by the laws of your -# country of residence. This software is provided "as is" without -# warranty of fitness for use or suitability for any purpose, express -# or implied. Use at your own risk or not at all. -# - -__revision__ = "$Id: number.py,v 1.13 2003/04/04 18:21:07 akuchling Exp $" - -bignum = long -try: - from Crypto.PublicKey import _fastmath -except ImportError: - _fastmath = None - -# Commented out and replaced with faster versions below -## def long2str(n): -## s='' -## while n>0: -## s=chr(n & 255)+s -## n=n>>8 -## return s - -## import types -## def str2long(s): -## if type(s)!=types.StringType: return s # Integers will be left alone -## return reduce(lambda x,y : x*256+ord(y), s, 0L) - -def size (N): - """size(N:long) : int - Returns the size of the number N in bits. - """ - bits, power = 0,1L - while N >= power: - bits += 1 - power = power << 1 - return bits - -def getRandomNumber(N, randfunc): - """getRandomNumber(N:int, randfunc:callable):long - Return an N-bit random number.""" - - S = randfunc(N/8) - odd_bits = N % 8 - if odd_bits != 0: - char = ord(randfunc(1)) >> (8-odd_bits) - S = chr(char) + S - value = bytes_to_long(S) - value |= 2L ** (N-1) # Ensure high bit is set - assert size(value) >= N - return value - -def GCD(x,y): - """GCD(x:long, y:long): long - Return the GCD of x and y. - """ - x = abs(x) ; y = abs(y) - while x > 0: - x, y = y % x, x - return y - -def inverse(u, v): - """inverse(u:long, u:long):long - Return the inverse of u mod v. - """ - u3, v3 = long(u), long(v) - u1, v1 = 1L, 0L - while v3 > 0: - q=u3 / v3 - u1, v1 = v1, u1 - v1*q - u3, v3 = v3, u3 - v3*q - while u1<0: - u1 = u1 + v - return u1 - -# Given a number of bits to generate and a random generation function, -# find a prime number of the appropriate size. - -def getPrime(N, randfunc): - """getPrime(N:int, randfunc:callable):long - Return a random N-bit prime number. - """ - - number=getRandomNumber(N, randfunc) | 1 - while (not isPrime(number)): - number=number+2 - return number - -def isPrime(N): - """isPrime(N:long):bool - Return true if N is prime. - """ - if N == 1: - return 0 - if N in sieve: - return 1 - for i in sieve: - if (N % i)==0: - return 0 - - # Use the accelerator if available - if _fastmath is not None: - return _fastmath.isPrime(N) - - # Compute the highest bit that's set in N - N1 = N - 1L - n = 1L - while (n<N): - n=n<<1L - n = n >> 1L - - # Rabin-Miller test - for c in sieve[:7]: - a=long(c) ; d=1L ; t=n - while (t): # Iterate over the bits in N1 - x=(d*d) % N - if x==1L and d!=1L and d!=N1: - return 0 # Square root of 1 found - if N1 & t: - d=(x*a) % N - else: - d=x - t = t >> 1L - if d!=1L: - return 0 - return 1 - -# Small primes used for checking primality; these are all the primes -# less than 256. This should be enough to eliminate most of the odd -# numbers before needing to do a Rabin-Miller test at all. - -sieve=[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, - 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, - 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, - 197, 199, 211, 223, 227, 229, 233, 239, 241, 251] - -# Improved conversion functions contributed by Barry Warsaw, after -# careful benchmarking - -import struct - -def long_to_bytes(n, blocksize=0): - """long_to_bytes(n:long, blocksize:int) : string - Convert a long integer to a byte string. - - If optional blocksize is given and greater than zero, pad the front of the - byte string with binary zeros so that the length is a multiple of - blocksize. - """ - # after much testing, this algorithm was deemed to be the fastest - s = '' - n = long(n) - pack = struct.pack - while n > 0: - s = pack('>I', n & 0xffffffffL) + s - n = n >> 32 - # strip off leading zeros - for i in range(len(s)): - if s[i] != '\000': - break - else: - # only happens when n == 0 - s = '\000' - i = 0 - s = s[i:] - # add back some pad bytes. this could be done more efficiently w.r.t. the - # de-padding being done above, but sigh... - if blocksize > 0 and len(s) % blocksize: - s = (blocksize - len(s) % blocksize) * '\000' + s - return s - -def bytes_to_long(s): - """bytes_to_long(string) : long - Convert a byte string to a long integer. - - This is (essentially) the inverse of long_to_bytes(). - """ - acc = 0L - unpack = struct.unpack - length = len(s) - if length % 4: - extra = (4 - length % 4) - s = '\000' * extra + s - length = length + extra - for i in range(0, length, 4): - acc = (acc << 32) + unpack('>I', s[i:i+4])[0] - return acc - -# For backwards compatibility... -import warnings -def long2str(n, blocksize=0): - warnings.warn("long2str() has been replaced by long_to_bytes()") - return long_to_bytes(n, blocksize) -def str2long(s): - warnings.warn("str2long() has been replaced by bytes_to_long()") - return bytes_to_long(s) diff --git a/helios/crypto/randpool.py b/helios/crypto/randpool.py deleted file mode 100644 index 53a8acc035c225b23ea9e6edd2a8b27b39b75082..0000000000000000000000000000000000000000 --- a/helios/crypto/randpool.py +++ /dev/null @@ -1,422 +0,0 @@ -# -# randpool.py : Cryptographically strong random number generation -# -# Part of the Python Cryptography Toolkit -# -# Distribute and use freely; there are no restrictions on further -# dissemination and usage except those imposed by the laws of your -# country of residence. This software is provided "as is" without -# warranty of fitness for use or suitability for any purpose, express -# or implied. Use at your own risk or not at all. -# - -__revision__ = "$Id: randpool.py,v 1.14 2004/05/06 12:56:54 akuchling Exp $" - -import time, array, types, warnings, os.path -from number import long_to_bytes -try: - import Crypto.Util.winrandom as winrandom -except: - winrandom = None - -STIRNUM = 3 - -class RandomPool: - """randpool.py : Cryptographically strong random number generation. - - The implementation here is similar to the one in PGP. To be - cryptographically strong, it must be difficult to determine the RNG's - output, whether in the future or the past. This is done by using - a cryptographic hash function to "stir" the random data. - - Entropy is gathered in the same fashion as PGP; the highest-resolution - clock around is read and the data is added to the random number pool. - A conservative estimate of the entropy is then kept. - - If a cryptographically secure random source is available (/dev/urandom - on many Unixes, Windows CryptGenRandom on most Windows), then use - it. - - Instance Attributes: - bits : int - Maximum size of pool in bits - bytes : int - Maximum size of pool in bytes - entropy : int - Number of bits of entropy in this pool. - - Methods: - add_event([s]) : add some entropy to the pool - get_bytes(int) : get N bytes of random data - randomize([N]) : get N bytes of randomness from external source - """ - - - def __init__(self, numbytes = 160, cipher=None, hash=None): - if hash is None: - from hashlib import sha1 as hash - - # The cipher argument is vestigial; it was removed from - # version 1.1 so RandomPool would work even in the limited - # exportable subset of the code - if cipher is not None: - warnings.warn("'cipher' parameter is no longer used") - - if isinstance(hash, types.StringType): - # ugly hack to force __import__ to give us the end-path module - hash = __import__('Crypto.Hash.'+hash, - None, None, ['new']) - warnings.warn("'hash' parameter should now be a hashing module") - - self.bytes = numbytes - self.bits = self.bytes*8 - self.entropy = 0 - self._hash = hash - - # Construct an array to hold the random pool, - # initializing it to 0. - self._randpool = array.array('B', [0]*self.bytes) - - self._event1 = self._event2 = 0 - self._addPos = 0 - self._getPos = hash().digest_size - self._lastcounter=time.time() - self.__counter = 0 - - self._measureTickSize() # Estimate timer resolution - self._randomize() - - def _updateEntropyEstimate(self, nbits): - self.entropy += nbits - if self.entropy < 0: - self.entropy = 0 - elif self.entropy > self.bits: - self.entropy = self.bits - - def _randomize(self, N = 0, devname = '/dev/urandom'): - """_randomize(N, DEVNAME:device-filepath) - collects N bits of randomness from some entropy source (e.g., - /dev/urandom on Unixes that have it, Windows CryptoAPI - CryptGenRandom, etc) - DEVNAME is optional, defaults to /dev/urandom. You can change it - to /dev/random if you want to block till you get enough - entropy. - """ - data = '' - if N <= 0: - nbytes = int((self.bits - self.entropy)/8+0.5) - else: - nbytes = int(N/8+0.5) - if winrandom: - # Windows CryptGenRandom provides random data. - data = winrandom.new().get_bytes(nbytes) - # GAE fix, benadida - #elif os.path.exists(devname): - # # Many OSes support a /dev/urandom device - # try: - # f=open(devname) - # data=f.read(nbytes) - # f.close() - # except IOError, (num, msg): - # if num!=2: raise IOError, (num, msg) - # # If the file wasn't found, ignore the error - if data: - self._addBytes(data) - # Entropy estimate: The number of bits of - # data obtained from the random source. - self._updateEntropyEstimate(8*len(data)) - self.stir_n() # Wash the random pool - - def randomize(self, N=0): - """randomize(N:int) - use the class entropy source to get some entropy data. - This is overridden by KeyboardRandomize(). - """ - return self._randomize(N) - - def stir_n(self, N = STIRNUM): - """stir_n(N) - stirs the random pool N times - """ - for i in xrange(N): - self.stir() - - def stir (self, s = ''): - """stir(s:string) - Mix up the randomness pool. This will call add_event() twice, - but out of paranoia the entropy attribute will not be - increased. The optional 's' parameter is a string that will - be hashed with the randomness pool. - """ - - entropy=self.entropy # Save inital entropy value - self.add_event() - - # Loop over the randomness pool: hash its contents - # along with a counter, and add the resulting digest - # back into the pool. - for i in range(self.bytes / self._hash().digest_size): - h = self._hash(self._randpool) - h.update(str(self.__counter) + str(i) + str(self._addPos) + s) - self._addBytes( h.digest() ) - self.__counter = (self.__counter + 1) & 0xFFFFffffL - - self._addPos, self._getPos = 0, self._hash().digest_size - self.add_event() - - # Restore the old value of the entropy. - self.entropy=entropy - - - def get_bytes (self, N): - """get_bytes(N:int) : string - Return N bytes of random data. - """ - - s='' - i, pool = self._getPos, self._randpool - h=self._hash() - dsize = self._hash().digest_size - num = N - while num > 0: - h.update( self._randpool[i:i+dsize] ) - s = s + h.digest() - num = num - dsize - i = (i + dsize) % self.bytes - if i<dsize: - self.stir() - i=self._getPos - - self._getPos = i - self._updateEntropyEstimate(- 8*N) - return s[:N] - - - def add_event(self, s=''): - """add_event(s:string) - Add an event to the random pool. The current time is stored - between calls and used to estimate the entropy. The optional - 's' parameter is a string that will also be XORed into the pool. - Returns the estimated number of additional bits of entropy gain. - """ - event = time.time()*1000 - delta = self._noise() - s = (s + long_to_bytes(event) + - 4*chr(0xaa) + long_to_bytes(delta) ) - self._addBytes(s) - if event==self._event1 and event==self._event2: - # If events are coming too closely together, assume there's - # no effective entropy being added. - bits=0 - else: - # Count the number of bits in delta, and assume that's the entropy. - bits=0 - while delta: - delta, bits = delta>>1, bits+1 - if bits>8: bits=8 - - self._event1, self._event2 = event, self._event1 - - self._updateEntropyEstimate(bits) - return bits - - # Private functions - def _noise(self): - # Adds a bit of noise to the random pool, by adding in the - # current time and CPU usage of this process. - # The difference from the previous call to _noise() is taken - # in an effort to estimate the entropy. - t=time.time() - delta = (t - self._lastcounter)/self._ticksize*1e6 - self._lastcounter = t - self._addBytes(long_to_bytes(long(1000*time.time()))) - self._addBytes(long_to_bytes(long(1000*time.clock()))) - self._addBytes(long_to_bytes(long(1000*time.time()))) - self._addBytes(long_to_bytes(long(delta))) - - # Reduce delta to a maximum of 8 bits so we don't add too much - # entropy as a result of this call. - delta=delta % 0xff - return int(delta) - - - def _measureTickSize(self): - # _measureTickSize() tries to estimate a rough average of the - # resolution of time that you can see from Python. It does - # this by measuring the time 100 times, computing the delay - # between measurements, and taking the median of the resulting - # list. (We also hash all the times and add them to the pool) - interval = [None] * 100 - h = self._hash(`(id(self),id(interval))`) - - # Compute 100 differences - t=time.time() - h.update(`t`) - i = 0 - j = 0 - while i < 100: - t2=time.time() - h.update(`(i,j,t2)`) - j += 1 - delta=int((t2-t)*1e6) - if delta: - interval[i] = delta - i += 1 - t=t2 - - # Take the median of the array of intervals - interval.sort() - self._ticksize=interval[len(interval)/2] - h.update(`(interval,self._ticksize)`) - # mix in the measurement times and wash the random pool - self.stir(h.digest()) - - def _addBytes(self, s): - "XOR the contents of the string S into the random pool" - i, pool = self._addPos, self._randpool - for j in range(0, len(s)): - pool[i]=pool[i] ^ ord(s[j]) - i=(i+1) % self.bytes - self._addPos = i - - # Deprecated method names: remove in PCT 2.1 or later. - def getBytes(self, N): - warnings.warn("getBytes() method replaced by get_bytes()", - DeprecationWarning) - return self.get_bytes(N) - - def addEvent (self, event, s=""): - warnings.warn("addEvent() method replaced by add_event()", - DeprecationWarning) - return self.add_event(s + str(event)) - -class PersistentRandomPool (RandomPool): - def __init__ (self, filename=None, *args, **kwargs): - RandomPool.__init__(self, *args, **kwargs) - self.filename = filename - if filename: - try: - # the time taken to open and read the file might have - # a little disk variability, modulo disk/kernel caching... - f=open(filename, 'rb') - self.add_event() - data = f.read() - self.add_event() - # mix in the data from the file and wash the random pool - self.stir(data) - f.close() - except IOError: - # Oh, well; the file doesn't exist or is unreadable, so - # we'll just ignore it. - pass - - def save(self): - if self.filename == "": - raise ValueError, "No filename set for this object" - # wash the random pool before save, provides some forward secrecy for - # old values of the pool. - self.stir_n() - f=open(self.filename, 'wb') - self.add_event() - f.write(self._randpool.tostring()) - f.close() - self.add_event() - # wash the pool again, provide some protection for future values - self.stir() - -# non-echoing Windows keyboard entry -_kb = 0 -if not _kb: - try: - import msvcrt - class KeyboardEntry: - def getch(self): - c = msvcrt.getch() - if c in ('\000', '\xe0'): - # function key - c += msvcrt.getch() - return c - def close(self, delay = 0): - if delay: - time.sleep(delay) - while msvcrt.kbhit(): - msvcrt.getch() - _kb = 1 - except: - pass - -# non-echoing Posix keyboard entry -if not _kb: - try: - import termios - class KeyboardEntry: - def __init__(self, fd = 0): - self._fd = fd - self._old = termios.tcgetattr(fd) - new = termios.tcgetattr(fd) - new[3]=new[3] & ~termios.ICANON & ~termios.ECHO - termios.tcsetattr(fd, termios.TCSANOW, new) - def getch(self): - termios.tcflush(0, termios.TCIFLUSH) # XXX Leave this in? - return os.read(self._fd, 1) - def close(self, delay = 0): - if delay: - time.sleep(delay) - termios.tcflush(self._fd, termios.TCIFLUSH) - termios.tcsetattr(self._fd, termios.TCSAFLUSH, self._old) - _kb = 1 - except: - pass - -class KeyboardRandomPool (PersistentRandomPool): - def __init__(self, *args, **kwargs): - PersistentRandomPool.__init__(self, *args, **kwargs) - - def randomize(self, N = 0): - "Adds N bits of entropy to random pool. If N is 0, fill up pool." - import os, string, time - if N <= 0: - bits = self.bits - self.entropy - else: - bits = N*8 - if bits == 0: - return - print bits,'bits of entropy are now required. Please type on the keyboard' - print 'until enough randomness has been accumulated.' - kb = KeyboardEntry() - s='' # We'll save the characters typed and add them to the pool. - hash = self._hash - e = 0 - try: - while e < bits: - temp=str(bits-e).rjust(6) - os.write(1, temp) - s=s+kb.getch() - e += self.add_event(s) - os.write(1, 6*chr(8)) - self.add_event(s+hash.new(s).digest() ) - finally: - kb.close() - print '\n\007 Enough. Please wait a moment.\n' - self.stir_n() # wash the random pool. - kb.close(4) - -if __name__ == '__main__': - pool = RandomPool() - print 'random pool entropy', pool.entropy, 'bits' - pool.add_event('something') - print `pool.get_bytes(100)` - import tempfile, os - fname = tempfile.mktemp() - pool = KeyboardRandomPool(filename=fname) - print 'keyboard random pool entropy', pool.entropy, 'bits' - pool.randomize() - print 'keyboard random pool entropy', pool.entropy, 'bits' - pool.randomize(128) - pool.save() - saved = open(fname, 'rb').read() - print 'saved', `saved` - print 'pool ', `pool._randpool.tostring()` - newpool = PersistentRandomPool(fname) - print 'persistent random pool entropy', pool.entropy, 'bits' - os.remove(fname) diff --git a/helios/crypto/utils.py b/helios/crypto/utils.py index 258f4130cb45bf509bc62710674363830ebe6a5d..08df21b2ced75c8305dd223476c2a8bbb5c1f0eb 100644 --- a/helios/crypto/utils.py +++ b/helios/crypto/utils.py @@ -1,31 +1,41 @@ """ Crypto Utils """ -import hashlib -import hmac, base64, json +import base64 +import json +import math + +from Crypto.Hash import SHA256 +from Crypto.Random.random import StrongRandom + +random = StrongRandom() + + +def random_mpz_lt(maximum, strong_random=random): + n_bits = int(math.floor(math.log(maximum, 2))) + res = strong_random.getrandbits(n_bits) + while res >= maximum: + res = strong_random.getrandbits(n_bits) + return res + + +random.mpz_lt = random_mpz_lt + -from hashlib import sha256 - def hash_b64(s): - """ - hash the string using sha1 and produce a base64 output - removes the trailing "=" - """ - hasher = sha256(s) - result= base64.b64encode(hasher.digest())[:-1] - return result + """ + hash the string using sha256 and produce a base64 output + removes the trailing "=" + """ + hasher = SHA256.new(s.encode('utf-8')) + result = base64.b64encode(hasher.digest())[:-1] + return result -def to_json(d): - return json.dumps(d, sort_keys=True) -def from_json(json_str): - if not json_str: return None - return json.loads(json_str) +def to_json(d): + return json.dumps(d, sort_keys=True) -def do_hmac(k,s): - """ - HMAC a value with a key, hex output - """ - mac = hmac.new(k, s, hashlib.sha1) - return mac.hexdigest() \ No newline at end of file +def from_json(json_str): + if not json_str: return None + return json.loads(json_str) diff --git a/helios/models.py b/helios/models.py index 4252d1693e0c113c1007f22b69260851a9883bff..6cb505524e812dfc2da8b3ff0b73d04d6a834dfa 100644 --- a/helios/models.py +++ b/helios/models.py @@ -12,7 +12,6 @@ import bleach import copy import csv import io -import random import unicodecsv import uuid from django.conf import settings @@ -21,6 +20,8 @@ from django.db import models, transaction from crypto import algs, utils from helios import datatypes from helios import utils as heliosutils +from helios.crypto.elgamal import Cryptosystem +from helios.crypto.utils import random from helios.datatypes.djangofield import LDObjectField # useful stuff in helios_auth from helios_auth.jsonfield import JSONField @@ -526,6 +527,7 @@ class Election(HeliosModel): """ generate a trustee including the secret key, thus a helios-based trustee + :type params: Cryptosystem """ # FIXME: generate the keypair keypair = params.generate_keypair() diff --git a/helios/utils.py b/helios/utils.py index 03095a75074b55123571991a8e35cf0924248431..2ddc11c733f026a272b9b0f27b5cf54afae35bf2 100644 --- a/helios/utils.py +++ b/helios/utils.py @@ -7,12 +7,11 @@ Ben Adida - ben@adida.net import urllib, re, datetime, string +from helios.crypto.utils import random # utils from helios_auth, too from helios_auth.utils import * from django.conf import settings - -import random, logging def split_by_length(str, length, rejoin_with=None): @@ -107,11 +106,8 @@ def xss_strip_all_tags(s): return re.sub("(?s)<[^>]*>|&#?\w+;", fixup, s) - -random.seed() def random_string(length=20, alphabet=None): - random.seed() ALPHABET = alphabet or 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' r_string = '' for i in range(length): diff --git a/helios/workflows/homomorphic.py b/helios/workflows/homomorphic.py index c3c98eb715482af7421a90e42f3c23c74c71e12f..9becf335a1654fc1448385266bd106fd9b57c945 100644 --- a/helios/workflows/homomorphic.py +++ b/helios/workflows/homomorphic.py @@ -124,7 +124,7 @@ class EncryptedAnswer(WorkflowObject): num_selected_answers += 1 # randomness and encryption - randomness[answer_num] = algs.Utils.random_mpz_lt(pk.q) + randomness[answer_num] = algs.random.mpz_lt(pk.q) choices[answer_num] = pk.encrypt_with_r(plaintexts[plaintext_index], randomness[answer_num]) # generate proof diff --git a/requirements.txt b/requirements.txt index 8e69fb261c4417d9c3c41cdd8a10c8911ecb4713..d779153ddf26e55b5d21249a094de8481a5063f0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,3 +21,4 @@ django-ses==0.6.0 validate_email==1.2 oauth2client==1.2 rollbar==0.12.1 +pycryptodome==3.8.2