Skip to content
Snippets Groups Projects
Commit ec792d02 authored by jan.bednarik's avatar jan.bednarik
Browse files

Use Octopus API instead of Graph API

parent b9f28fff
No related branches found
No related tags found
2 merge requests!14Use Octopus API instead of Graph API,!13Use Octopus API instead of Graph API
Pipeline #7993 passed
......@@ -2,27 +2,163 @@
Pirati Authentication
"""
from django.http import *
import json
from urllib import request
from django.core.mail import send_mail
from django.conf import settings
from urllib.request import urlopen
from requests_oauthlib import OAuth2Session
import json
# some parameters to indicate that status updating is not possible
STATUS_UPDATES = False
# display tweaks
LOGIN_MESSAGE = "Přihlásit se pirátskou identitou"
PIRATI_ENDPOINT_URL = f"{settings.PIRATI_REALM_URL}/protocol/openid-connect/auth"
PIRATI_TOKEN_URL = f"{settings.PIRATI_REALM_URL}/protocol/openid-connect/token"
PIRATI_USERINFO_URL = f"{settings.PIRATI_REALM_URL}/protocol/openid-connect/userinfo"
###############################################################################
# Custom helper functions
def call_octopus_api(query, variables=None):
payload = json.dumps({"query": query, "variables": variables or {}}).encode("utf-8")
req = request.Request(settings.OCTOPUS_API_URL, method="POST")
req.add_header("Content-Type", "application/json; charset=utf-8")
req.add_header("Authorization", f"Bearer {settings.OCTOPUS_API_TOKEN}")
response = request.urlopen(req, payload)
data = json.loads(response.read())
if "errors" in data:
raise RuntimeError(
f"API call failed!\n - query:\n----\n{query}\n----\n - variables:\n----\n{variables}\n----\n- response:\n----\n{data}\n----\n"
)
return data
def get_octopus_person(username):
query = """
query data ($username: String!) {
allPeople (first: 1, filters: {username: {iExact: $username}}) {
edges {
node {
username
displayName
email
}
}
}
}
"""
variables = {"username": username}
data = call_octopus_api(query, variables)
return data["data"]["allPeople"]["edges"][0]["node"]
def get_octopus_groups():
query = """
query {
allGroups (filters: {voting: true}) {
edges {
node {
id
name
}
}
}
}
"""
data = call_octopus_api(query)
return [edge["node"] for edge in data["data"]["allGroups"]["edges"]]
def get_octopus_group(group_id):
query = """
query data ($id: GlobalID!) {
group (id: $id) {
id
name
}
}
"""
variables = {"id": group_id}
data = call_octopus_api(query, variables)
return data["data"]["group"]
def get_octopus_group_members(group_id):
query = """
query data ($id: GlobalID!) {
group (id: $id) {
id
memberships {
person {
username
displayName
email
}
}
}
}
"""
variables = {"id": group_id}
data = call_octopus_api(query, variables)
return [m["person"] for m in data["data"]["group"]["memberships"]]
def group_sorter(group):
name = group["name"]
if name == "Celostátní fórum":
return f"0_{name}"
if name.startswith("KF "):
return f"1_{name[3:]}"
if name.startswith("KS "):
return f"1_{name[3:]}"
if name.startswith("MF "):
return f"2_{name[3:]}"
if name.startswith("MS "):
return f"2_{name[3:]}"
return f"9_{name}"
def is_old_group(group_id):
try:
int(group_id)
return True
except:
return False
def person_to_user_info(person):
return {
"type": "pirati",
"id": person["username"].lower(),
"name": f"{person['displayName']} ({person['username']})",
"info": {"email": person["email"]},
"token": {},
}
def old_member_to_user_info(member):
return {
"type": "pirati",
"id": member["username"].lower(),
"name": member["username"].lower(),
"info": {"email": member["email"]},
"token": {},
}
###############################################################################
# Helios stuff
can_list_category_members = True
def get_auth_url(request, redirect_url):
request.session["pirate_redirect_url"] = redirect_url
oauth = OAuth2Session(settings.PIRATI_CLIENT_ID, redirect_uri=redirect_url)
......@@ -41,38 +177,26 @@ def get_user_info_after_auth(request):
)
response = oauth.get(PIRATI_USERINFO_URL)
data = response.json()
return {
"type": "pirati",
"user_id": data["preferred_username"].lower(),
"name": data["preferred_username"].lower(),
"info": {"email": data["email"]},
"token": {},
}
person = get_octopus_person(data["preferred_username"])
info = person_to_user_info(person)
info["user_id"] = info.pop("id")
return info
def do_logout(user):
"""
logout of Pirate
"""
return None
def update_status(token, message):
"""
simple update
"""
pass
def send_message(user_id, user_name, user_info, subject, body):
"""
send email to pirate user, user_id is combined with the domain to get the email.
"""
send_mail(
subject,
body,
settings.SERVER_EMAIL,
["%s <%s>" % (user_name, user_info["email"])],
["%s <%s>" % (user_info["name"], user_info["email"])],
fail_silently=False,
)
......@@ -86,86 +210,48 @@ def eligibility_category_id(constraint):
def check_constraint(constraint, user):
"""
for eligibility
"""
userinfo = json.load(urlopen("https://graph.pirati.cz/user/" + user.user_id))
if is_old_group(constraint):
userinfo = json.load(
request.urlopen("https://graph.pirati.cz/user/" + user.user_id)
)
id = userinfo["id"]
usergroups = json.load(urlopen("https://graph.pirati.cz/" + id + "/groups"))
usergroups = json.load(
request.urlopen("https://graph.pirati.cz/" + id + "/groups")
)
for usergroup in usergroups:
if usergroup["id"] == constraint:
return True
return False
def can_list_categories():
"""
yep, we can
"""
else:
people = get_octopus_group_members(constraint)
usernames = [person["username"].lower() for person in people]
if user.user_id in usernames:
return True
def filter_groups(groups):
for group in groups:
if group["username"].startswith("@"):
continue
if group["username"].startswith("[archiv]"):
continue
if group["username"].startswith("Schovat forum"):
continue
if group["username"].startswith("Zobrazit"):
continue
if group["username"] == group["username"].upper():
continue
yield group
def group_sorter(group):
name = group["username"].lower()
if name == "celostatni forum":
return f"0_{name}"
if name.startswith("ks - "):
return f"2_{name}"
if name.startswith("ks "):
return f"1_{name}"
if name.startswith("ms "):
return f"3_{name}"
return f"9_{name}"
return False
def list_categories(user):
"""
list groups from the graph api
"""
groups = json.load(urlopen("https://graph.pirati.cz/groups"))
groups = filter_groups(groups)
groups = sorted(groups, key=group_sorter)
return [{"id": group["id"], "name": group["username"]} for group in groups]
def can_list_category_members():
return True
return sorted(get_octopus_groups(), key=group_sorter)
def list_category_members(category_id):
members = json.load(urlopen("https://graph.pirati.cz/" + category_id + "/members"))
users = []
for member in members:
users.append(
{
"type": "pirati",
"id": member["username"].lower(),
"name": member["username"].lower(),
"info": {"email": member["email"]},
"token": {},
}
if is_old_group(category_id):
members = json.load(
request.urlopen("https://graph.pirati.cz/" + category_id + "/members")
)
return users
return [old_member_to_user_info(member) for member in members]
else:
people = get_octopus_group_members(category_id)
return [person_to_user_info(person) for person in people]
def pretty_eligibility(constraint):
group = json.load(urlopen("https://graph.pirati.cz/" + constraint))
return 'Pirate users in "%s" group' % group["username"]
if is_old_group(constraint):
group = json.load(request.urlopen("https://graph.pirati.cz/" + constraint))
name = group["username"]
else:
group = get_octopus_group(constraint)
name = group["name"]
return f"Osoby ve skupině „{name}"
#
......
......@@ -304,3 +304,9 @@ if ROLLBAR_ACCESS_TOKEN:
PIRATI_REALM_URL = get_from_env('PIRATI_REALM_URL', '')
PIRATI_CLIENT_ID = get_from_env('PIRATI_CLIENT_ID', '')
PIRATI_CLIENT_SECRET = get_from_env('PIRATI_CLIENT_SECRET', '')
OCTOPUS_API_URL = get_from_env('OCTOPUS_API_URL', '')
OCTOPUS_API_TOKEN = get_from_env('OCTOPUS_API_TOKEN', '')
if DEBUG:
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment