Skip to content
Snippets Groups Projects
Commit 3f12431d authored by jan.bednarik's avatar jan.bednarik
Browse files

Pirati SSO auth system

parent dd014684
Branches
No related tags found
No related merge requests found
...@@ -8,3 +8,6 @@ celerybeat-* ...@@ -8,3 +8,6 @@ celerybeat-*
env.sh env.sh
.cache .cache
.idea/ .idea/
docker-compose.yml
.venv
.envrc
from django.conf import settings from django.conf import settings
from . import password, twitter, linkedin, cas, facebook, google, yahoo, clever, github from . import password, twitter, linkedin, cas, facebook, google, yahoo, clever, github, pirati
AUTH_SYSTEMS = {} AUTH_SYSTEMS = {}
...@@ -12,6 +12,7 @@ AUTH_SYSTEMS['google'] = google ...@@ -12,6 +12,7 @@ AUTH_SYSTEMS['google'] = google
AUTH_SYSTEMS['yahoo'] = yahoo AUTH_SYSTEMS['yahoo'] = yahoo
AUTH_SYSTEMS['clever'] = clever AUTH_SYSTEMS['clever'] = clever
AUTH_SYSTEMS['github'] = github AUTH_SYSTEMS['github'] = github
AUTH_SYSTEMS['pirati'] = pirati
# not ready # not ready
#import live #import live
......
"""
Pirati Authentication
"""
import json
import re
from urllib import request
from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings
from requests_oauthlib import OAuth2Session
from unidecode import unidecode
# some parameters to indicate that status updating is not possible
STATUS_UPDATES = False
# display tweaks
LOGIN_MESSAGE = "Přihlásit se pirátskou identitou"
PIRATI_ENDPOINT_URL = f"{settings.PIRATI_REALM_URL}/protocol/openid-connect/auth"
PIRATI_TOKEN_URL = f"{settings.PIRATI_REALM_URL}/protocol/openid-connect/token"
PIRATI_USERINFO_URL = f"{settings.PIRATI_REALM_URL}/protocol/openid-connect/userinfo"
###############################################################################
# Custom helper functions
def call_octopus_api(query, variables=None):
payload = json.dumps({"query": query, "variables": variables or {}}).encode("utf-8")
req = request.Request(settings.OCTOPUS_API_URL, method="POST")
req.add_header("Content-Type", "application/json; charset=utf-8")
req.add_header("Authorization", f"Bearer {settings.OCTOPUS_API_TOKEN}")
response = request.urlopen(req, payload)
data = json.loads(response.read())
if "errors" in data:
raise RuntimeError(
f"API call failed!\n - query:\n----\n{query}\n----\n - variables:\n----\n{variables}\n----\n- response:\n----\n{data}\n----\n"
)
return data
def get_octopus_person(username):
query = """
query data ($username: String!) {
allPeople (first: 1, filters: {username: {iExact: $username}}) {
edges {
node {
username
displayName
email
}
}
}
}
"""
variables = {"username": username}
data = call_octopus_api(query, variables)
return data["data"]["allPeople"]["edges"][0]["node"]
def get_octopus_groups():
query = """
query {
allGroups (filters: {voting: true}) {
edges {
node {
id
name
}
}
}
}
"""
data = call_octopus_api(query)
return [edge["node"] for edge in data["data"]["allGroups"]["edges"]]
def get_octopus_group(group_id):
query = """
query data ($id: GlobalID!) {
group (id: $id) {
id
name
}
}
"""
variables = {"id": group_id}
data = call_octopus_api(query, variables)
return data["data"]["group"]
def get_octopus_group_members(group_id):
query = """
query data ($id: GlobalID!) {
group (id: $id) {
id
memberships {
person {
username
displayName
email
}
}
}
}
"""
variables = {"id": group_id}
data = call_octopus_api(query, variables)
return [m["person"] for m in data["data"]["group"]["memberships"]]
def group_sorter(group):
name = unidecode(group["name"])
if name == "Celostatni forum":
return f"0_{name}"
if re.match(r"^K[FS] ", name):
return f"1_{name[3:]}"
if re.match(r"^M[FS] Ch", name):
return f"3_{name[3:]}"
if re.match(r"^M[FS] [ABCDEFGH]", name):
return f"2_{name[3:]}"
if re.match(r"^M[FS] ", name):
return f"4_{name[3:]}"
return f"9_{name}"
def is_old_group(group_id):
try:
int(group_id)
return True
except:
pass
if group_id.startswith("deadbeef-babe-"):
return True
return False
def person_to_user_info(person):
return {
"type": "pirati",
"id": person["username"].lower(),
"name": person["displayName"],
"info": {"email": person["email"], "name": person["displayName"]},
"token": {},
}
def old_member_to_user_info(member):
return {
"type": "pirati",
"id": member["username"].lower(),
"name": member["username"],
"info": {"email": member["email"], "name": member["username"]},
"token": {},
}
###############################################################################
# Celery tasks
@shared_task(
autoretry_for=(Exception,),
max_retries=160, # 3 days
retry_backoff=True,
retry_backoff_max=1800,
)
def send_email_task(recipient, subject, body):
send_mail(subject, body, settings.SERVER_EMAIL, [recipient], fail_silently=False)
###############################################################################
# Helios stuff
can_list_category_members = True
def get_auth_url(request, redirect_url):
request.session["pirate_redirect_url"] = redirect_url
oauth = OAuth2Session(settings.PIRATI_CLIENT_ID, redirect_uri=redirect_url)
url, state = oauth.authorization_url(PIRATI_ENDPOINT_URL)
return url
def get_user_info_after_auth(request):
oauth = OAuth2Session(
settings.PIRATI_CLIENT_ID, redirect_uri=request.session["pirate_redirect_url"]
)
token = oauth.fetch_token(
PIRATI_TOKEN_URL,
client_secret=settings.PIRATI_CLIENT_SECRET,
code=request.GET["code"],
)
response = oauth.get(PIRATI_USERINFO_URL)
data = response.json()
person = get_octopus_person(data["preferred_username"])
info = person_to_user_info(person)
info["user_id"] = info.pop("id")
return info
def do_logout(user):
return None
def update_status(token, message):
pass
def send_message(user_id, user_name, user_info, subject, body):
recipient = "%s <%s>" % (user_info.get("name", user_name), user_info["email"])
send_email_task.delay(recipient, subject, body)
def generate_constraint(category_id, user):
return category_id
def eligibility_category_id(constraint):
return constraint
def check_constraint(constraint, user):
if is_old_group(constraint):
userinfo = json.load(
request.urlopen("https://graph.pirati.cz/user/" + user.user_id)
)
id = userinfo["id"]
usergroups = json.load(
request.urlopen("https://graph.pirati.cz/" + id + "/groups")
)
for usergroup in usergroups:
if usergroup["id"] == constraint:
return True
else:
people = get_octopus_group_members(constraint)
usernames = [person["username"].lower() for person in people]
if user.user_id in usernames:
return True
return False
def list_categories(user):
return sorted(get_octopus_groups(), key=group_sorter)
def list_category_members(category_id):
if is_old_group(category_id):
members = json.load(
request.urlopen("https://graph.pirati.cz/" + category_id + "/members")
)
return [old_member_to_user_info(member) for member in members]
else:
people = get_octopus_group_members(category_id)
return [person_to_user_info(person) for person in people]
def pretty_eligibility(constraint):
if is_old_group(constraint):
group = json.load(request.urlopen("https://graph.pirati.cz/" + constraint))
name = group["username"]
else:
group = get_octopus_group(constraint)
name = group["name"]
return f"Osoby ve skupině „{name}"
#
# Election Creation
#
def can_create_election(user_id, user_info):
return True
helios_auth/media/login-icons/pirati.png

6.33 KiB

...@@ -22,3 +22,6 @@ boto==2.49.0 ...@@ -22,3 +22,6 @@ boto==2.49.0
django-ses==0.8.14 django-ses==0.8.14
oauth2client==4.1.3 oauth2client==4.1.3
rollbar==0.14.7 rollbar==0.14.7
requests-oauthlib==1.3.1
unidecode==1.3.6
...@@ -14,7 +14,7 @@ def get_from_env(var, default): ...@@ -14,7 +14,7 @@ def get_from_env(var, default):
else: else:
return default return default
DEBUG = (get_from_env('DEBUG', '1') == '1') DEBUG = (get_from_env('DEBUG', '0') == '1')
# add admins of the form: # add admins of the form:
# ('Ben Adida', 'ben@adida.net'), # ('Ben Adida', 'ben@adida.net'),
...@@ -212,7 +212,7 @@ HELP_EMAIL_ADDRESS = get_from_env('HELP_EMAIL_ADDRESS', 'help@heliosvoting.org') ...@@ -212,7 +212,7 @@ HELP_EMAIL_ADDRESS = get_from_env('HELP_EMAIL_ADDRESS', 'help@heliosvoting.org')
AUTH_TEMPLATE_BASE = "server_ui/templates/base.html" AUTH_TEMPLATE_BASE = "server_ui/templates/base.html"
HELIOS_TEMPLATE_BASE = "server_ui/templates/base.html" HELIOS_TEMPLATE_BASE = "server_ui/templates/base.html"
HELIOS_ADMIN_ONLY = False HELIOS_ADMIN_ONLY = False
HELIOS_VOTERS_UPLOAD = True HELIOS_VOTERS_UPLOAD = (get_from_env('HELIOS_VOTERS_UPLOAD', 1) == 1)
HELIOS_VOTERS_EMAIL = True HELIOS_VOTERS_EMAIL = True
# are elections private by default? # are elections private by default?
...@@ -221,9 +221,9 @@ HELIOS_PRIVATE_DEFAULT = False ...@@ -221,9 +221,9 @@ HELIOS_PRIVATE_DEFAULT = False
# authentication systems enabled # authentication systems enabled
# AUTH_ENABLED_SYSTEMS = ['password','facebook','twitter', 'google', 'yahoo'] # AUTH_ENABLED_SYSTEMS = ['password','facebook','twitter', 'google', 'yahoo']
AUTH_ENABLED_SYSTEMS = get_from_env('AUTH_ENABLED_SYSTEMS', AUTH_ENABLED_SYSTEMS = get_from_env('AUTH_ENABLED_SYSTEMS',
get_from_env('AUTH_ENABLED_AUTH_SYSTEMS', 'password,google,facebook') get_from_env('AUTH_ENABLED_AUTH_SYSTEMS', 'pirati')
).split(",") ).split(",")
AUTH_DEFAULT_SYSTEM = get_from_env('AUTH_DEFAULT_SYSTEM', get_from_env('AUTH_DEFAULT_AUTH_SYSTEM', None)) AUTH_DEFAULT_SYSTEM = get_from_env('AUTH_DEFAULT_SYSTEM', get_from_env('AUTH_DEFAULT_AUTH_SYSTEM', 'pirati'))
# google # google
GOOGLE_CLIENT_ID = get_from_env('GOOGLE_CLIENT_ID', '') GOOGLE_CLIENT_ID = get_from_env('GOOGLE_CLIENT_ID', '')
...@@ -297,3 +297,14 @@ if ROLLBAR_ACCESS_TOKEN: ...@@ -297,3 +297,14 @@ if ROLLBAR_ACCESS_TOKEN:
'access_token': ROLLBAR_ACCESS_TOKEN, 'access_token': ROLLBAR_ACCESS_TOKEN,
'environment': 'development' if DEBUG else 'production', 'environment': 'development' if DEBUG else 'production',
} }
# auth setup
PIRATI_REALM_URL = get_from_env('PIRATI_REALM_URL', '')
PIRATI_CLIENT_ID = get_from_env('PIRATI_CLIENT_ID', '')
PIRATI_CLIENT_SECRET = get_from_env('PIRATI_CLIENT_SECRET', '')
OCTOPUS_API_URL = get_from_env('OCTOPUS_API_URL', '')
OCTOPUS_API_TOKEN = get_from_env('OCTOPUS_API_TOKEN', '')
if DEBUG:
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment