#!/usr/bin/env python3 ''' ldap-python-webui :: LDAP kudeaketarako Web Interfazea - Web UI for LDAP management. Copyright (C) 2022 Aitzol Berasategi - Wproject This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . ''' import bottle from bottle import get, post, static_file, request, route, template from bottle import SimpleTemplate from bottle.ext import beaker from configparser import ConfigParser from ldap3 import Server, Connection, ALL from ldap3 import SIMPLE, SUBTREE, MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE, ALL_ATTRIBUTES from ldap3.core.exceptions import LDAPBindError, LDAPConstraintViolationResult, \ LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError, \ LDAPSocketOpenError, LDAPExceptionError, LDAPAttributeOrValueExistsResult, \ LDAPNoSuchAttributeResult import logging from os import getenv, environ, path, remove from libs import flist, slist from libs.localization import * from libs.helper import tools import random from user_agents import parse as ua_parse from datetime import datetime import threading import cryptocode BASE_DIR = path.dirname(__file__) LOG = logging.getLogger(__name__) LOG_FORMAT = '%(asctime)s %(levelname)s: %(message)s' VERSION = '0.0.2' @get('/') def get_index(): try: ''' while(newSession().get()['secureAuth'] and not newSession().secure_logged_in): logout(newSession().get()['username']) ''' return user_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/user') def get_index(): try: print(newSession().get()) print(newSession().secure_logged_in) print(newSession().id); ''' while(newSession().get()['secureAuth'] and not newSession().secure_logged_in): logout(newSession().get()['username']) ''' return user_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/signup') def get_index(): newSession() if allowed: return signup_tpl(str=i18n.str) else: return index_tpl(alerts=[('error', i18n.msg[28], 'upDown')], str=i18n.str) @get('/change_pwd') def get_index(): try: return change_pwd_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/edit_fullname') def get_index(): try: return edit_fullname_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/edit_email') def get_index(): try: return edit_email_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/logs') def get_index(): try: return logs_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/delete') def get_index(): try: return delete_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/logout') def get_index(): def error(msg): return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) try: username = newSession().get()['username'] if(username is not None): logout(username) except Error as e: LOG.warning("Unsuccessful attempt to log out: %s" % e) return error(str(e)) return index_tpl(alerts=[('success', i18n.msg[0], 'fadeOut')], str=i18n.str) @get('/_2fa') def get_index(): #newSession().get() try: reload(newSession().get()['username'], None, None) #add_auth_attribute_step1(newSession().get()['username'], None, None) return _2fa_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @get('/enable_2fa') @get('/disable_2fa') def get_index(): try: return user_tpl(data=newSession().get(), str=i18n.str) except Exception as e: return index_tpl(str=i18n.str) @post('/auth') def post_user(): form = request.forms.getunicode def error(msg): return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) if len(form('username')) < 3: return error(i18n.msg[2]) elif not tools.input_validation(form('username')): return error(i18n.msg[3]) if not tools.pwd_validation(form('password')): return error(i18n.msg[21]) username = form('username') password = form('password') ''' try: if(check_2fa_step1(form('username'))): print('kk') #return index_tpl(two_factor_authentication=True, u=(form('username')), p=(form('password')), str=i18n.str) except Error as e: LOG.warning("Erabiltzailea ez da aurkitu???") ''' try: #th = threading.Thread(target=login, args=(form('username'), form('password'),check_2fa_step1(form('username')))) #th.start() login(form('username'), form('password')) except Error as e: LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) return error(str(e)) print(newSession().id) key = cryptocode.encrypt(form('password'), newSession().id) try: if(check_2fa_step1(form('username'))): print('kk') return index_tpl(two_factor_authentication=True, key=key, str=i18n.str) except Error as e: LOG.warning("Erabiltzailea ez da aurkitu???") #th.start() ''' if(not newSession().get()['secureAuth']): return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) elif(newSession().get()['secureAuth']): return index_tpl(two_factor_authentication=True, str=i18n.str) ''' return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) @post('/user') def post_user(): form = request.forms.getunicode def error(msg): return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) if len(form('username')) < 3: return error(i18n.msg[2]) elif not tools.input_validation(form('username')): return error(i18n.msg[3]) if not tools.pwd_validation(form('password')): return error(i18n.msg[21]) try: login(form('username'), form('password')) except Error as e: LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) return error(str(e)) ''' if(not newSession().get()['secureAuth']): return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) elif(newSession().get()['secureAuth']): return index_tpl(two_factor_authentication=True, str=i18n.str) ''' return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], form('username').capitalize()), 'fadeOut' )], data=newSession().get(), str=i18n.str) @post('/user_step2/') def post_user_step2(key): form = request.forms.getunicode secret = newSession().get()['authCode'] username = newSession().get()['username'] password = cryptocode.decrypt(key, newSession().id) logout(newSession().get()['username']) def error(msg): return index_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) #if not tools._2fa_validation(form('code'), newSession().get()['authCode']): if not tools._2fa_validation(form('code'), secret): #logout(newSession().get()['username']) #logout(username) return error('Kode okerra. Saio hasierak huts egin du.') else: try: login(username, password) except Error as e: LOG.warning("Unsuccessful attempt to login %s: %s" % (form('username'), e)) return error(str(e)) newSession().secure_logged_in = True return user_tpl(alerts=[('success', '%s %s' % (i18n.msg[1], newSession().get()['username']), 'fadeOut' )], data=newSession().get(), str=i18n.str) @post('/signup') def post_signup(): #ensure that i18n exists if 'i18n' not in globals(): newSession() form = request.forms.getunicode isFake = False db = 'data/invite-codes.db' def auto_complete(arg): if arg == 'firstname': result = random.choice(flist.firstname) elif arg == 'surname': result = random.choice(slist.surname) return(result.capitalize()) def error(msg): return signup_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) if not tools.code_is_valid(form('invite_code'), db): return(error(i18n.msg[6])) if len(form('username')) < 3: return error(i18n.msg[2]) username = form('username').lower() if not tools.input_validation(username): return error(i18n.msg[3]) if len(form('firstname')) == 0: firstname = auto_complete('firstname') isFake = True else: firstname = form('firstname').title() if len(form('surname')) == 0: surname = auto_complete('surname') isFake = True else: surname = form('surname').title() email = form('email').lower() if not tools.email_validation(email): return error(i18n.msg[15]) if not tools.pwd_validation(form('password')): return error(i18n.msg[8]) #mezua ALDATU egin behar da elif form('password') != form('confirm-password'): return error(i18n.msg[7]) try: account_request(username, firstname, surname, form('password'), email, isFake, get_dev()) except Error as e: LOG.warning("Unsuccessful attempt to create an account for %s: %s" % (form('username'), e)) return error(str(e)) try: tools.mark_code_as_used(form('invite_code'), db) except Error as e: LOG.warning("There was a problem verifying the invitation code, please try again later.", e) return error(str(e)) LOG.info("New account successfully created for %s" % form('username')) return index_tpl(alerts=[('success', i18n.msg[9], 'fadeOut')], str=i18n.str) @post('/edit_fullname') def post_edit_fullname(): form = request.forms.getunicode try: username = newSession().get()['username'] old_firstname = newSession().get()['firstname'] old_surname = newSession().get()['surname'] except Error as e: return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str) def error(msg): return edit_fullname_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) if len(form('firstname')) < 3: return error(i18n.msg[11]) elif not tools.input_validation(form('firstname'), True): return error(i18n.msg[4]) if len(form('surname')) < 3: return error(i18n.msg[12]) elif not tools.input_validation(form('surname'), True): return error(i18n.msg[5]) try: edit_fullname(username, old_firstname, old_surname, form('firstname').title(), form('surname').title()) except Error as e: LOG.warning("Unsuccessful attempt to edit fullname for %s: %s" % (username, e)) return error(str(e)) return user_tpl(alerts=[('success', i18n.msg[13], 'fadeOut' )], data=newSession().get(), str=i18n.str) @post('/edit_email') def post_edit_email(): form = request.forms.getunicode try: username = newSession().get()['username'] old_email = newSession().get()['mail'] except Error as e: return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str) def error(msg): return edit_email_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) if not tools.email_validation(form('email')): return(error(i18n.msg[14])) try: edit_email(username, old_email, form('email').lower()) except Error as e: LOG.warning("Unsuccessful attempt to change email address for %s: %s" % (username, e)) return error(str(e)) return user_tpl(alerts=[('success', i18n.msg[16], 'fadeOut' )], data=newSession().get(), str=i18n.str) @post('/enable_2fa') def post_enable_2fa(): def error(msg): return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) try: if(not newSession().get()['secureAuth']): try: username=newSession().get()['username'] add_auth_attribute_step1(username, tools.gen_secret(), action='enable') except Error as e: #add_auth_attribute_step1(newSession().get()['username'], None, None) reload(newSession().get()['username'], None, None) LOG.warning(e) return error('2 urratseko autentifikazioa birgaitua izan da.') except Error as e: LOG.warning(e) return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str) return _2fa_tpl(alerts=[('success', 'Bikain, 2 urratseko autentifikazioa gaitu da.', 'fadeOut')], data=newSession().get(), str=i18n.str) @post('/disable_2fa') def post_disable_2fa(): def error(msg): return _2fa_tpl(alerts=[('error', msg, 'fadeOut')], data=newSession().get(), str=i18n.str) try: if(newSession().get()['secureAuth']): try: username=newSession().get()['username'] add_auth_attribute_step1(username, None, action='disable') except Error as e: #add_auth_attribute_step1(newSession().get()['username'], None, None) reload(newSession().get()['username'], None, None) LOG.warning(e) return error(str(e)) except Error as e: LOG.warning(e) return index_tpl(alerts=[('error', e, 'fadeOut')], str=i18n.str) return _2fa_tpl(alerts=[('error', '2 urratseko autentifikazioa desgaitua izan da.', 'fadeOut')], data=newSession().get(), str=i18n.str) @post('/change_pwd') def post_change_pwd(): form = request.forms.getunicode try: username=newSession().get()['username'] except Error as e: return index_tpl(alerts=[('error', str(e), 'fadeOut')], str=i18n.str) def error(msg): return change_pwd_tpl(username=username, alerts=[('error', msg, 'fadeOut')], str=i18n.str) if not tools.pwd_validation(form('old-password')): return error(i18n.msg[26]) elif (not tools.pwd_validation(form('new-password')) or not tools.pwd_validation(form('confirm-password'))): return error(i18n.msg[8]) #mezua aldatu egin behar da elif form('new-password') != form('confirm-password'): return error(i18n.msg[7]) elif form('old-password') == form('confirm-password'): return error(i18n.msg[17]) try: change_passwords(username, form('old-password'), form('new-password')) logout(username) except Error as e: LOG.warning("Unsuccessful attempt to change password for %s: %s" % (username, e)) return error(str(e)) LOG.info("Password successfully changed for: %s" % username) return index_tpl(alerts=[('success', i18n.msg[18], 'fadeOut')], username=username, str=i18n.str) @post('/delete') def post_delete(): form = request.forms.getunicode def error(msg): return delete_tpl(alerts=[('error', msg, 'fadeOut')], str=i18n.str) try: username = newSession().get()['username'] if(tools.input_validation(form('username')) and form('username').lower() == username): del_user(username) else: return(error(i18n.msg[19])) except Error as e: LOG.warning("Unsuccessful attempt to delete the account: %s" % e) return error(str(e)) LOG.info("Account successfully deleted") return index_tpl(alerts=[('success', i18n.msg[20], 'fadeOut')], str=i18n.str) @route('/static/', name='static') def serve_static(filename): return static_file(filename, root=path.join(BASE_DIR, 'static')) @get("/static/fonts/") def font(filepath): return static_file(filepath, root="static/fonts") @get("/static/tmp/") def font(filepath): return static_file(filepath, root="static/tmp") def index_tpl(**kwargs): return template('index', **kwargs) def user_tpl(**kwargs): return template('user', **kwargs) def signup_tpl(**kwargs): return template('signup', **kwargs) def change_pwd_tpl(**kwargs): return template('change_pwd', **kwargs) def edit_email_tpl(**kwargs): return template('edit_email', **kwargs) def edit_fullname_tpl(**kwargs): return template('edit_fullname', **kwargs) def delete_tpl(**kwargs): return template('delete', **kwargs) def logs_tpl(**kwargs): return template('logs', **kwargs) def _2fa_tpl(**kwargs): return template('_2fa', **kwargs) def connect_ldap(conf, **kwargs): server = Server(host=conf['host'], port=conf.getint('port', None), use_ssl=conf.getboolean('use_ssl', False), connect_timeout=5) return Connection(server, raise_exceptions=True, **kwargs) #LOGIN def login(username, password): n = N for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("%s is trying to logging in %s" % (username, key)) n -= 1 try: login_user(CONF[key], username, password) except Error as e: if n >=1: e = [] continue else: raise e break def login_user(conf, *args): try: login_user_ldap(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[21]) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def login_user_ldap(conf, username, password): #set current LDAP superUser = SuperUsers(conf) #with connect_ldap(conf) as c: with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c: user_dn = find_user_dn(conf, c, username) # Note: raises LDAPUserNameIsMandatoryError when user_dn is None. with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=password) as c: c.bind() if is_trusted_device(conf, user_dn): newSession().set(get_user_data(user_dn, c)) #new_session(user_dn, c, conf, lambda: check_2fa_step1()) #update timestamp + ip address update_login_info(conf, user_dn) LOG.debug("%s logged in to %s" % (username, conf['base'])) #check if exists 2fa qr image if(newSession().get()['secureAuth']): tools.gen_qr(newSession().get()['authCode']) #if(newSession().get()['secureAuth'] and not newSession().secure_logged_in): #logout(newSession().get()['username']) ''' def new_session(user_dn, c, conf, two_factor_auth): while(two_factor_auth): newSession().set(get_user_data(user_dn, c)) update_login_info(conf, user_dn) if(newSession().get()['secureAuth']): tools.gen_qr(newSession().get()['authCode']) LOG.debug("%s logged in to %s" % (newSession().get()['username'], conf['base'])) ''' #LOGOUT def logout(username): n = N for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Logging out %s from %s" % (username, key)) n -= 1 try: logout_user(CONF[key], username) except Error as e: if n >=1: e = [] continue else: raise e break def logout_user(conf, *args): try: logout_user_ldap(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[21]) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def logout_user_ldap(conf, username): #set current LDAP superUser = SuperUsers(conf) #with connect_ldap(conf) as c: with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c: user_dn = find_user_dn(conf, c, username) c.unbind() #newSession().close() newSession().delete() LOG.info("%s LOGED OUT" % (username)) #SIGN UP def account_request(username, firstname, surname, password, email, isFake, device): created = [] for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Creating account for %s on %s server" % (username, key)) try: new_user_account(CONF[key], username, firstname, surname, password, email, isFake, device) created.append(key) except Error as e: for key in reversed(created): LOG.info("Reverting account creation in %s for %s" % (key, username)) try: #Akatsen bat gertatzen bada LDAP instantzia guztietan kontua ezabatu del_account(CONF[key], username) except Error as e2: LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) raise e def new_user_account(conf, *args): try: register(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[21]) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[22]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def register(conf, username, firstname, surname, password, email, isFake, device): def to_ascii(str): ascii_str="" for c in str: if 0 <= ord(c) <= 127: ascii_str=ascii_str+c else: ascii_str=ascii_str+"X" return(ascii_str) #set current LDAP superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: try: if (find_user_dn(conf,c,username) is not None): raise Error(i18n.msg[24]) if (find_email(conf,c,email)): raise Error(i18n.msg[25]) except Error as e: raise e else: #create new account uidNumber = find_uid_number(conf,c)+1 directory = 'home/user/'+to_ascii(username) OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z' attributes = {'gidNumber': '501', 'uidNumber': uidNumber, 'homeDirectory': directory, 'givenName': firstname, 'sn': surname, 'uid' : username, 'mail': email, 'active': False, 'fakeCn': isFake, 'devices':device, 'ip':request.environ.get('HTTP_X_REAL_IP', request.remote_addr), 'lastLogin': ts, 'secureAuth': False} new_user_dn = "cn="+firstname+" "+surname+" - "+username+",cn=users,"+conf['base'] c.add(dn=new_user_dn,object_class=OBJECT_CLASS, attributes=attributes) #create/change user password c.extend.standard.modify_password(new_user_dn, '', password) LOG.info("%s has registered on %s" % (username, conf)) #EDIT FULLNAME def edit_fullname(username, old_firstname, old_surname, firstname, surname,): changed = [] for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Changing fullname in %s for %s" % (key, username)) try: new_fullname(CONF[key], username, firstname, surname) changed.append(key) LOG.debug("%s changed fullname on %s" % (username, key)) except Error as e: for key in reversed(changed): LOG.info("Reverting fullname change in %s for %s" % (key, username)) try: new_fullname(CONF[key], username, old_firstname, old_surname) except Error as e2: LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) raise e def new_fullname(conf, *args): try: update_fullname(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[26]) except LDAPConstraintViolationResult as e: # Extract useful part of the error message (for Samba 4 / AD). msg = e.message.split('check_password_restrictions: ')[-1].capitalize() raise Error(msg) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def update_fullname(conf, username, firstname, surname): #set current LDAP superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: #with connect_ldap(conf) as c: user_dn = find_user_dn(conf, c, username) c.modify(user_dn, {'givenName': [( MODIFY_REPLACE, firstname )], 'sn': [( MODIFY_REPLACE, surname )]}) new_cn = "cn="+firstname+" "+ surname+" - "+ username c.modify_dn(user_dn, new_cn) new_user_dn = new_cn+",cn=users,"+conf['base'] base = ",cn=users," + conf['base'] fakeFullName = user_dn[3:-len(base)].split(" ") if(user_dn == new_user_dn): raise Error(i18n.msg[10]) c.modify(new_user_dn, {'fakeCn': [(MODIFY_REPLACE, 'false' )]}) newSession().set(get_user_data(new_user_dn, c)) #EDIT EMAIL def edit_email(username, old_email, new_email): changed = [] for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Changing email in %s for %s" % (key, username)) try: new_email_address(CONF[key], username, old_email, new_email) changed.append(key) LOG.debug("%s changed email address on %s" % (username, key)) except Error as e: for key in reversed(changed): LOG.info("Reverting email change in %s for %s" % (key, username)) try: new_email_address(CONF[key], username, new_email, old_email) except Error as e2: LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) raise e def new_email_address(conf, *args): try: update_email_address(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[26]) except LDAPConstraintViolationResult as e: # Extract useful part of the error message (for Samba 4 / AD). msg = e.message.split('check_password_restrictions: ')[-1].capitalize() raise Error(msg) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def update_email_address(conf, username, old_email, new_email): if(old_email == new_email): raise Error(i18n.msg[15]) #set current LDAP superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: user_dn = find_user_dn(conf, c, username) new_email_addresses = get_user_email_array(user_dn, c, old_email, new_email) c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]}) newSession().set(get_user_data(user_dn, c)) # ADD AUTHCODE ATTRIBUTE - 2FA def add_auth_attribute_step1(username, code, action): changed = [] for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Adding secureAuth attribute %s to %s" % (key, username)) try: add_auth_attribute_step2(CONF[key], username, code, action) changed.append(key) LOG.debug("%s changed email address on %s" % (username, key)) except Error as e: for key in reversed(changed): LOG.info("Reverting email change in %s for %s" % (key, username)) try: new_email_address(CONF[key], username, new_email, old_email) except Error as e2: LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) raise e def add_auth_attribute_step2(conf, *args): try: add_auth_attribute_step3(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[26]) except LDAPConstraintViolationResult as e: # Extract useful part of the error message (for Samba 4 / AD). msg = e.message.split('check_password_restrictions: ')[-1].capitalize() raise Error(msg) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPNoSuchAttributeResult as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error('Dagoeneko desgaiturik zeneukan 2 urratseko autentifikazioa.') except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def add_auth_attribute_step3(conf, username, code, action): #set current LDAP superUser = SuperUsers(conf) print(action) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: user_dn = find_user_dn(conf, c, username) if(action == 'enable'): c.modify(user_dn,{'authCode': [(MODIFY_ADD, [code])]}) c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [True]]}) elif(action == 'disable'): c.modify(user_dn,{'authCode': [(MODIFY_DELETE, [])]}) c.modify(user_dn,{'secureAuth': [MODIFY_REPLACE, [False]]}) #remove file try: remove('static/tmp/'+newSession().get()['authCode']+'.png') except OSError as e: LOG.warning(str(e)) #raise Error(e) pass newSession().set(get_user_data(user_dn, c)) reload=add_auth_attribute_step1 # CHECK SECUREAUTH def check_2fa_step1(username): changed = [] for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Changing email in %s for %s" % (key, username)) try: return check_2fa_step2(CONF[key], username) changed.append(key) LOG.debug("%s changed email address on %s" % (username, key)) except Error as e: for key in reversed(changed): LOG.info("Reverting email change in %s for %s" % (key, username)) try: return check_2fa_step2(CONF[key], username) except Error as e2: LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) raise e def check_2fa_step2(conf, *args): try: return check_2fa_step3(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[26]) except LDAPConstraintViolationResult as e: # Extract useful part of the error message (for Samba 4 / AD). msg = e.message.split('check_password_restrictions: ')[-1].capitalize() raise Error(msg) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def check_2fa_step3(conf, username): #set current LDAP superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: user_dn = find_user_dn(conf, c, username) secure_auth_status = check_secure_auth(user_dn, c) print(secure_auth_status) return(secure_auth_status) #c.modify(user_dn, {'mail': [( MODIFY_REPLACE, new_email_addresses )]}) #newSession().set(get_user_data(user_dn, c)) #CHANGE PASSWORD def change_passwords(username, old_pass, new_pass): changed = [] for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Changing password in %s for %s" % (key, username)) try: change_password(CONF[key], username, old_pass, new_pass) changed.append(key) LOG.debug("%s changed pwd on %s" % (username, key)) except Error as e: for key in reversed(changed): LOG.info("Reverting password change in %s for %s" % (key, username)) try: change_password(CONF[key], username, new_pass, old_pass) except Error as e2: LOG.error('{}: {!s}'.format(e.__class__.__name__, e2)) raise e def change_password(conf, *args): try: if conf.get('type') == 'ad': change_password_ad(conf, *args) else: change_password_ldap(conf, *args) except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError): raise Error(i18n.msg[26]) except LDAPConstraintViolationResult as e: # Extract useful part of the error message (for Samba 4 / AD). msg = e.message.split('check_password_restrictions: ')[-1].capitalize() raise Error(msg) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def change_password_ldap(conf, username, old_pass, new_pass): #set current LDAP superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.readonly_dn, password=superUser.readonly_pwd) as c: #with connect_ldap(conf) as c: user_dn = find_user_dn(conf, c, username) # Note: raises LDAPUserNameIsMandatoryError when user_dn is None. with connect_ldap(conf, authentication=SIMPLE, user=user_dn, password=old_pass) as c: c.bind() c.extend.standard.modify_password(user_dn, old_pass, new_pass) def change_password_ad(conf, username, old_pass, new_pass): user = username + '@' + conf['ad_domain'] with connect_ldap(conf, authentication=SIMPLE, user=user, password=old_pass) as c: c.bind() user_dn = find_user_dn(conf, c, username) c.extend.microsoft.modify_password(user_dn, new_pass, old_pass) #DELETE ACCOUNT def del_user(username): n = N for key in (key for key in CONF.sections() if key == 'ldap' or key.startswith('ldap:')): LOG.debug("Deleting account for %s from %s" % (username, key)) n -= 1 try: del_account(CONF[key], username) LOG.debug("Account for %s deleted on -> %s" % (username, CONF[key])) if(n == 0 and newSession().get()['username'] is not None): newSession().delete() except Error as e: raise e def del_account(conf, *args): try: delete(conf, *args) except LDAPSocketOpenError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) except LDAPExceptionError as e: LOG.error('{}: {!s}'.format(e.__class__.__name__, e)) raise Error(i18n.msg[23]) def delete(conf, username): #set current LDAP superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: try: user_dn = find_user_dn(conf, c, username) c.delete(user_dn) except Error as e: raise e #AUXILIARY FUNCTIONS #find user def find_user_dn(conf, conn, uid): search_filter = conf['search_filter'].replace('{uid}', uid) conn.search(conf['base'], "(%s)" % search_filter, SUBTREE) return conn.response[0]['dn'] if conn.response else None #find email def find_email(conf, conn, email): search_filter = '(uid=*)' if conn.search(conf['base'], search_filter, attributes=['mail']): for i in conn.response: for j in i['attributes']['mail']: if(j == email): return True return False #find device def find_device(user_dn, conn, device): search_filter = '(objectClass=*)' if conn.search(user_dn, search_filter, attributes=['devices']): for i in conn.response: for j in i['attributes']['devices']: if(j == device): return True return False #find highest uidNumber def find_uid_number(conf, conn): search_filter = '(uid=*)' if conn.search(conf['base'], search_filter, attributes=['uidNumber']): uidNumbersList=[] for i in conn.response: uidNumbersList.append(i['attributes']['uidNumber']) uidNumbersList = list(filter(lambda i: type(i) is int, uidNumbersList)) return max(uidNumbersList) else: return(999) def get_user_email_array(user_dn, conn, old_email, new_email): search_filter = '(objectClass=*)' conn.search(user_dn, search_filter, attributes=['mail']) emails = conn.entries[0].mail.values for i in range(len(emails)): if(emails[i] == old_email): emails[i] = new_email return(emails) def check_secure_auth(user_dn, conn): search_filter = '(objectClass=*)' conn.search(user_dn, search_filter, attributes=['secureAuth']) status = conn.entries[0].secureAuth return(status) def get_user_data(user_dn, conn): search_filter = '(objectClass=*)' conn.search(user_dn, search_filter, attributes=['active','fakeCn','givenName','sn','uid','mail','devices','ip','lastLogin','secureAuth', 'authCode']) data = [] data.append(conn.entries[0].active.values[0]) data.append(conn.entries[0].fakeCn.values[0]) data.append(conn.entries[0].givenName.values[0]) data.append(conn.entries[0].sn.values[0]) data.append(conn.entries[0].uid.values[0]) data.append(conn.entries[0].mail.values[0]) data.append(conn.entries[0].devices.values) data.append(conn.entries[0].ip.values[0]) #ts = conn.entries[0].lastLogin.values[0] #ts = datetime.strptime(ts, '%Y-%m-%d %H:%M:%S%z') #ts = datetime.strftime(t, '%Y-%m-%d %H:%M:%S') data.append(str(conn.entries[0].lastLogin.values[0])[:-6]) data.append(conn.entries[0].secureAuth.values[0]) if(conn.entries[0].authCode): data.append(conn.entries[0].authCode.values[0]) return(data) def read_config(): config = ConfigParser() config.read([path.join(BASE_DIR, 'settings.ini'), getenv('CONF_FILE', '')]) return config CONF = read_config() def ldaps_count(): keys = [] for i, key in enumerate(CONF.sections()): if key == 'ldap' or key.startswith('ldap:'): keys.append(key) #n=len(keys) return(len(keys)) N = ldaps_count() def reg(): if CONF['general']['allow_registration'] == 'True': return True allowed = reg() def get_dev(): ua_string = bottle.request.environ.get('HTTP_USER_AGENT') user_agent = ua_parse(ua_string) return str(user_agent) def is_trusted_device(conf, user_dn): superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: d = get_dev() try: if not find_device(user_dn, c, d): OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] c.modify(user_dn, {'devices': [( MODIFY_ADD, d )] }) ''' if find_device(user_dn, c, 'unknown'): OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] c.modify(user_dn, {'devices': [( MODIFY_REPLACE, d )] }) else: OBJECT_CLASS = ['top', 'inetOrgPerson', 'posixAccount', 'accountsManagement'] c.modify(user_dn, {'devices': [( MODIFY_ADD, d )] }) ''' c.unbind() return True except Exception as e: print(e) return True def update_login_info(conf, user_dn): superUser = SuperUsers(conf) with connect_ldap(conf, user=superUser.admin_dn, password=superUser.admin_pwd) as c: ip = request.environ.get('HTTP_X_REAL_IP', request.remote_addr) ts = datetime.now().strftime('%Y%m%d%H%M%S')+'Z' c.modify(user_dn, {'ip': [( MODIFY_REPLACE, str(ip) )], 'lastLogin': [( MODIFY_REPLACE, ts )] }) c.unbind() class Error(Exception): pass #SESSIONS MANAGEMENT def newSession(): class Session(object): """docstring for Session""" def __init__(self): super(Session, self).__init__() self.data = bottle.request.environ.get('beaker.session') self.id = tools.session_id() self.secure_logged_in = False self.lang = self.get_lang() #localization self.lang = self.get_lang() global i18n i18n = LocalizeTo(self.lang, CONF) def get_lang(self): if 'HTTP_ACCEPT_LANGUAGE' in bottle.request.environ: lang = bottle.request.get('HTTP_ACCEPT_LANGUAGE') return str(lang[:2]) else: return CONF['locale']['lang'] def get(self): if 'username' in self.data: return(self.data) else: raise Error(i18n.msg[27]) def set(self, data): self.active = data[0] self.fakeCn = data[1] self.firstname = data[2] self.surname = data[3] self.username = data[4] self.mail = data[5] self.devices = data[6] self.ip = data[7] self.lastLogin = data[8] self.secureAuth = data[9] try: self.authCode = data[10] except: self.authCode = None self.data['active'] = self.active self.data['fakeCn'] = self.fakeCn self.data['firstname'] = self.firstname self.data['surname'] = self.surname self.data['username'] = self.username self.data['mail'] = self.mail self.data['devices'] = self.devices self.data['ip'] = self.ip self.data['lastLogin'] = self.lastLogin self.data['secureAuth'] = self.secureAuth self.data['authCode'] = self.authCode def close(self): self.data.pop('username') def delete(self): self.data.delete() s=Session() return s if environ.get('DEBUG'): bottle.debug(True) # Set up logging. logging.basicConfig(format=LOG_FORMAT) LOG.setLevel(logging.INFO) LOG.info("Starting ldap-python-webui %s" % VERSION) session_opts = { 'session.type': CONF['session']['type'], 'session.cookie_expires': CONF['session']['expire'], 'session.data_dir': CONF['session']['data_dir'], 'session.auto': CONF['session']['auto'] } class SuperUsers(object): """docstring for Session""" def __init__(self, conf): super(SuperUsers, self).__init__() self.domain=conf['base'][conf['base'].find(","):] self.admin_dn="cn=admin"+self.domain self.admin_pwd=environ['LDAP_ADMIN_PASSWORD'] self.readonly_dn="cn=readonly"+self.domain self.readonly_pwd=environ['LDAP_READONLY_PASSWORD'] superUser = SuperUsers(CONF['ldap:0']) app = beaker.middleware.SessionMiddleware(bottle.app(), session_opts) bottle.TEMPLATE_PATH = [BASE_DIR] # Set default attributes to pass into templates. #SimpleTemplate.defaults = dict(CONF['html']) SimpleTemplate.defaults['url'] = bottle.url # Run bottle internal server when invoked directly (mainly for development). if __name__ == '__main__': bottle.run(app, **CONF['server']) # Run bottle in application mode (in production under uWSGI server). else: #application = bottle.default_app() application = app