From 4fe1bec588aa89f9124f5a00f747e26b4432c340 Mon Sep 17 00:00:00 2001 From: Aitzol Date: Wed, 22 Nov 2023 20:45:37 +0100 Subject: [PATCH] 2fa-0.9 --- app.py | 8 +- ç | 1314 -------------------------------------------------------- 2 files changed, 5 insertions(+), 1317 deletions(-) delete mode 100644 ç diff --git a/app.py b/app.py index 94e440e..9ce6c54 100644 --- a/app.py +++ b/app.py @@ -54,7 +54,6 @@ def get_index(): @get('/user') def get_index(): - print('SESSION:',newSession().get()) try: print('SESSION:',newSession().get()) return user_tpl(data=newSession().get(), str=i18n.str) @@ -162,8 +161,11 @@ def post_user(): LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) return error(str(e)) + #data=[form('username'),form('password'),newSession().get()['id'],newSession().get()['authCode']] + #print('N:',newSession().get()['id']) key = cryptocode.encrypt(form('password'), newSession().get()['id']) + #key = cryptocode.encrypt(data, newSession().get()['id']) print(key) key = base64.urlsafe_b64encode(str.encode(key)) print(key) @@ -171,7 +173,7 @@ def post_user(): try: if(check_2fa_step1(form('username'))): print('kk') - logout(form('username')) + #logout(form('username')) return index_tpl(two_factor_authentication=True, key=key, str=i18n.str) except Error as e: LOG.warning("Erabiltzailea ez da aurkitu???") @@ -216,7 +218,7 @@ def post_user_step2(key): print("sid:",newSession().get()['id']) print('pwd:',password) - #logout(newSession().get()['username']) + logout(newSession().get()['username']) def error(msg): return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) diff --git a/ç b/ç deleted file mode 100644 index 7e22485..0000000 --- a/ç +++ /dev/null @@ -1,1314 +0,0 @@ -#!/usr/bin/env python3 - -''' -ldap-python-webui :: LDAP kudeaketarako Web Interfazea - Web UI for LDAP management. -Copyright (C) 2022 Aitzol Berasategi - Wproject - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see . -''' - -import bottle -from bottle import get, post, static_file, request, route, template -from bottle import SimpleTemplate -from bottle.ext import beaker -from configparser import ConfigParser -from ldap3 import Server, Connection, ALL -from ldap3 import SIMPLE, SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE, ALL_ATTRIBUTES -from ldap3.core.exceptions import LDAPBindError, LDAPConstraintViolationResult, \ - LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError, \ - LDAPSocketOpenError, LDAPExceptionError, LDAPAttributeOrValueExistsResult, \ - LDAPNoSuchAttributeResult -import logging -from os import getenv, environ, path, remove -from libs import flist, slist -from libs.localization import * -from libs.helper import tools -import random -from user_agents import parse as ua_parse -from datetime import datetime -import threading -import cryptocode - -BASE_DIR = path.dirname(__file__) -LOG = logging.getLogger(__name__) -LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s' -VERSION = '0.0.2' - -@get('/') -def get_index(): - try: - ''' - while(newSession().get()['secureAuth'] and not newSession().secure_logged_in): - logout(newSession().get()['username']) - ''' - return user_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/user') -def get_index(): - try: - print(newSession().get()) - print(newSession().secure_logged_in) - print(newSession().id); - ''' - while(newSession().get()['secureAuth'] and not newSession().secure_logged_in): - logout(newSession().get()['username']) - ''' - return user_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/signup') -def get_index(): - newSession() - if allowed: - return signup_tpl(str=i18n.str) - else: - return index_tpl(alerts=[('error', i18n.msg[28], 'upDown')], str=i18n.str) - -@get('/change_pwd') -def get_index(): - try: - return change_pwd_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/edit_fullname') -def get_index(): - try: - return edit_fullname_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/edit_email') -def get_index(): - try: - return edit_email_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/logs') -def get_index(): - try: - return logs_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/delete') -def get_index(): - try: - return delete_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/logout') -def get_index(): - - def error(msg): - return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) - - try: - username = newSession().get()['username'] - if(username is not None): - logout(username) - except Error as e: - LOG.warning("Unsuccessful attempt to log out: %s" % e) - return error(str(e)) - - return index_tpl(alerts=[('success', i18n.msg[0], 'fadeOut')], str=i18n.str) - -@get('/_2fa') -def get_index(): - #newSession().get() - try: - reload(newSession().get()['username'], None, None) - #add_auth_attribute_step1(newSession().get()['username'], None, None) - return _2fa_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@get('/enable_2fa') -@get('/disable_2fa') -def get_index(): - try: - return user_tpl(data=newSession().get(), str=i18n.str) - except Exception as e: - return index_tpl(str=i18n.str) - -@post('/auth') -def post_user(): - form = request.forms.getunicode - - def error(msg): - return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) - - if len(form('username')) < 3: - return error(i18n.msg[2]) - elif not tools.input_validation(form('username')): - return error(i18n.msg[3]) - - if not tools.pwd_validation(form('password')): - return error(i18n.msg[21]) - - username = form('username') - password = form('password') - - ''' - try: - if(check_2fa_step1(form('username'))): - print('kk') - #return index_tpl(two_factor_authentication=True, u=(form('username')), p=(form('password')), str=i18n.str) - except Error as e: - LOG.warning("Erabiltzailea ez da aurkitu???") - ''' - - try: - #th = threading.Thread(target=login, args=(form('username'), form('password'),check_2fa_step1(form('username')))) - #th.start() - login(form('username'), form('password')) - except Error as e: - LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) - return error(str(e)) - print(newSession().id) - key = cryptocode.encrypt(form('password'), newSession().id) - try: - if(check_2fa_step1(form('username'))): - print('kk') - return index_tpl(two_factor_authentication=True, key=key, str=i18n.str) - except Error as e: - LOG.warning("Erabiltzailea ez da aurkitu???") - #th.start() - ''' - if(not newSession().get()['secureAuth']): - return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) - elif(newSession().get()['secureAuth']): - return index_tpl(two_factor_authentication=True, str=i18n.str) - ''' - return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) - -@post('/user') -def post_user(): - form = request.forms.getunicode - - def error(msg): - return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) - - if len(form('username')) < 3: - return error(i18n.msg[2]) - elif not tools.input_validation(form('username')): - return error(i18n.msg[3]) - - if not tools.pwd_validation(form('password')): - return error(i18n.msg[21]) - - try: - login(form('username'), form('password')) - except Error as e: - LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) - return error(str(e)) - ''' - if(not newSession().get()['secureAuth']): - return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) - elif(newSession().get()['secureAuth']): - return index_tpl(two_factor_authentication=True, str=i18n.str) - ''' - return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) -@post('/user_step2/') -def post_user_step2(key): - form = request.forms.getunicode - - secret = newSession().get()['authCode'] - username = newSession().get()['username'] - password = cryptocode.decrypt(key, newSession().id) - - logout(newSession().get()['username']) - - def error(msg): - return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) - - #if not tools._2fa_validation(form('code'), newSession().get()['authCode']): - if not tools._2fa_validation(form('code'), secret): - #logout(newSession().get()['username']) - #logout(username) - return error('Kode okerra. Saio hasierak huts egin du.') - else: - - try: - login(username, password) - except Error as e: - LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) - return error(str(e)) - - newSession().secure_logged_in = True - return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], newSession().get()['username']), 'fadeOut' )], data=newSession().get(), str=i18n.str) - -@post('/signup') -def post_signup(): - - #ensure that i18n exists - if 'i18n' not in globals(): - newSession() - - form = request.forms.getunicode - isFake = False - - db = 'data/invite-codes.db' - - def auto_complete(arg): - if arg == 'firstname': - result = random.choice(flist.firstname) - elif arg == 'surname': - result = random.choice(slist.surname) - return(result.capitalize()) - - def error(msg): - return signup_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) - - if not tools.code_is_valid(form('invite_code'), db): - return(error(i18n.msg[6])) - - if len(form('username')) < 3: - return error(i18n.msg[2]) - - username = form('username').lower() - if not tools.input_validation(username): - return error(i18n.msg[3]) - - if len(form('firstname')) == 0: - firstname = auto_complete('firstname') - isFake = True - else: - firstname = form('firstname').title() - - if len(form('surname')) == 0: - surname = auto_complete('surname') - isFake = True - else: - surname = form('surname').title() - - email = form('email').lower() - if not tools.email_validation(email): - return error(i18n.msg[15]) - - if not tools.pwd_validation(form('password')): - return error(i18n.msg[8]) #mezua ALDATU egin behar da - elif form('password') != form('confirm-password'): - return error(i18n.msg[7]) - - try: - account_request(username, firstname, surname, form('password'), email, isFake, get_dev()) - except Error as e: - LOG.warning("Unsuccessful attempt to create an account for %s: %s" % (form('username'), e)) - return error(str(e)) - - try: - tools.mark_code_as_used(form('invite_code'), db) - except Error as e: - LOG.warning("There was a problem verifying the invitation code, please try again later.", e) - return error(str(e)) - - LOG.info("New account successfully created for %s" % form('username')) - - return index_tpl(alerts=[('success', i18n.msg[9], 'fadeOut')], str=i18n.str) - -@post('/edit_fullname') -def post_edit_fullname(): - form = request.forms.getunicode - - try: - username = newSession().get()['username'] - old_firstname = newSession().get()['firstname'] - old_surname = newSession().get()['surname'] - except Error as e: - return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str) - - def error(msg): - return edit_fullname_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) - - if len(form('firstname')) < 3: - return error(i18n.msg[11]) - elif not tools.input_validation(form('firstname'), True): - return error(i18n.msg[4]) - - if len(form('surname')) < 3: - return error(i18n.msg[12]) - elif not tools.input_validation(form('surname'), True): - return error(i18n.msg[5]) - - try: - edit_fullname(username, old_firstname, old_surname, form('firstname').title(), form('surname').title()) - except Error as e: - LOG.warning("Unsuccessful attempt to edit fullname for %s: %s" % (username, e)) - return error(str(e)) - - return user_tpl(alerts=[('success', i18n.msg[13], 'fadeOut' )], data=newSession().get(), str=i18n.str) - -@post('/edit_email') -def post_edit_email(): - form = request.forms.getunicode - - try: - username = newSession().get()['username'] - old_email = newSession().get()['mail'] - except Error as e: - return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str) - - def error(msg): - return edit_email_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) - - if not tools.email_validation(form('email')): - return(error(i18n.msg[14])) - - try: - edit_email(username, old_email, form('email').lower()) - except Error as e: - LOG.warning("Unsuccessful attempt to change email address for %s: %s" % (username, e)) - return error(str(e)) - - return user_tpl(alerts=[('success', i18n.msg[16], 'fadeOut' )], data=newSession().get(), str=i18n.str) - -@post('/enable_2fa') -def post_enable_2fa(): - - def error(msg): - return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) - - try: - if(not newSession().get()['secureAuth']): - try: - username=newSession().get()['username'] - add_auth_attribute_step1(username, tools.gen_secret(), action='enable') - except Error as e: - #add_auth_attribute_step1(newSession().get()['username'], None, None) - reload(newSession().get()['username'], None, None) - LOG.warning(e) - return error('2 urratseko autentifikazioa birgaitua izan da.') - except Error as e: - LOG.warning(e) - return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str) - - return _2fa_tpl(alerts=[('success', 'Bikain, 2 urratseko autentifikazioa gaitu da.', 'fadeOut')], data=newSession().get(), str=i18n.str) - -@post('/disable_2fa') -def post_disable_2fa(): - - def error(msg): - return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) - - try: - if(newSession().get()['secureAuth']): - try: - username=newSession().get()['username'] - add_auth_attribute_step1(username, None, action='disable') - except Error as e: - #add_auth_attribute_step1(newSession().get()['username'], None, None) - reload(newSession().get()['username'], None, None) - LOG.warning(e) - return error(str(e)) - except Error as e: - LOG.warning(e) - return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str) - - return _2fa_tpl(alerts=[('error', '2 urratseko autentifikazioa desgaitua izan da.', 'fadeOut')], data=newSession().get(), str=i18n.str) - -@post('/change_pwd') -def post_change_pwd(): - form = request.forms.getunicode - - try: - username=newSession().get()['username'] - except Error as e: - return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str) - - def error(msg): - return change_pwd_tpl(username=username, alerts=[('error', msg, 'fadeOut')], str=i18n.str) - - if not tools.pwd_validation(form('old-password')): - return error(i18n.msg[26]) - elif (not tools.pwd_validation(form('new-password')) or not tools.pwd_validation(form('confirm-password'))): - return error(i18n.msg[8]) #mezua aldatu egin behar da - elif form('new-password') != form('confirm-password'): - return error(i18n.msg[7]) - elif form('old-password') == form('confirm-password'): - return error(i18n.msg[17]) - - try: - change_passwords(username, form('old-password'), form('new-password')) - logout(username) - except Error as e: - LOG.warning("Unsuccessful attempt to change password for %s: %s" % (username, e)) - return error(str(e)) - - LOG.info("Password successfully changed for: %s" % username) - - return index_tpl(alerts=[('success', i18n.msg[18], 'fadeOut')], username=username, str=i18n.str) - -@post('/delete') -def post_delete(): - form = request.forms.getunicode - - def error(msg): - return delete_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) - - try: - username = newSession().get()['username'] - if(tools.input_validation(form('username')) and form('username').lower() == username): - del_user(username) - else: - return(error(i18n.msg[19])) - - except Error as e: - LOG.warning("Unsuccessful attempt to delete the account: %s" % e) - return error(str(e)) - - LOG.info("Account successfully deleted") - - return index_tpl(alerts=[('success', i18n.msg[20], 'fadeOut')], str=i18n.str) - -@route('/static/', name='static') -def serve_static(filename): - return static_file(filename, root=path.join(BASE_DIR, 'static')) - -@get("/static/fonts/") -def font(filepath): - return static_file(filepath, root="static/fonts") - -@get("/static/tmp/") -def font(filepath): - return static_file(filepath, root="static/tmp") - -def index_tpl(**kwargs): - return template('index', **kwargs) - -def user_tpl(**kwargs): - return template('user', **kwargs) - -def signup_tpl(**kwargs): - return template('signup', **kwargs) - -def change_pwd_tpl(**kwargs): - return template('change_pwd', **kwargs) - -def edit_email_tpl(**kwargs): - return template('edit_email', **kwargs) - -def edit_fullname_tpl(**kwargs): - return template('edit_fullname', **kwargs) - -def delete_tpl(**kwargs): - return template('delete', **kwargs) - -def logs_tpl(**kwargs): - return template('logs', **kwargs) - -def _2fa_tpl(**kwargs): - return template('_2fa', **kwargs) - -def connect_ldap(conf, **kwargs): - server = Server(host=conf['host'], - port=conf.getint('port', None), - use_ssl=conf.getboolean('use_ssl', False), - connect_timeout=5) - - return Connection(server, raise_exceptions=True, **kwargs) - -#LOGIN -def login(username, password): - - n = N - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("%s is trying to logging in %s" % (username, key)) - n -= 1 - try: - login_user(CONF[key], username, password) - except Error as e: - if n >=1: - e = [] - continue - else: - raise e - break - -def login_user(conf, *args): - try: - login_user_ldap(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[21]) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def login_user_ldap(conf, username, password): - #set current LDAP - superUser = SuperUsers(conf) - - #with connect_ldap(conf) as c: - with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c: - user_dn = find_user_dn(conf, c, username) - # Note: raises LDAPUserNameIsMandatoryError when user_dn is None. - with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=password) as c: - c.bind() - if is_trusted_device(conf, user_dn): - newSession().set(get_user_data(user_dn, c)) - #new_session(user_dn, c, conf, lambda: check_2fa_step1()) - #update timestamp + ip address - update_login_info(conf, user_dn) - LOG.debug("%s logged in to %s" % (username, conf['base'])) - #check if exists 2fa qr image - if(newSession().get()['secureAuth']): - tools.gen_qr(newSession().get()['authCode']) - #if(newSession().get()['secureAuth'] and not newSession().secure_logged_in): - #logout(newSession().get()['username']) -''' -def new_session(user_dn, c, conf, two_factor_auth): - while(two_factor_auth): - newSession().set(get_user_data(user_dn, c)) - update_login_info(conf, user_dn) - if(newSession().get()['secureAuth']): - tools.gen_qr(newSession().get()['authCode']) - LOG.debug("%s logged in to %s" % (newSession().get()['username'], conf['base'])) -''' -#LOGOUT -def logout(username): - n = N - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("Logging out %s from %s" % (username, key)) - n -= 1 - try: - logout_user(CONF[key], username) - except Error as e: - if n >=1: - e = [] - continue - else: - raise e - - break - -def logout_user(conf, *args): - try: - logout_user_ldap(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[21]) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def logout_user_ldap(conf, username): - #set current LDAP - superUser = SuperUsers(conf) - - #with connect_ldap(conf) as c: - with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c: - user_dn = find_user_dn(conf, c, username) - c.unbind() - #newSession().close() - newSession().delete() - LOG.info("%s LOGED OUT" % (username)) - -#SIGN UP -def account_request(username, firstname, surname, password, email, isFake, device): - created = [] - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("Creating account for %s on %s server" % (username, key)) - try: - new_user_account(CONF[key], username, firstname, surname, password, email, isFake, device) - created.append(key) - except Error as e: - for key in reversed(created): - LOG.info("Reverting account creation in %s for %s" % (key, username)) - try: - #Akatsen bat gertatzen bada LDAP instantzia guztietan kontua ezabatu - del_account(CONF[key], username) - except Error as e2: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) - raise e - -def new_user_account(conf, *args): - try: - register(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[21]) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[22]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def register(conf, username, firstname, surname, password, email, isFake, device): - - def to_ascii(str): - ascii_str="" - for c in str: - if 0 <= ord(c) <= 127: - ascii_str=ascii_str+c - else: - ascii_str=ascii_str+"X" - return(ascii_str) - - #set current LDAP - superUser = SuperUsers(conf) - - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - - try: - if (find_user_dn(conf,c,username) is not None): - raise Error(i18n.msg[24]) - - if (find_email(conf,c,email)): - raise Error(i18n.msg[25]) - - except Error as e: - raise e - - else: - #create new account - uidNumber = find_uid_number(conf,c)+1 - directory = 'home/user/'+to_ascii(username) - OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] - ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z' - attributes = {'gidNumber': '501', 'uidNumber': uidNumber, 'homeDirectory': directory, 'givenName': - firstname, 'sn': surname, 'uid' : username, 'mail': email, 'active': False, 'fakeCn': isFake, - 'devices':device, 'ip':request.environ.get('HTTP_X_REAL_IP', request.remote_addr), 'lastLogin': ts, - 'secureAuth': False} - new_user_dn = "cn="+firstname+" "+surname+" - "+username+",cn=users,"+conf['base'] - c.add(dn=new_user_dn,object_class=OBJECT_CLASS, attributes=attributes) - #create/change user password - c.extend.standard.modify_password(new_user_dn, '', password) - LOG.info("%s has registered on %s" % (username, conf)) - -#EDIT FULLNAME -def edit_fullname(username, old_firstname, old_surname, firstname, surname,): - changed = [] - - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("Changing fullname in %s for %s" % (key, username)) - try: - new_fullname(CONF[key], username, firstname, surname) - changed.append(key) - LOG.debug("%s changed fullname on %s" % (username, key)) - except Error as e: - for key in reversed(changed): - LOG.info("Reverting fullname change in %s for %s" % (key, username)) - try: - new_fullname(CONF[key], username, old_firstname, old_surname) - except Error as e2: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) - raise e - -def new_fullname(conf, *args): - try: - update_fullname(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[26]) - - except LDAPConstraintViolationResult as e: - # Extract useful part of the error message (for Samba 4 / AD). - msg = e.message.split('check_password_restrictions: ')[-1].capitalize() - raise Error(msg) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def update_fullname(conf, username, firstname, surname): - #set current LDAP - superUser = SuperUsers(conf) - - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - #with connect_ldap(conf) as c: - user_dn = find_user_dn(conf, c, username) - c.modify(user_dn, {'givenName': [( MODIFY_REPLACE, firstname )], 'sn': [( MODIFY_REPLACE, surname )]}) - - new_cn = "cn="+firstname+" "+ surname+" - "+ username - c.modify_dn(user_dn, new_cn) - new_user_dn = new_cn+",cn=users,"+conf['base'] - - base = ",cn=users," + conf['base'] - fakeFullName = user_dn[3:-len(base)].split(" ") - - if(user_dn == new_user_dn): - raise Error(i18n.msg[10]) - - c.modify(new_user_dn, {'fakeCn': [(MODIFY_REPLACE, 'false' )]}) - newSession().set(get_user_data(new_user_dn, c)) - -#EDIT EMAIL -def edit_email(username, old_email, new_email): - changed = [] - - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("Changing email in %s for %s" % (key, username)) - try: - new_email_address(CONF[key], username, old_email, new_email) - changed.append(key) - LOG.debug("%s changed email address on %s" % (username, key)) - except Error as e: - for key in reversed(changed): - LOG.info("Reverting email change in %s for %s" % (key, username)) - try: - new_email_address(CONF[key], username, new_email, old_email) - except Error as e2: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) - raise e - -def new_email_address(conf, *args): - try: - update_email_address(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[26]) - - except LDAPConstraintViolationResult as e: - # Extract useful part of the error message (for Samba 4 / AD). - msg = e.message.split('check_password_restrictions: ')[-1].capitalize() - raise Error(msg) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def update_email_address(conf, username, old_email, new_email): - if(old_email == new_email): - raise Error(i18n.msg[15]) - - #set current LDAP - superUser = SuperUsers(conf) - - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - user_dn = find_user_dn(conf, c, username) - new_email_addresses = get_user_email_array(user_dn, c, old_email, new_email) - c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]}) - newSession().set(get_user_data(user_dn, c)) - -# ADD AUTHCODE ATTRIBUTE - 2FA -def add_auth_attribute_step1(username, code, action): - changed = [] - - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("Adding secureAuth attribute %s to %s" % (key, username)) - try: - add_auth_attribute_step2(CONF[key], username, code, action) - changed.append(key) - LOG.debug("%s changed email address on %s" % (username, key)) - except Error as e: - for key in reversed(changed): - LOG.info("Reverting email change in %s for %s" % (key, username)) - try: - new_email_address(CONF[key], username, new_email, old_email) - except Error as e2: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) - raise e - -def add_auth_attribute_step2(conf, *args): - try: - add_auth_attribute_step3(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[26]) - - except LDAPConstraintViolationResult as e: - # Extract useful part of the error message (for Samba 4 / AD). - msg = e.message.split('check_password_restrictions: ')[-1].capitalize() - raise Error(msg) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPNoSuchAttributeResult as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error('Dagoeneko desgaiturik zeneukan 2 urratseko autentifikazioa.') - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def add_auth_attribute_step3(conf, username, code, action): - - #set current LDAP - superUser = SuperUsers(conf) - - print(action) - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - user_dn = find_user_dn(conf, c, username) - if(action == 'enable'): - c.modify(user_dn,{'authCode': [(MODIFY_ADD, [code])]}) - c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [True]]}) - elif(action == 'disable'): - c.modify(user_dn,{'authCode': [(MODIFY_DELETE, [])]}) - c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [False]]}) - #remove file - try: - remove('static/tmp/'+newSession().get()['authCode']+'.png') - except OSError as e: - LOG.warning(str(e)) - #raise Error(e) - pass - - newSession().set(get_user_data(user_dn, c)) - -reload=add_auth_attribute_step1 - -# CHECK SECUREAUTH -def check_2fa_step1(username): - changed = [] - - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("Changing email in %s for %s" % (key, username)) - try: - return check_2fa_step2(CONF[key], username) - changed.append(key) - LOG.debug("%s changed email address on %s" % (username, key)) - except Error as e: - for key in reversed(changed): - LOG.info("Reverting email change in %s for %s" % (key, username)) - try: - return check_2fa_step2(CONF[key], username) - except Error as e2: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) - raise e - -def check_2fa_step2(conf, *args): - try: - return check_2fa_step3(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[26]) - - except LDAPConstraintViolationResult as e: - # Extract useful part of the error message (for Samba 4 / AD). - msg = e.message.split('check_password_restrictions: ')[-1].capitalize() - raise Error(msg) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def check_2fa_step3(conf, username): - #set current LDAP - superUser = SuperUsers(conf) - - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - user_dn = find_user_dn(conf, c, username) - secure_auth_status = check_secure_auth(user_dn, c) - print(secure_auth_status) - return(secure_auth_status) - #c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]}) - #newSession().set(get_user_data(user_dn, c)) - -#CHANGE PASSWORD -def change_passwords(username, old_pass, new_pass): - changed = [] - - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - - LOG.debug("Changing password in %s for %s" % (key, username)) - try: - change_password(CONF[key], username, old_pass, new_pass) - changed.append(key) - LOG.debug("%s changed pwd on %s" % (username, key)) - except Error as e: - for key in reversed(changed): - LOG.info("Reverting password change in %s for %s" % (key, username)) - try: - change_password(CONF[key], username, new_pass, old_pass) - except Error as e2: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) - raise e - -def change_password(conf, *args): - try: - if conf.get('type') == 'ad': - change_password_ad(conf, *args) - else: - change_password_ldap(conf, *args) - - except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): - raise Error(i18n.msg[26]) - - except LDAPConstraintViolationResult as e: - # Extract useful part of the error message (for Samba 4 / AD). - msg = e.message.split('check_password_restrictions: ')[-1].capitalize() - raise Error(msg) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - -def change_password_ldap(conf, username, old_pass, new_pass): - #set current LDAP - superUser = SuperUsers(conf) - - with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c: - #with connect_ldap(conf) as c: - user_dn = find_user_dn(conf, c, username) - - # Note: raises LDAPUserNameIsMandatoryError when user_dn is None. - with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=old_pass) as c: - c.bind() - c.extend.standard.modify_password(user_dn, old_pass, new_pass) - -def change_password_ad(conf, username, old_pass, new_pass): - user = username + '@' + conf['ad_domain'] - - with connect_ldap(conf, authentication=SIMPLE, user=user, password=old_pass) as c: - c.bind() - user_dn = find_user_dn(conf, c, username) - c.extend.microsoft.modify_password(user_dn, new_pass, old_pass) - -#DELETE ACCOUNT -def del_user(username): - n = N - for key in (key for key in CONF.sections() - if key == 'ldap' or key.startswith('ldap:')): - LOG.debug("Deleting account for %s from %s" % (username, key)) - n -= 1 - try: - del_account(CONF[key], username) - LOG.debug("Account for %s deleted on -> %s" % (username, CONF[key])) - if(n == 0 and newSession().get()['username'] is not None): - newSession().delete() - - except Error as e: - raise e - -def del_account(conf, *args): - try: - delete(conf, *args) - - except LDAPSocketOpenError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - - except LDAPExceptionError as e: - LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) - raise Error(i18n.msg[23]) - -def delete(conf, username): - #set current LDAP - superUser = SuperUsers(conf) - - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - try: - user_dn = find_user_dn(conf, c, username) - c.delete(user_dn) - - except Error as e: - raise e - -#AUXILIARY FUNCTIONS -#find user -def find_user_dn(conf, conn, uid): - search_filter = conf['search_filter'].replace('{uid}', uid) - conn.search(conf['base'], "(%s)" % search_filter, SUBTREE) - - return conn.response[0]['dn'] if conn.response else None - -#find email -def find_email(conf, conn, email): - search_filter = '(uid=*)' - if conn.search(conf['base'], search_filter, attributes=['mail']): - for i in conn.response: - for j in i['attributes']['mail']: - if(j == email): - return True - - return False - -#find device -def find_device(user_dn, conn, device): - search_filter = '(objectClass=*)' - if conn.search(user_dn, search_filter, attributes=['devices']): - for i in conn.response: - for j in i['attributes']['devices']: - if(j == device): - return True - - return False - -#find highest uidNumber -def find_uid_number(conf, conn): - search_filter = '(uid=*)' - if conn.search(conf['base'], search_filter, attributes=['uidNumber']): - - uidNumbersList=[] - for i in conn.response: - uidNumbersList.append(i['attributes']['uidNumber']) - - uidNumbersList = list(filter(lambda i: type(i) is int, uidNumbersList)) - return max(uidNumbersList) - - else: - return(999) - -def get_user_email_array(user_dn, conn, old_email, new_email): - search_filter = '(objectClass=*)' - conn.search(user_dn, search_filter, attributes=['mail']) - emails = conn.entries[0].mail.values - for i in range(len(emails)): - if(emails[i] == old_email): - emails[i] = new_email - return(emails) - -def check_secure_auth(user_dn, conn): - search_filter = '(objectClass=*)' - conn.search(user_dn, search_filter, attributes=['secureAuth']) - status = conn.entries[0].secureAuth - return(status) - -def get_user_data(user_dn, conn): - search_filter = '(objectClass=*)' - conn.search(user_dn, search_filter, - attributes=['active','fakeCn','givenName','sn','uid','mail','devices','ip','lastLogin','secureAuth', - 'authCode']) - data = [] - data.append(conn.entries[0].active.values[0]) - data.append(conn.entries[0].fakeCn.values[0]) - data.append(conn.entries[0].givenName.values[0]) - data.append(conn.entries[0].sn.values[0]) - data.append(conn.entries[0].uid.values[0]) - data.append(conn.entries[0].mail.values[0]) - data.append(conn.entries[0].devices.values) - data.append(conn.entries[0].ip.values[0]) - #ts = conn.entries[0].lastLogin.values[0] - #ts = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S%z') - #ts = datetime.strftime(t, '%Y-%m-%d %H:%M:%S') - data.append(str(conn.entries[0].lastLogin.values[0])[:-6]) - data.append(conn.entries[0].secureAuth.values[0]) - if(conn.entries[0].authCode): - data.append(conn.entries[0].authCode.values[0]) - - return(data) - -def read_config(): - config = ConfigParser() - config.read([path.join(BASE_DIR, 'settings.ini'), getenv('CONF_FILE', '')]) - return config - -CONF = read_config() - -def ldaps_count(): - keys = [] - for i, key in enumerate(CONF.sections()): - if key == 'ldap' or key.startswith('ldap:'): - keys.append(key) - #n=len(keys) - return(len(keys)) - -N = ldaps_count() - -def reg(): - if CONF['general']['allow_registration'] == 'True': - return True - -allowed = reg() - -def get_dev(): - ua_string = bottle.request.environ.get('HTTP_USER_AGENT') - user_agent = ua_parse(ua_string) - return str(user_agent) - -def is_trusted_device(conf, user_dn): - superUser = SuperUsers(conf) - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - d = get_dev() - try: - if not find_device(user_dn, c, d): - OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] - c.modify(user_dn, {'devices': [( MODIFY_ADD, d )] }) - ''' - if find_device(user_dn, c, 'unknown'): - OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] - c.modify(user_dn, {'devices': [( MODIFY_REPLACE, d )] }) - else: - OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] - c.modify(user_dn, {'devices': [( MODIFY_ADD, d )] }) - ''' - c.unbind() - return True - except Exception as e: - print(e) - return True - -def update_login_info(conf, user_dn): - superUser = SuperUsers(conf) - with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: - ip = request.environ.get('HTTP_X_REAL_IP', request.remote_addr) - ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z' - c.modify(user_dn, {'ip': [( MODIFY_REPLACE, str(ip) )], 'lastLogin': [( MODIFY_REPLACE, ts )] }) - c.unbind() - -class Error(Exception): - pass - -#SESSIONS MANAGEMENT -def newSession(): - - class Session(object): - """docstring for Session""" - def __init__(self): - super(Session, self).__init__() - self.data = bottle.request.environ.get('beaker.session') - self.id = tools.session_id() - self.secure_logged_in = False - self.lang = self.get_lang() - #localization - self.lang = self.get_lang() - global i18n - i18n = LocalizeTo(self.lang, CONF) - - def get_lang(self): - if 'HTTP_ACCEPT_LANGUAGE' in bottle.request.environ: - lang = bottle.request.get('HTTP_ACCEPT_LANGUAGE') - return str(lang[:2]) - else: - return CONF['locale']['lang'] - - def get(self): - if 'username' in self.data: - return(self.data) - else: - raise Error(i18n.msg[27]) - - def set(self, data): - self.active = data[0] - self.fakeCn = data[1] - self.firstname = data[2] - self.surname = data[3] - self.username = data[4] - self.mail = data[5] - self.devices = data[6] - self.ip = data[7] - self.lastLogin = data[8] - self.secureAuth = data[9] - try: - self.authCode = data[10] - except: - self.authCode = None - - self.data['active'] = self.active - self.data['fakeCn'] = self.fakeCn - self.data['firstname'] = self.firstname - self.data['surname'] = self.surname - self.data['username'] = self.username - self.data['mail'] = self.mail - self.data['devices'] = self.devices - self.data['ip'] = self.ip - self.data['lastLogin'] = self.lastLogin - self.data['secureAuth'] = self.secureAuth - self.data['authCode'] = self.authCode - - def close(self): - self.data.pop('username') - - def delete(self): - self.data.delete() - - s=Session() - return s - -if environ.get('DEBUG'): - bottle.debug(True) - -# Set up logging. -logging.basicConfig(format=LOG_FORMAT) -LOG.setLevel(logging.INFO) -LOG.info("Starting ldap-python-webui %s" % VERSION) - -session_opts = { - 'session.type': CONF['session']['type'], - 'session.cookie_expires': CONF['session']['expire'], - 'session.data_dir': CONF['session']['data_dir'], - 'session.auto': CONF['session']['auto'] -} - -class SuperUsers(object): - """docstring for Session""" - def __init__(self, conf): - super(SuperUsers, self).__init__() - self.domain=conf['base'][conf['base'].find(","):] - self.admin_dn="cn=admin"+self.domain - self.admin_pwd=environ['LDAP_ADMIN_PASSWORD'] - self.readonly_dn="cn=readonly"+self.domain - self.readonly_pwd=environ['LDAP_READONLY_PASSWORD'] - -superUser = SuperUsers(CONF['ldap:0']) - -app = beaker.middleware.SessionMiddleware(bottle.app(), session_opts) - -bottle.TEMPLATE_PATH = [BASE_DIR] - -# Set default attributes to pass into templates. -#SimpleTemplate.defaults = dict(CONF['html']) -SimpleTemplate.defaults['url'] = bottle.url - - -# Run bottle internal server when invoked directly (mainly for development). -if __name__ == '__main__': - bottle.run(app, **CONF['server']) -# Run bottle in application mode (in production under uWSGI server). -else: - #application = bottle.default_app() - application = app